From d019dbddcf2dae57a3b8313f16a3f891914832e8 Mon Sep 17 00:00:00 2001 From: Isaac Yang Date: Wed, 27 Apr 2022 07:54:35 -0700 Subject: [PATCH] Tests run with PASSED (#427) * Tests run with PASSED * Server/client/overseer working * Action triggered * Log shows correct behavior * Fix site launcher stop server * With some hacks, the ha test run passed * Resolve four conflicts from new dev-2.1 commits * Rename yaml file * Make server restart later * Remove threads * Test code exits without being stuck * Resolve admin controller conflicts * Add back internal tests * Fix issues after merging with dev-2.1 --- nvflare/poc/admin/startup/fed_admin_HA.json | 4 +- nvflare/poc/client/startup/fed_client_HA.json | 27 +- nvflare/poc/overseer/startup/start.sh | 2 +- nvflare/poc/server/startup/fed_server_HA.json | 38 ++- tests/integration_test/admin_controller.py | 231 ++++++++++++++- tests/integration_test/site_launcher.py | 266 ++++++++++++++---- tests/integration_test/system_test.py | 24 +- tests/integration_test/test_ha.py | 257 +++++++++++++++++ tests/integration_test/test_ha.yml | 19 ++ tests/integration_test/utils.py | 1 + 10 files changed, 792 insertions(+), 77 deletions(-) create mode 100644 tests/integration_test/test_ha.py create mode 100644 tests/integration_test/test_ha.yml diff --git a/nvflare/poc/admin/startup/fed_admin_HA.json b/nvflare/poc/admin/startup/fed_admin_HA.json index b10ea3c2c1d..3f29eb74a29 100644 --- a/nvflare/poc/admin/startup/fed_admin_HA.json +++ b/nvflare/poc/admin/startup/fed_admin_HA.json @@ -10,10 +10,10 @@ "path": "nvflare.ha.overseer_agent.HttpOverseerAgent", "args": { "role": "admin", - "overseer_end_point": "http://localhost:6000/api/v1", + "overseer_end_point": "http://localhost:5000/api/v1", "project": "example_project", "name": "admin" } } } -} +} \ No newline at end of file diff --git a/nvflare/poc/client/startup/fed_client_HA.json b/nvflare/poc/client/startup/fed_client_HA.json index a151f529d1f..6305dcdc3e3 100644 --- a/nvflare/poc/client/startup/fed_client_HA.json +++ b/nvflare/poc/client/startup/fed_client_HA.json @@ -27,10 +27,33 @@ "overseer_agent": { "path": "nvflare.ha.overseer_agent.HttpOverseerAgent", "args": { - "overseer_end_point": "http://localhost:6000/api/v1", + "overseer_end_point": "http://localhost:5000/api/v1", "project": "example_project", "role": "client", "name": "site1" } - } + }, + "components": [ + { + "id": "resource_manager", + "path": "nvflare.apis.impl.list_resource_manager.ListResourceManager", + "args": { + "resources": { + "gpu": [ + 0, + 1, + 2, + 3 + ] + } + } + }, + { + "id": "resource_consumer", + "path": "nvflare.apis.impl.gpu_resource_consumer.GPUResourceConsumer", + "args": { + "gpu_resource_key": "gpu" + } + } + ] } \ No newline at end of file diff --git a/nvflare/poc/overseer/startup/start.sh b/nvflare/poc/overseer/startup/start.sh index b3717e27c46..72aacd865f1 100755 --- a/nvflare/poc/overseer/startup/start.sh +++ b/nvflare/poc/overseer/startup/start.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -FLASK_APP=nvflare.ha.overseer.overseer flask run --host=localhost --port=6000 +FLASK_APP=nvflare.ha.overseer.overseer flask run --host=localhost --port=5000 diff --git a/nvflare/poc/server/startup/fed_server_HA.json b/nvflare/poc/server/startup/fed_server_HA.json index fb6742332f6..b4759d904c8 100644 --- a/nvflare/poc/server/startup/fed_server_HA.json +++ b/nvflare/poc/server/startup/fed_server_HA.json @@ -42,7 +42,7 @@ "overseer_agent": { "path": "nvflare.ha.overseer_agent.HttpOverseerAgent", "args": { - "overseer_end_point": 
"http://localhost:6000/api/v1", + "overseer_end_point": "http://localhost:5000/api/v1", "project": "example_project", "role": "server", "name": "localhost", @@ -50,5 +50,39 @@ "admin_port": "8003", "heartbeat_interval": 6 } - } + }, + "components": [ + { + "id": "job_scheduler", + "path": "nvflare.apis.impl.job_scheduler.DefaultJobScheduler", + "args": { + "max_jobs": 4 + } + }, + { + "id": "job_manager", + "path": "nvflare.apis.impl.job_def_manager.SimpleJobDefManager", + "args": { + "uri_root": "/tmp/jobs-storage", + "job_store_id": "job_store" + } + }, + { + "id": "job_store", + "name": "FilesystemStorage", + "args": {} + }, + { + "id": "study_manager", + "path": "nvflare.apis.impl.study_manager.StudyManager", + "args": { + "study_store_id": "study_store" + } + }, + { + "id": "study_store", + "path": "nvflare.app_common.storages.filesystem_storage.FilesystemStorage", + "args": {} + } + ] } \ No newline at end of file diff --git a/tests/integration_test/admin_controller.py b/tests/integration_test/admin_controller.py index aa1c5a21037..f6e9789bff4 100644 --- a/tests/integration_test/admin_controller.py +++ b/tests/integration_test/admin_controller.py @@ -13,8 +13,10 @@ # limitations under the License. import logging +import re import time +from nvflare.ha.overseer_agent import HttpOverseerAgent from nvflare.fuel.hci.client.api_status import APIStatus from nvflare.fuel.hci.client.fl_admin_api import FLAdminAPI from nvflare.fuel.hci.client.fl_admin_api_constants import FLDetailKey @@ -23,7 +25,7 @@ class AdminController: - def __init__(self, jobs_root_dir, poll_period=10): + def __init__(self, jobs_root_dir, ha, poll_period=10): """ This class runs an app on a given server and clients. """ @@ -32,10 +34,17 @@ def __init__(self, jobs_root_dir, poll_period=10): self.jobs_root_dir = jobs_root_dir self.poll_period = poll_period + if ha: + overseer_agent = HttpOverseerAgent( + role="admin", overseer_end_point="http://127.0.0.1:5000/api/v1", project="example_project", name="admin" + ) + else: + overseer_agent = DummyOverseerAgent(sp_end_point="localhost:8002:8003") + self.admin_api: FLAdminAPI = FLAdminAPI( upload_dir=self.jobs_root_dir, download_dir=self.jobs_root_dir, - overseer_agent=DummyOverseerAgent(sp_end_point="localhost:8002:8003"), + overseer_agent=overseer_agent, poc=True, debug=False, user_name="admin", @@ -73,6 +82,9 @@ def get_run_data(self): return run_data + def get_stats(self, target): + return self.admin_api.show_stats(self.job_id, target) + def ensure_clients_started(self, num_clients): if not self.admin_api: return False @@ -132,7 +144,7 @@ def submit_job(self, job_name) -> bool: if response["status"] != APIStatus.SUCCESS: raise RuntimeError(f"submit_job failed: {response}") self.job_id = response["details"]["job_id"] - + self.last_job_name = job_name return True def wait_for_job_done(self): @@ -152,6 +164,219 @@ def wait_for_job_done(self): continue training_done = True + def run_app_ha(self, site_launcher, ha_test): + run_state = {"workflow": None, "task": None, "round_number": None, "run_finished": None} + + last_read_line = 0 + event = 0 + ha_events = ha_test["events"] + event_test_status = [False for _ in range(len(ha_events))] # whether event has been successfully triggered + + i = 0 + training_done = False + while not training_done: + i += 1 + + server_logs = self.admin_api.cat_target(TargetType.SERVER, file="log.txt")["details"][ + "message" + ].splitlines()[last_read_line:] + last_read_line = len(server_logs) + last_read_line + server_logs_string = 
"\n".join(server_logs) + + stats = self.get_stats(TargetType.SERVER) + + # update run_state + changed, wfs, run_state = self.process_stats(stats, run_state) + + if changed or i % (10 / self.poll_period) == 0: + i = 0 + print("STATS: ", stats) + self.print_state(ha_test, run_state) + + # check if event is triggered -> then execute the corresponding actions + if event <= len(ha_events) - 1 and not event_test_status[event]: + event_trigger = [] + + if isinstance(ha_events[event]["trigger"], dict): + for k, v in ha_events[event]["trigger"].items(): + if k == "workflow": + print(run_state) + print(wfs) + event_trigger.append(run_state[k] == wfs[v][0]) + else: + event_trigger.append(run_state[k] == v) + elif isinstance(ha_events[event]["trigger"], str) and ha_events[event]["trigger"] in server_logs_string: + event_trigger.append(True) + + if event_trigger and all(event_trigger): + print(f"EVENT TRIGGERED: {ha_events[event]['trigger']}") + event_test_status[event] = True + self.execute_actions(site_launcher, ha_events[event]["actions"]) + continue + + response = self.admin_api.check_status(target_type=TargetType.SERVER) + if response and "status" in response and response["status"] != APIStatus.SUCCESS: + print("NO ACTIVE SERVER!") + + elif ( + response and "status" in response and "details" in response and response["status"] == APIStatus.SUCCESS + ): + + # compare run_state to expected result_state from the test case + if event <= len(ha_events) - 1 and event_test_status[event] and response["status"] == APIStatus.SUCCESS: + result_state = ha_events[event]["result_state"] + if any(list(run_state.values())): + if result_state == "unchanged": + result_state = ha_events[event]["trigger"] + for k, v in result_state.items(): + if k == "workflow": + print(f"ASSERT Current {k}: {run_state[k]} == Expected {k}: {wfs[v][0]}") + assert run_state[k] == wfs[v][0] + else: + print(f"ASSERT Current {k}: {run_state[k]} == Expected {k}: {v}") + assert run_state[k] == v + print("\n") + event += 1 + + # check if run is stopped + if ( + FLDetailKey.SERVER_ENGINE_STATUS in response["details"] + and response["details"][FLDetailKey.SERVER_ENGINE_STATUS] == "stopped" + ): + response = self.admin_api.check_status(target_type=TargetType.CLIENT) + if response["status"] != APIStatus.SUCCESS: + print(f"CHECK status failed: {response}") + for row in response["details"]["client_statuses"]: + if row[3] != "stopped": + continue + training_done = True + time.sleep(self.poll_period) + + assert all(event_test_status), "Test failed: not all test events were triggered" + + def execute_actions(self, site_launcher, actions): + for action in actions: + tokens = action.split(" ") + command = tokens[0] + args = tokens[1:] + + print(f"ACTION: {action}") + + if command == "sleep": + time.sleep(int(args[0])) + + elif command == "kill": + if args[0] == "server": + active_server_id = site_launcher.get_active_server_id(self.admin_api.port) + site_launcher.stop_server(active_server_id) + elif args[0] == "overseer": + site_launcher.stop_overseer() + elif args[0] == "client": # TODO fix client kill & restart during run + if len(args) == 2: + client_id = int(args[1]) + else: + client_id = list(site_launcher.client_properties.keys())[0] + self.admin_api.remove_client([site_launcher.client_properties[client_id]["name"]]) + site_launcher.stop_client(client_id) + + elif command == "restart": + if args[0] == "server": + if len(args) == 2: + server_id = int(args[1]) + else: + print(site_launcher.server_properties) + server_id = 
list(site_launcher.server_properties.keys())[0] + site_launcher.start_server() + elif args[0] == "overseer": + site_launcher.start_overseer() + elif args[0] == "client": # TODO fix client kill & restart during run + if len(args) == 2: + client_id = int(args[1]) + else: + client_id = list(site_launcher.client_properties.keys())[0] + site_launcher.start_client(client_id) + + def print_state(self, ha_test, state): + print("\n") + print(f"Job name: {self.last_job_name}") + print(f"HA test case: {ha_test['name']}") + print("-" * 30) + for k, v in state.items(): + print(f"{k}: {v}") + print("-" * 30 + "\n") + + def process_stats(self, stats, run_state): + # extract run_state from stats + # {'status': , + # 'details': { + # 'message': { + # 'ScatterAndGather': { + # 'tasks': {'train': []}, + # 'phase': 'train', + # 'current_round': 0, + # 'num_rounds': 2}, + # 'CrossSiteModelEval': + # {'tasks': {}}, + # 'ServerRunner': { + # 'run_number': 1, + # 'status': 'started', + # 'workflow': 'scatter_and_gather' + # } + # } + # }, + # 'raw': {'time': '2022-04-04 15:13:09.367350', 'data': [{'type': 'dict', 'data': {'ScatterAndGather': {'tasks': {'train': []}, 'phase': 'train', 'current_round': 0, 'num_rounds': 2}, 'CrossSiteModelEval': {'tasks': {}}, 'ServerRunner': {'run_number': 1, 'status': 'started', 'workflow': 'scatter_and_gather'}}}], 'status': }} + wfs = {} + prev_run_state = run_state.copy() + print(f"run_state {prev_run_state}", flush=True) + print(f"stats {stats}", flush=True) + if stats and "status" in stats and "details" in stats and stats["status"] == APIStatus.SUCCESS: + if "message" in stats["details"]: + wfs = stats["details"]["message"] + if wfs: + run_state["workflow"], run_state["task"], run_state["round_number"] = None, None, None + for item in wfs: + if wfs[item].get("tasks"): + run_state["workflow"] = item + run_state["task"] = list(wfs[item].get("tasks").keys())[0] + if "current_round" in wfs[item]: + run_state["round_number"] = wfs[item]["current_round"] + # if wfs[item].get("workflow"): + # workflow = wfs[item].get("workflow") + + if stats["status"] == APIStatus.SUCCESS: + run_state["run_finished"] = "ServerRunner" not in wfs.keys() + else: + run_state["run_finished"] = False + + wfs = [wf for wf in list(wfs.items()) if "tasks" in wf[1]] + + return run_state != prev_run_state, wfs, run_state + + def process_logs(self, logs, run_state): + # regex to extract run_state from logs + + prev_run_state = run_state.copy() + + # matches latest instance of "wf={workflow}," or "wf={workflow}]" + match = re.search("(wf=)([^\,\]]+)(\,|\])(?!.*(wf=)([^\,\]]+)(\,|\]))", logs) + if match: + run_state["workflow"] = match.group(2) + + # matches latest instance of "task_name={validate}, or "task_name={validate}" + match = re.search("(task_name=)([^\,\]]+)(\,|\])(?!.*(task_name=)([^\,\]]+)(\,|\]))", logs) + if match: + run_state["task"] = match.group(2) + + # matches latest instance of "Round {0-999} started." 
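+        # the (?!...) negative lookahead rejects every occurrence that is followed
+        # by another one, so re.search lands on the last "Round N started." entry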
+ match = re.search( + "Round ([0-9]|[1-9][0-9]|[1-9][0-9][0-9]) started\.(?!.*Round ([0-9]|[1-9][0-9]|[1-9][0-9][0-9]) started\.)", + logs, + ) + if match: + run_state["round_number"] = int(match.group(1)) + + return run_state != prev_run_state, run_state + def finalize(self): self.admin_api.overseer_agent.end() self.admin_api.shutdown(target_type=TargetType.ALL) diff --git a/tests/integration_test/site_launcher.py b/tests/integration_test/site_launcher.py index dc645e26e8d..10526a5d8fa 100644 --- a/tests/integration_test/site_launcher.py +++ b/tests/integration_test/site_launcher.py @@ -19,6 +19,7 @@ import signal import subprocess import sys +import time import tempfile import threading import traceback @@ -38,17 +39,27 @@ def process_logs(log_path, pid): class SiteLauncher: - def __init__(self, poc_directory, server_dir_name="server", client_dir_name="client", admin_dir_name="admin"): + def __init__( + self, + poc_directory, + ha=False, + overseer_dir_name="overseer", + server_dir_name="server", + client_dir_name="client", + admin_dir_name="admin", + ): """ This class sets up the test environment for a test. It will launch and keep track of servers and clients. """ super().__init__() self.original_poc_directory = poc_directory + self.overseer_dir_name = overseer_dir_name self.server_dir_name = server_dir_name self.client_dir_name = client_dir_name self.admin_dir_name = admin_dir_name + self.overseer_properties = {} self.server_properties = {} self.client_properties = {} @@ -68,19 +79,105 @@ def __init__(self, poc_directory, server_dir_name="server", client_dir_name="cli self.poc_directory = root_dir print(f"Using root dir: {root_dir}") - def start_server(self): - server_dir = os.path.join(self.poc_directory, self.server_dir_name) - log_path = os.path.join(server_dir, "log.txt") + if ha: + for name in ("admin", "server", "client"): + dir = os.path.join(self.poc_directory, f"{name}", "startup") + os.rename(os.path.join(dir, f"fed_{name}.json"), os.path.join(dir, f"fed_{name}_old.json")) + os.rename(os.path.join(dir, f"fed_{name}_HA.json"), os.path.join(dir, f"fed_{name}.json")) + + def start_overseer(self): + overseer_dir = os.path.join(self.poc_directory, self.overseer_dir_name) + log_path = os.path.join(overseer_dir, "log.txt") new_env = os.environ.copy() + # new_env.update({"FLASK_RUN_PORT": "6000"}) + # new_env.get("PYTHONPATH", "") + os.sep + "nvflare.ha.overseer.overseer"}) + command = f"{sys.executable} -m nvflare.ha.overseer.overseer" + + process = subprocess.Popen( + shlex.split(command, " "), + preexec_fn=os.setsid, + env=new_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + print("Starting overseer ...") + + # t = threading.Thread(target=process_logs, args=(log_path, process)) + # t.start() + + self.overseer_properties["path"] = overseer_dir + self.overseer_properties["process"] = process + self.overseer_properties["log_path"] = log_path + + def start_servers(self, n=1): + src_server_dir = os.path.join(self.poc_directory, self.server_dir_name) # Create upload directory - os.makedirs(os.path.join(server_dir, "transfer"), exist_ok=True) + os.makedirs(os.path.join(src_server_dir, "transfer"), exist_ok=True) + + if n == 1: + server_name = self.server_dir_name + + # Copy and create new directory + server_dir_name = os.path.join(self.poc_directory, server_name) + + # replace fed_server.json ports + fed_server_path = os.path.join(server_dir_name, "startup", "fed_server.json") + with open(fed_server_path, "r") as f: + fed_server_json = f.read() + + admin_port = "8003" + 
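+            # single-server case: the default 8002/8003 ports are kept, so both
+            # replace calls below are no-ops mirroring the multi-server branch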
fed_server_json = fed_server_json.replace("8002", "8002").replace("8003", admin_port) + + with open(fed_server_path, "w") as f: + f.write(fed_server_json) + + self.start_server() + + else: + for i in range(n): + server_id = i + server_name = self.server_dir_name + f"_{server_id}" + + # Copy and create new directory + server_dir_name = os.path.join(self.poc_directory, server_name) + shutil.copytree(src_server_dir, server_dir_name) + + # replace fed_server.json ports + fed_server_path = os.path.join(server_dir_name, "startup", "fed_server.json") + with open(fed_server_path, "r") as f: + fed_server_json = f.read() + + admin_port = f"8{i}03" + fed_server_json = fed_server_json.replace("8002", f"8{i}02").replace("8003", admin_port) + + with open(fed_server_path, "w") as f: + f.write(fed_server_json) + + self.start_server(server_id) + time.sleep(5) + + def start_server(self, server_id=None): + if server_id is None: + server_name = self.server_dir_name + server_id = 0 + else: + server_name = self.server_dir_name + f"_{server_id}" + server_dir_name = os.path.join(self.poc_directory, server_name) + log_path = os.path.join(server_dir_name, "log.txt") + + self.server_properties[server_id] = {} + self.server_properties[server_id]["path"] = server_dir_name + self.server_properties[server_id]["name"] = server_name + self.server_properties[server_id]["port"] = f"8003" + self.server_properties[server_id]["log_path"] = log_path command = ( f"{sys.executable} -m nvflare.private.fed.app.server.server_train " - f"-m {server_dir} -s fed_server.json" + f"-m {server_dir_name} -s fed_server.json" f" --set secure_train=false config_folder=config" ) + new_env = os.environ.copy() process = subprocess.Popen( shlex.split(command, " "), preexec_fn=os.setsid, @@ -88,14 +185,17 @@ def start_server(self): stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) - print("Starting server ...") + print(f"start server with {command}: {process.pid}") + self.server_properties[server_id]["process"] = process + + print(f"Starting server {server_id}...") - t = threading.Thread(target=process_logs, args=(log_path, process)) - t.start() + # t = threading.Thread(target=process_logs, args=(log_path, process)) + # t.start() - self.server_properties["path"] = server_dir - self.server_properties["process"] = process - self.server_properties["log_path"] = log_path + # self.server_properties["path"] = server_dir_name + # self.server_properties["process"] = process + # self.server_properties["log_path"] = log_path def start_clients(self, n=2): # Make sure that previous clients are killed @@ -113,46 +213,67 @@ def start_clients(self, n=2): # Copy and create new directory client_dir_name = os.path.join(self.poc_directory, client_name) shutil.copytree(src_client_directory, client_dir_name) - log_path = os.path.join(client_dir_name, "log.txt") - - self.client_properties[client_id] = {} - self.client_properties[client_id]["path"] = client_dir_name - self.client_properties[client_id]["name"] = client_name - self.client_properties[client_id]["log_path"] = log_path - - # Launch the new client - client_startup_dir = os.path.join(client_dir_name) - command = ( - f"{sys.executable} -m nvflare.private.fed.app.client.client_train -m " - f"{client_startup_dir} -s fed_client.json --set secure_train=false config_folder=config" - f" uid={client_name}" - ) - - process = subprocess.Popen( - shlex.split(command, " "), - preexec_fn=os.setsid, - env=new_env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - self.client_properties[client_id]["process"] = 
process - - print(f"Launched client {client_id} process.") - - t = threading.Thread(target=process_logs, args=(log_path, process)) - t.start() - - def get_server_data(self): - if "log_file" in self.server_properties: - self.server_properties["log_file"].flush() - - server_data = { - "server_path": self.server_properties["path"], - "server_process": self.server_properties["process"], - "server_name": self.server_dir_name, - "root_dir": self.poc_directory, - "log_path": self.server_properties["log_path"], - } + + self.start_client(client_id) + + def start_client(self, client_id): + client_name = self.client_dir_name + f"_{client_id}" + client_dir_name = os.path.join(self.poc_directory, client_name) + log_path = os.path.join(client_dir_name, "log.txt") + new_env = os.environ.copy() + + self.client_properties[client_id] = {} + self.client_properties[client_id]["path"] = client_dir_name + self.client_properties[client_id]["name"] = client_name + self.client_properties[client_id]["log_path"] = log_path + + # Launch the new client + client_startup_dir = os.path.join(client_dir_name) + command = ( + f"{sys.executable} -m nvflare.private.fed.app.client.client_train -m " + f"{client_startup_dir} -s fed_client.json --set secure_train=false config_folder=config" + f" uid={client_name}" + ) + + process = subprocess.Popen( + shlex.split(command, " "), + preexec_fn=os.setsid, + env=new_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + self.client_properties[client_id]["process"] = process + + print(f"Launched client {client_id} process.") + + # t = threading.Thread(target=process_logs, args=(log_path, process)) + # t.start() + + def get_active_server_id(self, port): + active_server_id = -1 + for i in range(len(self.server_properties)): + if ( + self.server_properties[i] + and "port" in self.server_properties[i] + and self.server_properties[i]["port"] == str(port) + ): + active_server_id = i + return active_server_id + + def get_server_data(self, server_id=0): + server_prop = self.server_properties[server_id] + if "log_file" in server_prop: + server_prop["log_file"].flush() + + server_data = {} + if server_prop: + server_data = { + "server_path": server_prop["path"], + "server_process": server_prop["process"], + "server_name": server_prop["name"], + "root_dir": self.poc_directory, + "log_path": server_prop["log_path"], + } return server_data @@ -165,24 +286,42 @@ def get_client_data(self): return client_data - def stop_server(self): + def stop_overseer(self): + try: + # Kill the process + if "process" in self.overseer_properties and self.overseer_properties["process"]: + os.killpg(self.overseer_properties["process"].pid, signal.SIGTERM) + + subprocess.call(["kill", str(self.overseer_properties["process"].pid)]) + self.overseer_properties["process"].wait() + print("Sent SIGTERM to overseer.") + else: + print("No overseer process.") + except Exception as e: + print(f"Exception in stopping overseer: {e.__str__()}") + finally: + self.overseer_properties.clear() + + def stop_server(self, server_id): + server_prop = self.server_properties[server_id] # Kill all clients first try: - self.stop_all_clients() + # clients will be stopped by a separate call + # self.stop_all_clients() # Kill the process - if "process" in self.server_properties and self.server_properties["process"]: - os.killpg(self.server_properties["process"].pid, signal.SIGTERM) + if "process" in server_prop and server_prop["process"]: + os.killpg(server_prop["process"].pid, signal.SIGTERM) - subprocess.call(["kill", 
str(self.server_properties["process"].pid)]) - self.server_properties["process"].wait() + subprocess.call(["kill", str(server_prop["process"].pid)]) + server_prop["process"].wait() print("Sent SIGTERM to server.") else: print("No server process.") except Exception as e: print(f"Exception in stopping server: {e.__str__()}") finally: - self.server_properties.clear() + server_prop.clear() def stop_client(self, client_id) -> bool: if client_id not in self.client_properties: @@ -213,8 +352,15 @@ def stop_all_clients(self): for client_id in client_ids: self.stop_client(client_id) + def stop_all_servers(self): + server_ids = list(self.server_properties.keys()) + for server_id in server_ids: + self.stop_server(server_id) + def stop_all_sites(self): - self.stop_server() + self.stop_all_clients() + self.stop_all_servers() + self.stop_overseer() def cleanup(self): print(f"Deleting temporary directory: {self.poc_directory}.") diff --git a/tests/integration_test/system_test.py b/tests/integration_test/system_test.py index aad991b9a6a..69b598751bb 100644 --- a/tests/integration_test/system_test.py +++ b/tests/integration_test/system_test.py @@ -25,6 +25,7 @@ from tests.integration_test.admin_controller import AdminController from tests.integration_test.site_launcher import SiteLauncher from tests.integration_test.utils import generate_job_dir_for_single_app_job +from tests.integration_test.test_ha import ha_tests def get_module_class_from_full_path(full_path): @@ -46,7 +47,8 @@ def read_yaml(yaml_file_path): params = [ # "./test_examples.yml", - "./test_internal.yml" + "./test_internal.yml", + "./test_ha.yml", ] @@ -80,6 +82,7 @@ def test_run_job_complete(self, system_config): n_clients = system_config["n_clients"] jobs_root_dir = system_config["jobs_root_dir"] snapshot_path = system_config["snapshot_path"] + ha = system_config.get("ha", False) try: print(f"cleanup = {cleanup}") print(f"poc = {poc}") @@ -87,9 +90,10 @@ def test_run_job_complete(self, system_config): print(f"jobs_root_dir = {jobs_root_dir}") print(f"snapshot_path = {snapshot_path}") - site_launcher = SiteLauncher(poc_directory=poc) - - site_launcher.start_server() + site_launcher = SiteLauncher(poc_directory=poc, ha=ha) + if ha: + site_launcher.start_overseer() + site_launcher.start_servers(1) site_launcher.start_clients(n=n_clients) # testing jobs @@ -108,7 +112,7 @@ def test_run_job_complete(self, system_config): test_jobs.append((x["app_name"], x["validators"])) generated_jobs.append(job) - admin_controller = AdminController(jobs_root_dir=jobs_root_dir) + admin_controller = AdminController(jobs_root_dir=jobs_root_dir, ha=ha) admin_controller.initialize() admin_controller.ensure_clients_started(num_clients=n_clients) @@ -125,10 +129,14 @@ def test_run_job_complete(self, system_config): admin_controller.submit_job(job_name=test_job) + time.sleep(15) print(f"Server status after job submission: {admin_controller.server_status()}.") print(f"Client status after job submission: {admin_controller.client_status()}") - admin_controller.wait_for_job_done() + if ha: + admin_controller.run_app_ha(site_launcher, ha_tests["pt"][0]) + else: + admin_controller.wait_for_job_done() server_data = site_launcher.get_server_data() client_data = site_launcher.get_client_data() @@ -143,6 +151,7 @@ def test_run_job_complete(self, system_config): app_validator_cls = getattr(importlib.import_module(module_name), class_name) app_validator = app_validator_cls() + print(server_data) app_validate_res = app_validator.validate_results( server_data=server_data, 
client_data=client_data, @@ -174,9 +183,10 @@ def test_run_job_complete(self, system_config): print(f"Exception in test run: {e.__str__()}") raise ValueError("Tests failed") from e finally: + if ha and admin_controller: + admin_controller.admin_api.overseer_agent.end() if admin_controller: admin_controller.finalize() - if site_launcher: site_launcher.stop_all_sites() diff --git a/tests/integration_test/test_ha.py b/tests/integration_test/test_ha.py new file mode 100644 index 00000000000..efbe6db88c9 --- /dev/null +++ b/tests/integration_test/test_ha.py @@ -0,0 +1,257 @@ +# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# HA test cases (referenced from NVFlare 2.1 Test Plan- High Availability) + +# supported trigger events: +# - match a string based on log output from server +# - state based on predefined tracked state variables (workflow, task, round_number, run_finished etc.) +# supported actions: +# - kill {target} [id] +# - sleep {seconds} +# - restart {target} [id] + + +# 14 upload a job, kill the server during training, restart it should pick up the work +test_case_14 = { + "name": "test_case_14", + "description": "upload a job, kill the server during training, restart it should pick up the work", + "setup": { + "n_servers": 1, + "n_clients": 2, + }, # TODO potentially add ability to start overseer & choose order of starting overseer/server/client? 
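+    # each event: "trigger" is either a run-state dict or a substring expected in
+    # the server log; "actions" is an ordered list of "kill/restart/sleep" command
+    # strings; "result_state" is the run state expected afterwards ("unchanged"
+    # means it must still match the trigger state)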
+ "events": [ + { + "trigger": {"workflow": 0, "round_number": 1, "task": "train"}, + "actions": [ + "sleep 5", + "kill server", + "sleep 10", + "restart server", + ], + "result_state": "unchanged", # "unchanged" is same as trigger state + }, + ], +} + +# 15 upload a job, kill the server after we start training but no round is completed, restart it should pick up the work +test_case_15 = { + "name": "test_case_15", + "description": "upload a job, kill the server after we start training but no round is completed, restart it should pick up the work", + "setup": {"n_servers": 1, "n_clients": 2}, + "events": [ + { + "trigger": {"workflow": 0, "round_number": 0, "task": "train"}, + "actions": [ + "kill server", + "sleep 15", + "restart server", + ], + "result_state": {"workflow": 0, "round_number": 0, "task": "train"}, + }, + # { + # "trigger": "Round 1 started.", + # "actions": [ + # "sleep 5", + # "kill server", + # "sleep 10", + # "restart server", + # ], + # "result_state": {"workflow": 0, "round_number": 1, "task": "train"}, + # }, + ], +} + +# 16 upload a job, kill the server during sending models to clients, restart it should pick up the work +test_case_16 = { + "name": "test_case_16", + "description": "upload a job, kill the server during sending models to clients, restart it should pick up the work", + "setup": {"n_servers": 1, "n_clients": 2}, + "events": [ + { + "trigger": "sent task assignment to client", + "actions": ["kill server", "sleep 10", "restart server"], + "result_state": {"workflow": 0, "round_number": 0}, + }, + ], +} + +# 17 upload a job, kill the primary server during training, the second one should pick up the work +test_case_17 = { + "name": "test_case_17", + "description": "upload a job, kill the primary server during training, the second one should pick up the work", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + { + "trigger": {"workflow": 0, "round_number": 1, "task": "train"}, + "actions": [ + "sleep 5", + "kill server", + "sleep 10", + ], + "result_state": "unchanged", + }, + ], +} + +# 18 upload a job, kill the primary server after we start training but no round is completed, the second one should start from round 0 +test_case_18 = { + "name": "test_case_18", + "description": "upload a job, kill the primary server after we start training but no round is completed, the second one should start from round 0", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + { + "trigger": {"workflow": 0, "round_number": 0, "task": "train"}, + "actions": [ + "sleep 5", + "kill server", + "sleep 10", + ], + "result_state": "unchanged", + }, + ], +} + +# 19 upload a job, kill the primary server during sending models to clients, the second one should start from round 0 +test_case_19 = { + "name": "test_case_16", + "description": "upload a job, kill the primary server during sending models to clients, the second one should start from round 0", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + { + "trigger": "sent task assignment to client", + "actions": [ + "kill server", + "sleep 10", + ], + "result_state": {"workflow": 0, "round_number": 0, "task": "train"}, + }, + ], +} + +# 20 upload a job that has multiple workflows, kill the primary server when the first workflow is completed, the second one should start with the second workflow +test_case_20 = { + "name": "test_case_20", + "description": "upload a job that has multiple workflows, kill the primary server when the first workflow is completed, the second one should start with the second 
workflow", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + {"trigger": {"workflow": 1}, "actions": ["sleep 5", "kill server", "sleep 10"], "result_state": "unchanged"}, + ], +} + +# 21 upload a job, kill the OverSeer, since the primary server is still running, things should run into completion (if client sites already got the old SP info.) + +# 22 kill overseer with old information +test_case_22 = { + "name": "test_case_22", + "description": "kill overseer with old information", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + { + "trigger": {"workflow": 0, "task": "train"}, + "actions": ["sleep 5", "kill overseer", "sleep 10"], + "result_state": "unchanged", + }, + ], +} + +# 23 +# kill overseer with no information + +# 24 +# start overseer, client, then server + +# 25 +# overseer returns there is no hot endpoint available? -> fallback to previous SP + +# 26 +# overseer gives wrong information? -> keep trying + +# 27 +# sequence of kills and restarts of server + +# 28 +# kill client during training, change server, restart client +test_case_28 = { # FAILS: gets stuck "Communicator - ERROR - Action: getTask grpc communication error." + "name": "test_case_28", + "description": "kill client during training, change server, restart client", + "setup": {"n_servers": 2, "n_clients": 1}, + "events": [ + { + "trigger": {"workflow": 0, "round_number": 0, "task": "train"}, + "actions": ["sleep 10", "kill client", "sleep 10", "kill server", "sleep 15", "restart client"], + "result_state": "unchanged", + }, + ], +} + +# 28 +# kill client during training, restart client +test_case_28b = { # FAILS: gets stuck " - Communicator - ERROR - Action: getTask grpc communication error." + "name": "test_case_28b", + "description": "kill client during training, restart client", + "setup": {"n_servers": 2, "n_clients": 1}, + "events": [ + { + "trigger": {"workflow": 0, "round_number": 0, "task": "train"}, + "actions": ["sleep 10", "kill client", "sleep 15", "restart client"], + "result_state": "unchanged", + }, + ], +} + +# 29 +# overseer dies, will job submit fail? + +# 30 +# no overseer section in fed_client.json + +# 31 After the successful run is completed, kill the server, restart it, it should NOT go into training phase +test_case_31 = { # FAILS: still in cross validate workflow even though cross validate workflow finished (relook into "run_finished" state variable) + "name": "test_case_31", + "description": "After the successful run is completed, kill the server, restart it, it should NOT go into training phase", + "setup": {"n_servers": 1, "n_clients": 2}, + "events": [ + { + "trigger": {"run_finished": True}, + "actions": ["sleep 1", "kill server", "sleep 5", "restart server"], + "result_state": {"run_finished": True}, + }, + ], +} + +# 32 After the successful run is completed, kill the 1st server, the second server should NOT go into training phase +test_case_32 = { # FAILS + "name": "test_case_32", + "description": "After the successful run is completed, kill the server, restart it, it should NOT go into training phase", + "setup": {"n_servers": 2, "n_clients": 2}, + "events": [ + { + "trigger": {"run_finished": True}, + "actions": [ + "kill server", + "sleep 5", + ], + "result_state": {"run_finished": True}, + }, + ], +} + +# add kills and restarts at various times for stress testing? 
+
+sag_tests = [test_case_15]  # , test_case_16, test_case_17, test_case_18, test_case_19, test_case_20, test_case_22]
+cyclic_tests = [test_case_22]  # note: some tests are not applicable to cyclic_tests, as cyclic app only has 1 workflow
+
+ha_tests = {"pt": sag_tests, "cyclic": cyclic_tests}
diff --git a/tests/integration_test/test_ha.yml b/tests/integration_test/test_ha.yml
new file mode 100644
index 00000000000..9fb4544fdec
--- /dev/null
+++ b/tests/integration_test/test_ha.yml
@@ -0,0 +1,19 @@
+poc: ../../nvflare/poc
+n_clients: 2
+jobs_root_dir: ./jobs
+apps_root_dir: ./apps
+snapshot_path: /tmp/snapshot-storage
+ha: True
+cleanup: True
+
+tests:
+  # - app_name: pt
+  #   validators:
+  #     - tests.integration_test.validators.pt_model_validator.PTModelValidator
+  # - app_name: cyclic
+  #   validators:
+  #     - tests.integration_test.validators.tf_model_validator.TFModelValidator
+  - app_name: sag
+    validators:
+      - tests.integration_test.validators.sag_result_validator.SAGResultValidator
+
diff --git a/tests/integration_test/utils.py b/tests/integration_test/utils.py
index 2429852fbd3..78a814e1b21 100644
--- a/tests/integration_test/utils.py
+++ b/tests/integration_test/utils.py
@@ -39,6 +39,7 @@ def generate_meta(
 
 def generate_job_dir_for_single_app_job(app_root_folder: str, app_name: str, clients: List[str], destination: str):
     app_folder = os.path.join(app_root_folder, app_name)
+    print(os.path.abspath(app_folder))
    if not os.path.exists(app_folder):
        raise RuntimeError(f"App folder {app_folder} does not exist.")
    if not os.path.isdir(app_folder):
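A note on how run_app_ha decides that an event has fired: a dict trigger is compared field by field against the tracked run state, with "workflow" treated as an index into the server's workflow list, while a string trigger is a plain substring match on the newly read server log. A reduced sketch of that check; trigger_fired is a hypothetical helper, and the workflow list is simplified to plain names (the real loop keeps (name, config) pairs and compares against wfs[v][0]):

def trigger_fired(trigger, run_state, workflows, new_log_text):
    # dict trigger: every named field must match the tracked run state;
    # "workflow" values are indices into the ordered workflow list
    if isinstance(trigger, dict):
        return all(
            run_state["workflow"] == workflows[v] if k == "workflow" else run_state[k] == v
            for k, v in trigger.items()
        )
    # string trigger: plain substring match on the newly read server log
    return trigger in new_log_text


state = {"workflow": "ScatterAndGather", "task": "train", "round_number": 1}
assert trigger_fired({"workflow": 0, "round_number": 1}, state, ["ScatterAndGather"], "")
assert trigger_fired("sent task assignment to client", state, [], "x sent task assignment to client site-1")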
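The action strings in each test case are parsed by execute_actions with a split-and-dispatch: the first token is the command, the second the target, and an optional third token an id. A minimal standalone sketch under those assumptions; FakeLauncher and run_actions are hypothetical stand-ins for SiteLauncher and the real method (which also handles the overseer target and client deregistration):

import time


class FakeLauncher:
    """Hypothetical stand-in for SiteLauncher, used only to show the dispatch."""

    def stop_server(self, server_id):
        print(f"stop server {server_id}")

    def start_server(self, server_id):
        print(f"start server {server_id}")

    def stop_client(self, client_id):
        print(f"stop client {client_id}")

    def start_client(self, client_id):
        print(f"start client {client_id}")


def run_actions(launcher, actions):
    # each action is "<command> <target> [id]"; a missing id defaults to 0 here
    for action in actions:
        command, *args = action.split(" ")
        if command == "sleep":
            time.sleep(int(args[0]))
        elif command in ("kill", "restart"):
            target = args[0]
            target_id = int(args[1]) if len(args) == 2 else 0
            prefix = "stop" if command == "kill" else "start"
            getattr(launcher, f"{prefix}_{target}")(target_id)


run_actions(FakeLauncher(), ["kill server", "sleep 1", "restart server 0"])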
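process_logs picks the most recent workflow/task/round out of the accumulated log with last-occurrence regexes. A quick standalone check of the round-number pattern against an invented two-round log; re.DOTALL is added here so the lookahead can scan across the newline-joined sample:

import re

# synthetic server log, invented for illustration
logs = "\n".join(["Round 0 started.", "aggregating ...", "Round 1 started."])

match = re.search(
    r"Round ([0-9]|[1-9][0-9]|[1-9][0-9][0-9]) started\.(?!.*Round ([0-9]|[1-9][0-9]|[1-9][0-9][0-9]) started\.)",
    logs,
    re.DOTALL,
)
assert match and int(match.group(1)) == 1  # the latest round, not the first one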
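On the launcher side, start_servers gives the i-th copied server the ports 8{i}02 (FL) and 8{i}03 (admin), and get_active_server_id later maps the hot SP's admin port back to a server index. A small sketch of that round trip; server_ports and the dict literal are illustrative only:

def server_ports(server_id):
    # mirrors the f"8{i}02" / f"8{i}03" rewrite in start_servers;
    # server 0 keeps the default 8002/8003
    return f"8{server_id}02", f"8{server_id}03"


def get_active_server_id(admin_ports, port):
    # same lookup as SiteLauncher.get_active_server_id: match the admin port
    # reported for the hot SP back to the launched server's index
    for server_id, admin_port in admin_ports.items():
        if admin_port == str(port):
            return server_id
    return -1


launched = {i: server_ports(i)[1] for i in range(2)}  # {0: "8003", 1: "8103"}
assert get_active_server_id(launched, 8103) == 1
assert get_active_server_id(launched, "7777") == -1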