Skip to content

Commit

Permalink
FLARE-45: Implemented submit_job and list_jobs. (NVIDIA#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz authored Apr 22, 2022
1 parent b40885e commit 48daa2a
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 163 deletions.
3 changes: 3 additions & 0 deletions nvflare/apis/study_manager_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


class Study:

DEFAULT_STUDY_NAME = "__default_study__"

def __init__(
self,
name: str,
Expand Down
63 changes: 13 additions & 50 deletions nvflare/fuel/hci/client/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import traceback

import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.job_def import JobMetaKey
from nvflare.fuel.hci.base64_utils import (
b64str_to_binary_file,
b64str_to_bytes,
Expand Down Expand Up @@ -150,7 +148,7 @@ def _remove_loading_dotdot(path):
class FileTransferModule(CommandModule):
"""Command module with commands relevant to file transfer."""

def __init__(self, upload_dir: str, download_dir: str, upload_folder_cmd_name="upload_app"):
def __init__(self, upload_dir: str, download_dir: str, upload_folder_cmd_name="submit_job"):
if not os.path.isdir(upload_dir):
raise ValueError("upload_dir {} is not a valid dir".format(upload_dir))

Expand Down Expand Up @@ -191,23 +189,17 @@ def get_spec(self):
usage="download_binary file_name ...",
handler_func=self.download_binary_file,
),
CommandSpec(
name=self.upload_folder_cmd_name,
description="upload application to the server",
usage=self.upload_folder_cmd_name + " application_folder",
handler_func=self.upload_folder,
),
CommandSpec(
name="download_folder",
description="download a folder from the server",
usage="download_folder folder_name",
handler_func=self.download_folder,
),
CommandSpec(
name="upload_job",
description="upload application to the server",
usage="upload_job job_folder",
handler_func=self.upload_job,
name="submit_job",
description="Submit application to the server",
usage="submit_job job_folder",
handler_func=self.submit_job,
),
CommandSpec(
name="download_job",
Expand Down Expand Up @@ -302,9 +294,9 @@ def download_folder(self, args, api: AdminAPISpec):
reply_processor = _DownloadFolderProcessor(self.download_dir)
return api.server_execute(command, reply_processor)

def upload_job(self, args, api: AdminAPISpec):
def submit_job(self, args, api: AdminAPISpec):
if len(args) != 2:
return {"status": APIStatus.ERROR_SYNTAX, "details": "usage: upload_job job_folder_name"}
return {"status": APIStatus.ERROR_SYNTAX, "details": "usage: submit_job job_folder"}

folder_name = args[1]
if folder_name.endswith("/"):
Expand All @@ -314,42 +306,13 @@ def upload_job(self, args, api: AdminAPISpec):
if not os.path.isdir(full_path):
return {"status": APIStatus.ERROR_RUNTIME, "details": f"'{full_path}' is not a valid folder."}

try:
meta_path = os.path.join(full_path, "meta.json")
with open(meta_path) as file:
meta = json.load(file)
except Exception as e:
return {
"status": APIStatus.ERROR_RUNTIME,
"details": "Exception while loading required meta.json in job directory: " + str(e),
}
try:
if not isinstance(meta.get(JobMetaKey.STUDY_NAME), str):
return {"status": APIStatus.ERROR_RUNTIME, "details": "STUDY_NAME is expected in meta.json to be a str"}
if not isinstance(meta.get(JobMetaKey.RESOURCE_SPEC), dict):
return {
"status": APIStatus.ERROR_RUNTIME,
"details": "RESOURCE_SPEC is expected in meta.json to be a dict",
}
if not isinstance(meta.get(JobMetaKey.DEPLOY_MAP), dict):
return {
"status": APIStatus.ERROR_RUNTIME,
"details": "DEPLOY_MAP is expected in meta.json to be a dict",
}

# zip the data
data = zip_directory_to_bytes(self.upload_dir, folder_name)
rel_path = os.path.relpath(full_path, self.upload_dir)
folder_name = _remove_loading_dotdot(rel_path)
meta[JobMetaKey.JOB_FOLDER_NAME.value] = folder_name

b64str = bytes_to_b64str(data)
serialized_meta = json.dumps(meta).encode("utf-8")
meta_b64str = bytes_to_b64str(serialized_meta)
except Exception as e:
return {"status": APIStatus.ERROR_RUNTIME, "details": f"Exception: {e}"}
# zip the data
data = zip_directory_to_bytes(self.upload_dir, folder_name)

parts = [_server_cmd_name(ftd.SERVER_CMD_UPLOAD_JOB), meta_b64str, b64str]
rel_path = os.path.relpath(full_path, self.upload_dir)
folder_name = _remove_loading_dotdot(rel_path)
b64str = bytes_to_b64str(data)
parts = [_server_cmd_name(ftd.SERVER_CMD_SUBMIT_JOB), folder_name, b64str]
command = join_args(parts)
return api.server_execute(command)

Expand Down
20 changes: 3 additions & 17 deletions nvflare/fuel/hci/client/fl_admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,14 @@ def delete_run(self, run_number: str) -> FLAdminAPIResponse:
)

@wrap_with_return_exception_responses
def upload_job(self, job_folder: str) -> FLAdminAPIResponse:
def submit_job(self, job_folder: str) -> FLAdminAPIResponse:
if not job_folder:
raise APISyntaxError("job_folder is required but not specified.")
if not isinstance(job_folder, str):
raise APISyntaxError("job_folder must be str but got {}.".format(type(job_folder)))
success, reply_data_full_response, reply = self._get_processed_cmd_reply_data("upload_job " + job_folder)
success, reply_data_full_response, reply = self._get_processed_cmd_reply_data("submit_job " + job_folder)
if reply_data_full_response:
if "Uploaded job" in reply_data_full_response:
if "Submitted job" in reply_data_full_response:
# TODO:: this is a hack to get job id
return FLAdminAPIResponse(
APIStatus.SUCCESS,
Expand All @@ -359,20 +359,6 @@ def upload_job(self, job_folder: str) -> FLAdminAPIResponse:
APIStatus.ERROR_RUNTIME, {"message": "Runtime error: could not handle server reply."}, reply
)

@wrap_with_return_exception_responses
def upload_app(self, app: str) -> FLAdminAPIResponse:
if not app:
raise APISyntaxError("app is required but not specified.")
if not isinstance(app, str):
raise APISyntaxError("app must be str but got {}.".format(type(app)))
success, reply_data_full_response, reply = self._get_processed_cmd_reply_data("upload_app " + app)
if reply_data_full_response:
if "Created folder" in reply_data_full_response:
return FLAdminAPIResponse(APIStatus.SUCCESS, {"message": reply_data_full_response})
return FLAdminAPIResponse(
APIStatus.ERROR_RUNTIME, {"message": "Runtime error: could not handle server reply."}, reply
)

@wrap_with_return_exception_responses
def deploy_app(
self, run_number: str, app: str, target_type: TargetType, targets: Optional[List[str]] = None
Expand Down
2 changes: 1 addition & 1 deletion nvflare/fuel/hci/client/fl_admin_api_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def run(
print(f"api.set_run_number({run_number})")
api_command_wrapper(self.api.set_run_number(run_number))
print(f'api.upload_app("{app}")')
api_command_wrapper(self.api.upload_app(app))
api_command_wrapper(self.api.submit_job(app))
print(f'api.deploy_app("{app}", TargetType.ALL)')
api_command_wrapper(self.api.deploy_app(app, TargetType.ALL))
print("api.check_status(TargetType.CLIENT)")
Expand Down
20 changes: 3 additions & 17 deletions nvflare/fuel/hci/client/fl_admin_api_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,13 @@ def delete_run(self, run_number: str) -> FLAdminAPIResponse:
pass

@abstractmethod
def upload_job(self, job_folder: str) -> FLAdminAPIResponse:
"""Uploads specified job folder.
def submit_job(self, job_folder: str) -> FLAdminAPIResponse:
"""Submit a job.
Assumes job folder is in the upload_dir set in API init.
Args:
job_folder (str): name of the job folder in upload_dir to upload
Returns: FLAdminAPIResponse
"""
pass

@abstractmethod
def upload_app(self, app: str) -> FLAdminAPIResponse:
"""Uploads specified app to the upload directory of FL server.
Currently assumes app is in the upload_dir set in API init.
Args:
app (str): name of the folder in upload_dir to upload
job_folder (str): name of the job folder in upload_dir to submit
Returns: FLAdminAPIResponse
Expand Down
2 changes: 1 addition & 1 deletion nvflare/fuel/hci/file_transfer_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
SERVER_CMD_DOWNLOAD_BINARY = "_download_binary_file"
SERVER_CMD_UPLOAD_FOLDER = "_upload_folder"
SERVER_CMD_DOWNLOAD_FOLDER = "_download_folder"
SERVER_CMD_UPLOAD_JOB = "_upload_job"
SERVER_CMD_SUBMIT_JOB = "_submit_job"
SERVER_CMD_DOWNLOAD_JOB = "_download_job"
SERVER_CMD_INFO = "_info"
101 changes: 83 additions & 18 deletions nvflare/fuel/hci/server/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import shutil
import tempfile
import traceback
from typing import List
from io import BytesIO
from typing import List, Tuple
from zipfile import ZipFile

import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.fl_constant import SystemComponents
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.apis.study_manager_spec import Study, StudyManagerSpec
from nvflare.fuel.hci.base64_utils import (
b64str_to_binary_file,
b64str_to_bytes,
Expand All @@ -31,7 +36,7 @@
)
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.zip_utils import unzip_all_from_bytes, zip_directory_to_bytes
from nvflare.fuel.hci.zip_utils import convert_legacy_zip, unzip_all_from_bytes, zip_directory_to_bytes


class FileTransferModule(CommandModule):
Expand Down Expand Up @@ -101,10 +106,10 @@ def get_spec(self):
visible=False,
),
CommandSpec(
name=ftd.SERVER_CMD_UPLOAD_JOB,
description="upload a job def",
usage="upload_job job_folder_name",
handler_func=self.upload_job,
name=ftd.SERVER_CMD_SUBMIT_JOB,
description="Submit a job",
usage="submit_job job_folder",
handler_func=self.submit_job,
visible=False,
),
CommandSpec(
Expand Down Expand Up @@ -251,26 +256,44 @@ def download_folder(self, conn: Connection, args: List[str]):
traceback.print_exc()
conn.append_error("exception occurred")

def upload_job(self, conn: Connection, args: List[str]):
meta_b64str = args[1]
def submit_job(self, conn: Connection, args: List[str]):

folder_name = args[1]
zip_b64str = args[2]
data_bytes = b64str_to_bytes(zip_b64str)
meta = json.loads(b64str_to_bytes(meta_b64str))
data_bytes = convert_legacy_zip(b64str_to_bytes(zip_b64str))
engine = conn.app_ctx
study_manager = engine.get_component(SystemComponents.STUDY_MANAGER)
if not study_manager:
conn.append_error("Server configuration error: no study manager is configured")
return

try:
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
with ZipFile(BytesIO(data_bytes), "r") as zf:
meta_file = folder_name + "/meta.json"
meta_data = zf.read(meta_file)
meta = json.loads(meta_data)
if JobMetaKey.JOB_FOLDER_NAME not in meta:
meta[JobMetaKey.JOB_FOLDER_NAME.value] = folder_name
valid, error = self._validate_job(meta, study_manager, fl_ctx)
if not valid:
conn.append_error(error)
return

job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)

meta = job_def_manager.create(meta, data_bytes, fl_ctx)
conn.set_prop("meta", meta)
conn.set_prop("upload_job_id", meta.get(JobMetaKey.JOB_ID))
conn.append_string("Uploaded job: {}".format(meta.get(JobMetaKey.JOB_ID)))
conn.set_prop("submit_job_id", meta.get(JobMetaKey.JOB_ID))
conn.append_string("Submitted job: {}".format(meta.get(JobMetaKey.JOB_ID)))
except Exception as e:
conn.append_error("Exception occurred trying to upload job: " + str(e))
conn.append_error("Exception occurred trying to submit job: " + str(e))
return

conn.append_success("")

def download_job(self, conn: Connection, args: List[str]):
Expand Down Expand Up @@ -308,3 +331,45 @@ def download_job(self, conn: Connection, args: List[str]):
def info(self, conn: Connection, args: List[str]):
conn.append_string("Server Upload Destination: {}".format(self.upload_dir))
conn.append_string("Server Download Source: {}".format(self.download_dir))

def _validate_job(self, meta: dict, study_manager: StudyManagerSpec, fl_ctx: FLContext) -> Tuple[bool, str]:
    """Check a submitted job's meta data against its study.

    Jobs submitted to the default study (``Study.DEFAULT_STUDY_NAME``) are
    accepted as soon as the study name itself is valid. For any other study,
    the meta must carry dict-valued RESOURCE_SPEC and DEPLOY_MAP entries, the
    study must be known to ``study_manager``, and, when the study lists
    participating clients, every client named by the job must be one of them.

    Args:
        meta: job meta data loaded from the job's meta.json
        study_manager: manager used to look up the study named in ``meta``
        fl_ctx: FL context passed through to ``study_manager.get_study``

    Returns: a tuple of (valid, error); ``error`` is "" when ``valid`` is True
    """
    study_name = meta.get(JobMetaKey.STUDY_NAME)
    # Reject a missing, non-str or empty study name.
    if not isinstance(study_name, str) or not study_name:
        return False, "study_name is expected in meta.json to be a non-empty str"

    if study_name == Study.DEFAULT_STUDY_NAME:
        return True, ""

    if not isinstance(meta.get(JobMetaKey.RESOURCE_SPEC), dict):
        return False, "RESOURCE_SPEC is expected in meta.json to be a dict"

    deploy_map = meta.get(JobMetaKey.DEPLOY_MAP)
    if not isinstance(deploy_map, dict):
        return False, "DEPLOY_MAP is expected in meta.json to be a dict"

    study = study_manager.get_study(study_name, fl_ctx)
    if not study:
        return False, "Study {} does not exist".format(study_name)

    if study.participating_clients:

        # Gather all the clients used in deploy_map or which are mandatory.
        # NOTE(review): assumes each deploy_map value is an iterable of site
        # names (e.g. a list) - confirm against the job definition spec.
        required_clients = set()
        for sites in deploy_map.values():
            required_clients.update(sites)

        mandatory_clients = meta.get(JobMetaKey.MANDATORY_CLIENTS)
        if mandatory_clients:
            required_clients.update(mandatory_clients)

        # "server" is a deploy target but not a study participant; discard()
        # tolerates jobs that never mention it.
        required_clients.discard("server")

        participants = set(study.participating_clients)
        if not required_clients.issubset(participants):
            diff = required_clients - participants
            return False, "Following clients are not participants of the study {}: {}".format(study_name, diff)

    # TODO: Validate resources against resource manager

    return True, ""
Loading

0 comments on commit 48daa2a

Please sign in to comment.