From 298517f83dc2870870931125843037ea5024ba24 Mon Sep 17 00:00:00 2001
From: Dien Duong
Date: Tue, 29 Dec 2020 14:48:04 +0700
Subject: [PATCH 01/12] refs#55704: Implement api create serve task

---
 server/app/routes.py               | 111 ++++++++++++++++++++++++++++-
 server/nmtwizard/mongo_database.py |  15 +++-
 server/nmtwizard/task.py           |  22 ++++++
 3 files changed, 145 insertions(+), 3 deletions(-)

diff --git a/server/app/routes.py b/server/app/routes.py
index 178e6503..f2d4ade6 100644
--- a/server/app/routes.py
+++ b/server/app/routes.py
@@ -24,12 +24,13 @@
     get_cpu_count, get_params, boolean_param, change_parent_task, remove_config_option, model_name_analysis
 from nmtwizard.capacity import Capacity
 from nmtwizard.task import TaskEnum, TaskInfos, TasksCreationInfos, TaskPreprocess, TaskTrain, TaskTranslate, \
-    TaskScoring, TASK_RELEASE_TYPE
+    TaskScoring, TaskServe, TASK_RELEASE_TYPE
 # only for launch() maybe deprecated
 from nmtwizard.task import TaskBase
 from utils.storage_utils import StorageUtils

 GLOBAL_POOL_NAME = "global_pool"
+SERVE_POOL_NAME = "serve_pool"

 logger = logging.getLogger(__name__)
 logger.addHandler(app.logger)
@@ -814,7 +815,6 @@ def get_translate_score_corpus(testing_data_infos, request_data, routes_config,
     source = request_data["source"]
     target = request_data["target"]
     default_test_data = get_default_test_data(routes_config.storage_client, source, target) if with_default_test else []
-
     to_translate_corpus = []
     to_score_corpus = []
     for corpus in testing_data_infos:
@@ -1182,6 +1182,113 @@ def get_evaluations():
     return cust_jsonify(evaluation_catalogs)


+@app.route("/models/deploy", methods=["POST"])
+@filter_request("POST/models/deploy", "train")
+def deploy_model_local():
+    request_data = _parse_request_data_for_deploy_model(request)
+    service_config = config.get_service_config(mongo_client, SERVE_POOL_NAME)
+    resource = _select_resource_for_deploy_model(service_config)
+    port_to_deploy = _select_port_for_deploy_model(resource)
+
+    if port_to_deploy is None:
+        raise Exception("Not available port")
+
+    create_serve_task(request_data["model"], SERVE_POOL_NAME, request_data["source"], request_data["target"],
+                      port_to_deploy, resource["host"])
+
+    return cust_jsonify({"message": "ok"})
+
+
+def _parse_request_data_for_deploy_model(current_request):
+    # model = "SAAKH_enes_JDBroadCarrot_52_de9bcac-c7565_release"
+    # TODO: Validate request data
+    request_data = current_request.form
+    model = request_data.get("model")
+    # TODO: Get language_pair from model catalog
+    language_pair = "en_fr"
+    source_language = language_pair.split("_")[0]
+    target_language = language_pair.split("_")[1]
+
+    return {
+        "source": source_language,
+        "target": target_language,
+        "model": model
+    }
+
+
+def _select_resource_for_deploy_model(service_config):
+    # TODO: Select suitable resource for deploy
+    return service_config["variables"]["server_pool"][0]
+
+
+def _select_port_for_deploy_model(resource_config):
+    busy_ports = _get_all_busy_port_of_resource(resource_config["host"])
+    port_range = resource_config["serve_port_range"]
+    port_range_from = port_range.get("from")
+    port_range_to = port_range.get("to")
+
+    if len(busy_ports) == 0:
+        return port_range_from
+
+    for port in range(port_range_from, port_range_to + 1):
+        if port not in busy_ports:
+            return port
+    return None
+
+
+def _get_all_busy_port_of_resource(resource):
+    deployment_of_resources = mongo_client.get_all_deployment_of_resource(resource)
+    busy_ports = list(map(lambda deployment: deployment["port"], deployment_of_resources))
+    return busy_ports
+
+
+def create_serve_task(model, service, source_language, target_language, resource_port, resource):
+    request_data = {
+        "source": source_language,
+        "target": target_language
+    }
+    routes_config = RoutesConfiguration(flask.g, service)
+    content = get_serve_task_config(service=service, request_data=request_data, routes_config=routes_config)
+    task_infos = TaskInfos(content=content, files={}, request_data=request_data, routes_configuration=routes_config,
+                           service=service, resource=resource)
+
+    serve_task = TaskServe(task_infos, model, resource_port)
+
+    serve_task.create(redis_db, taskfile_dir)
+
+
+def get_serve_task_config(service, request_data, routes_config):
+    docker_image_info = TaskBase.get_docker_image_info(routes_config, request_data.get("docker_image"), mongo_client)
+
+    content = {
+        "service": service,
+        "docker": {**docker_image_info},
+        "wait_after_launch": 2,
+        "creator": f"{routes_config.entity_owner}{routes_config.creator['user_code']}",
+        "options": {},
+    }
+
+    if request_data.get("ncpus"):
+        content["ncpus"] = request_data["ncpus"]
+    if request_data.get("priority"):
+        content["priority"] = request_data["priority"]
+    if request_data.get("iterations"):
+        content["iterations"] = request_data["iterations"]
+    return json.loads(json.dumps(content))
+
+
+# Notes: Call this function when deploy success
+def create_model_deployment_info(creator, model, resource_host, resource_port):
+    model_deployment_info = {
+        "model": model,
+        "resource": resource_host,
+        "port": resource_port,
+        "creator": creator
+    }
+
+    mongo_client.create_deployment_info(model_deployment_info)
+
+
 @app.route("/task/launch/", methods=["POST"])
 @filter_request("POST/task/launch", "train")
 def launch(service):
diff --git a/server/nmtwizard/mongo_database.py b/server/nmtwizard/mongo_database.py
index b2984e15..fcb80626 100644
--- a/server/nmtwizard/mongo_database.py
+++ b/server/nmtwizard/mongo_database.py
@@ -6,7 +6,8 @@
     "tags": "pn9-tag",
     "dockers": "pn9-docker",
     "evaluations": "pn9-evaluation",
-    "dataset": "pn9-dataset"
+    "dataset": "pn9-dataset",
+    "serving_models": "pn9-serving-model"
 }

@@ -181,3 +182,15 @@ def get_dataset_by_ids(self, dataset_ids):
             "$in": dataset_ids
         }})
         return dataset
+
+    def get_all_deployment_of_resource(self, resource):
+        the_table = self.get("serving_models")
+        query = {
+            "resource": resource
+        }
+        deployments = the_table.find(query)
+        return deployments
+
+    def create_deployment_info(self, deployment_info):
+        the_table = self.get("serving_models")
+        the_table.insert(deployment_info)
diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py
index c4e0560a..8d8cc85a 100644
--- a/server/nmtwizard/task.py
+++ b/server/nmtwizard/task.py
@@ -249,6 +249,28 @@ def __init__(self, task_infos, parent_task_id, to_translate):
         TaskBase.__init__(self, task_infos)


+class TaskServe(TaskBase):
+    def __init__(self, task_infos, parent_task_id, resource_port):
+        self._task_suffix = "serve"
+        self._task_type = "serve"
+        self._parent_task_id = parent_task_id
+        self._resource_port = resource_port
+        self._docker_container_port = 4000
+
+        task_infos.content["priority"] = task_infos.content.get("priority", 0) + 1
+        task_infos.content["ngpus"] = 0
+        if "ncpus" not in task_infos.content:
+            task_infos.content["ncpus"] = get_cpu_count(task_infos.routes_configuration.service_config,
+                                                        task_infos.content["ngpus"], "trans")
+
+        task_infos.content["docker"]["command"] = ["-m", self._parent_task_id]
+        task_infos.content["docker"]["command"].extend(["-ms", "s3_release:"])
+
task_infos.content["docker"]["command"].extend(["serve", "-p", self._docker_container_port]) + change_parent_task(task_infos.content["docker"]["command"], parent_task_id) + + TaskBase.__init__(self, task_infos) + + class TaskScoring(TaskBase): def __init__(self, task_infos, parent_task_id, model, to_score): self._task_suffix = "score" From 2d28e1863a7700ae03442f7a7721eaefee037192 Mon Sep 17 00:00:00 2001 From: Dien Duong Date: Wed, 30 Dec 2020 09:33:19 +0700 Subject: [PATCH 02/12] refs#55704: Implement api predict --- server/app/routes.py | 28 ++++++++++++++++++++++++++++ server/nmtwizard/mongo_database.py | 8 ++++++++ server/utils/request_utils.py | 17 +++++++++++++++++ 3 files changed, 53 insertions(+) create mode 100644 server/utils/request_utils.py diff --git a/server/app/routes.py b/server/app/routes.py index f2d4ade6..7f569365 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -28,6 +28,7 @@ # only for launch() maybe deprecated from nmtwizard.task import TaskBase from utils.storage_utils import StorageUtils +from utils.request_utils import RequestUtils GLOBAL_POOL_NAME = "global_pool" SERVE_POOL_NAME = "serve_pool" @@ -1182,6 +1183,33 @@ def get_evaluations(): return cust_jsonify(evaluation_catalogs) +@app.route("/models/predict", methods=["POST"]) +@filter_request("POST/models/predict", "train") +def predict(): + request_data = _parse_request_data_for_predict(request) + model = request_data["model"] + model_deployment_info = mongo_client.get_deployment_info_of_model(model) + if not model_deployment_info: + abort(flask.make_response(flask.jsonify(message="Model has not been deployed: %s" % model), 400)) + request_url = f"http://{model_deployment_info['resource']}:{model_deployment_info['port']}/translate" + request_data = {"src": [{"text": request_data["text"]}]} + response = RequestUtils.post(request_url, request_data) + return response.json(), response.status_code + + +def _parse_request_data_for_predict(current_request): + # model = "SAAKH_enes_JDBroadCarrot_52_de9bcac-c7565_release" + # TODO: Validate request data + request_data = current_request.form + model = request_data.get("model") + text = request_data.get("text") + + return { + "model": model, + "text": text + } + + @app.route("/models/deploy", methods=["POST"]) @filter_request("POST/models/deploy", "train") def deploy_model_local(): diff --git a/server/nmtwizard/mongo_database.py b/server/nmtwizard/mongo_database.py index fcb80626..f761a137 100644 --- a/server/nmtwizard/mongo_database.py +++ b/server/nmtwizard/mongo_database.py @@ -194,3 +194,11 @@ def get_all_deployment_of_resource(self, resource): def create_deployment_info(self, deployment_info): the_table = self.get("serving_models") the_table.insert(deployment_info) + + def get_deployment_info_of_model(self, model): + the_table = self.get("serving_models") + query = { + "model": model + } + deployment = the_table.find_one(query) + return deployment diff --git a/server/utils/request_utils.py b/server/utils/request_utils.py new file mode 100644 index 00000000..c23749e3 --- /dev/null +++ b/server/utils/request_utils.py @@ -0,0 +1,17 @@ +import requests + + +class RequestUtils: + @staticmethod + def get(url, cookies=None): + if cookies is None: + cookies = {} + response = requests.get(url, cookies=cookies) + return response + + @staticmethod + def post(url, json_data=None): + if json_data is None: + json_data = {} + response = requests.post(url, json=json_data) + return response From 1c6ebcc9d110569fa5fc642a38c7180ee241cf4a Mon Sep 17 00:00:00 2001 From: 
Dien Duong Date: Wed, 30 Dec 2020 09:41:57 +0700 Subject: [PATCH 03/12] refs#55704: Edit deploy model api: check model has deployed. --- server/app/routes.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 7f569365..8de79d42 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -1214,12 +1214,15 @@ def _parse_request_data_for_predict(current_request): @filter_request("POST/models/deploy", "train") def deploy_model_local(): request_data = _parse_request_data_for_deploy_model(request) + model_deployment_info = mongo_client.get_deployment_info_of_model(request_data["model"]) + if model_deployment_info: + abort(flask.make_response(flask.jsonify(message="The model has deployed!"), 400)) + service_config = config.get_service_config(mongo_client, SERVE_POOL_NAME) resource = _select_resource_for_deploy_model(service_config) port_to_deploy = _select_port_for_deploy_model(resource) - if port_to_deploy is None: - raise Exception("Not available port") + abort(flask.make_response(flask.jsonify(message="Not available port to deploy"), 400)) create_serve_task(request_data["model"], SERVE_POOL_NAME, request_data["source"], request_data["target"], port_to_deploy, resource["host"]) From 3db9e721ef47b1fbd7d5fa1f76986f09280ea071 Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Wed, 27 Jan 2021 10:28:56 +0700 Subject: [PATCH 04/12] add create serve task and predict feature --- server/app/routes.py | 73 ++++++++++++++++++------------ server/nmtwizard/common.py | 26 +++++++---- server/nmtwizard/mongo_database.py | 40 +++++++++++++--- server/nmtwizard/task.py | 55 +++++++++++----------- server/nmtwizard/worker.py | 14 ++++++ server/services/ssh.py | 8 ++++ 6 files changed, 144 insertions(+), 72 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 8de79d42..2ca9d0c7 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -31,7 +31,6 @@ from utils.request_utils import RequestUtils GLOBAL_POOL_NAME = "global_pool" -SERVE_POOL_NAME = "serve_pool" logger = logging.getLogger(__name__) logger.addHandler(app.logger) @@ -1183,22 +1182,31 @@ def get_evaluations(): return cust_jsonify(evaluation_catalogs) -@app.route("/models/predict", methods=["POST"]) -@filter_request("POST/models/predict", "train") +@app.route("/model/predict", methods=["POST"]) +@filter_request("POST/model/predict", "train") def predict(): request_data = _parse_request_data_for_predict(request) + + if len(request_data["text"]) > config.get_system_config()["predict"]["max_characters"]: + abort(flask.make_response(flask.jsonify(message="your input is too long"), 403)) + model = request_data["model"] - model_deployment_info = mongo_client.get_deployment_info_of_model(model) - if not model_deployment_info: + deployment_info = mongo_client.get_deployment_info_of_model(model) + if not deployment_info: abort(flask.make_response(flask.jsonify(message="Model has not been deployed: %s" % model), 400)) - request_url = f"http://{model_deployment_info['resource']}:{model_deployment_info['port']}/translate" + request_url = f"http://{deployment_info['resource']}:{deployment_info['serving_port']}/translate" request_data = {"src": [{"text": request_data["text"]}]} response = RequestUtils.post(request_url, request_data) return response.json(), response.status_code +@app.route("/model/inactive", methods=["POST"]) +@filter_request("POST/model/inactive", "train") +def inactive_model(): + pass + + def _parse_request_data_for_predict(current_request): - # 
model = "SAAKH_enes_JDBroadCarrot_52_de9bcac-c7565_release" # TODO: Validate request data request_data = current_request.form model = request_data.get("model") @@ -1210,35 +1218,38 @@ def _parse_request_data_for_predict(current_request): } -@app.route("/models/deploy", methods=["POST"]) -@filter_request("POST/models/deploy", "train") -def deploy_model_local(): +@app.route("/model/active", methods=["POST"]) +@filter_request("POST/model/active", "train") +def active_model_local(): + """ + deploy model on local machine for prediction (only for model studio lite) + """ request_data = _parse_request_data_for_deploy_model(request) - model_deployment_info = mongo_client.get_deployment_info_of_model(request_data["model"]) - if model_deployment_info: - abort(flask.make_response(flask.jsonify(message="The model has deployed!"), 400)) + deployment_info = mongo_client.get_deployment_info_of_model(request_data["model"]) + if deployment_info: + abort(flask.make_response(flask.jsonify(message="The model has been active!"), 400)) - service_config = config.get_service_config(mongo_client, SERVE_POOL_NAME) + service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) + # TODO: select resource for deployment resource = _select_resource_for_deploy_model(service_config) - port_to_deploy = _select_port_for_deploy_model(resource) + # TODO: select port for deployment + port_to_deploy = 4000 # _select_port_for_deploy_model(resource) if port_to_deploy is None: - abort(flask.make_response(flask.jsonify(message="Not available port to deploy"), 400)) + abort(flask.make_response(flask.jsonify(message="Not available port to active"), 400)) - create_serve_task(request_data["model"], SERVE_POOL_NAME, request_data["source"], request_data["target"], - port_to_deploy, resource["host"]) + create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"], + port_to_deploy, "auto") # resource["host"] return cust_jsonify({"message": "ok"}) def _parse_request_data_for_deploy_model(current_request): - # model = "SAAKH_enes_JDBroadCarrot_52_de9bcac-c7565_release" # TODO: Validate request data request_data = current_request.form model = request_data.get("model") - # TODO: Get language_pair from model catalog - language_pair = "en_fr" - source_language = language_pair.split("_")[0] - target_language = language_pair.split("_")[1] + language_pair = model.split('_')[1] + source_language = language_pair[:2] + target_language = language_pair[2:] return { "source": source_language, @@ -1268,12 +1279,16 @@ def _select_port_for_deploy_model(resource_config): def _get_all_busy_port_of_resource(resource): + # TODO: change port type to list deployment_of_resources = mongo_client.get_all_deployment_of_resource(resource) - busy_ports = list(map(lambda deployment: deployment["port"], deployment_of_resources)) - return busy_ports + if deployment_of_resources: + busy_ports = list(map(lambda deployment: deployment["serving_port"], deployment_of_resources)) + return busy_ports + else: + return [] -def create_serve_task(model, service, source_language, target_language, resource_port, resource): +def create_serve_task(model, service, source_language, target_language, exposed_port, resource): request_data = { "source": source_language, "target": target_language @@ -1283,7 +1298,7 @@ def create_serve_task(model, service, source_language, target_language, resource task_infos = TaskInfos(content=content, files={}, request_data=request_data, routes_configuration=routes_config, service=service, 
resource=resource) - serve_task = TaskServe(task_infos, model, resource_port) + serve_task = TaskServe(task_infos, model, exposed_port) serve_task.create(redis_db, taskfile_dir) @@ -1637,7 +1652,7 @@ def launch(service): "registry": get_registry(service_module, image_score), "tag": "2.1.0-beta1", "command": ["score", "-o"] + oref["output"] + ["-r"] + oref["ref"] + - option_lang + ['-f', "launcher:scores"] + option_lang + ['-f', "launcher:scores"] } score_task_id, explicit_name = build_task_id(content_score, xxyy, "score", parent_task_id) @@ -1692,7 +1707,7 @@ def launch(service): "registry": get_registry(service_module, image_score), "tag": "latest", "command": ["tuminer", "--tumode", "score", "--srcfile"] + in_out["infile"] + ["--tgtfile"] + - in_out["outfile"] + ["--output"] + in_out["scorefile"] + in_out["outfile"] + ["--output"] + in_out["scorefile"] } tuminer_task_id, explicit_name = build_task_id(content_tuminer, xxyy, "tuminer", parent_task_id) diff --git a/server/nmtwizard/common.py b/server/nmtwizard/common.py index 95a444b2..9c415779 100644 --- a/server/nmtwizard/common.py +++ b/server/nmtwizard/common.py @@ -18,7 +18,7 @@ def displaycmd(lst): while p != -1: q = t.find("]]", p) if q != -1: - t = t[0:p] + t[q+2:] + t = t[0:p] + t[q + 2:] else: t = t[0:p] p = t.find("[[private:", p) @@ -45,10 +45,10 @@ def rmprivate(lst): if isinstance(t, six.string_types): p = t.find("[[private:") while p != -1: - t = t[0:p] + t[p+10:] + t = t[0:p] + t[p + 10:] q = t.find("]]") if q != -1: - t = t[0:q] + t[q+2:] + t = t[0:q] + t[q + 2:] p = t.find("[[private:", q) else: p = -1 @@ -165,7 +165,7 @@ def ssh_connect_with_retry(hostname, pkey=pkey, key_filename=key_filename, look_for_keys=False) - logger.info("Connection to %s successful (%f)", hostname, time.time()-start) + logger.info("Connection to %s successful (%f)", hostname, time.time() - start) if login_cmd is not None: if not run_and_check_command(client, login_cmd): raise RuntimeError("failed to run login command") @@ -267,9 +267,9 @@ def cmd_connect_private_registry(docker_registry): if docker_registry['type'] == "aws": return ('$(AWS_ACCESS_KEY_ID=%s AWS_SECRET_ACCESS_KEY=%s ' 'aws ecr get-login --no-include-email --region %s)') % ( - docker_registry['credentials']['AWS_ACCESS_KEY_ID'], - docker_registry['credentials']['AWS_SECRET_ACCESS_KEY'], - docker_registry['region']) + docker_registry['credentials']['AWS_ACCESS_KEY_ID'], + docker_registry['credentials']['AWS_SECRET_ACCESS_KEY'], + docker_registry['region']) username = docker_registry['credentials']['username'] password = docker_registry['credentials']['password'] return 'docker login --username %s --password %s' % (username, password) @@ -291,8 +291,8 @@ def cmd_docker_run(lxpu, docker_options, task_id, if nbgpu == 0 or (nbgpu == 1 and lgpu[0] == 0): gpu_id = '0' else: - env['NV_GPU'] = ",".join([str(int(g)-1) for g in lgpu]) - gpu_id = ",".join([str(v) for v in range(1, nbgpu+1)]) + env['NV_GPU'] = ",".join([str(int(g) - 1) for g in lgpu]) + gpu_id = ",".join([str(v) for v in range(1, nbgpu + 1)]) if docker_options.get('dev') == 1: return "sleep 35" @@ -306,6 +306,13 @@ def cmd_docker_run(lxpu, docker_options, task_id, # launch the task cmd = '%s_o_run_o_-i_o_--rm' % docker_cmd + + # only for serve tasks + if task_id.split("_")[-1] == "serve": + if "port" in server_params: + cmd += "_o_-p_o_{}".format(docker_command[-1]) + docker_command = docker_command[:-1] + if 'mount' in docker_options: for k in docker_options['mount']: cmd += '_o_-v_o_%s' % k @@ -410,6 +417,7 @@ def 
launch_task(task_id, * `callback_url`: server to callback for beat of activity * `callback_interval`: time between 2 beats """ + (lgpu, lcpu) = lxpu gpu_id = ",".join(lgpu) logger.info("launching task - %s / %s", task_id, gpu_id) diff --git a/server/nmtwizard/mongo_database.py b/server/nmtwizard/mongo_database.py index f761a137..92e5cc20 100644 --- a/server/nmtwizard/mongo_database.py +++ b/server/nmtwizard/mongo_database.py @@ -7,7 +7,7 @@ "dockers": "pn9-docker", "evaluations": "pn9-evaluation", "dataset": "pn9-dataset", - "serving_models": "pn9-serving-model" + "models": "pn9-model" } @@ -184,21 +184,47 @@ def get_dataset_by_ids(self, dataset_ids): return dataset def get_all_deployment_of_resource(self, resource): - the_table = self.get("serving_models") + the_table = self.get("models") query = { "resource": resource } - deployments = the_table.find(query) - return deployments + model_info = the_table.find(query) + + if model_info and model_info.get("resource") is not None: + deployment_info = { + "resource": model_info["resource"], + "serving_port": model_info["serving_port"] + } + return deployment_info + + return None def create_deployment_info(self, deployment_info): the_table = self.get("serving_models") the_table.insert(deployment_info) def get_deployment_info_of_model(self, model): - the_table = self.get("serving_models") + the_table = self.get("models") query = { "model": model } - deployment = the_table.find_one(query) - return deployment + model_info = the_table.find_one(query) + + if model_info and model_info.get("resource") is not None: + deployment_info = { + "resource": model_info["resource"], + "serving_port": model_info["serving_port"] + } + return deployment_info + + return None + + def update_document(self, model, data): + the_table = self.get("models") + query = { + "model": model + } + update = { + "$set": data + } + the_table.update_one(query, update) diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py index 8d8cc85a..aa67d44d 100644 --- a/server/nmtwizard/task.py +++ b/server/nmtwizard/task.py @@ -250,12 +250,12 @@ def __init__(self, task_infos, parent_task_id, to_translate): class TaskServe(TaskBase): - def __init__(self, task_infos, parent_task_id, resource_port): + def __init__(self, task_infos, parent_task_id, serving_port): self._task_suffix = "serve" self._task_type = "serve" self._parent_task_id = parent_task_id - self._resource_port = resource_port - self._docker_container_port = 4000 + self._serving_port = serving_port + self._docker_container_port = "4000" task_infos.content["priority"] = task_infos.content.get("priority", 0) + 1 task_infos.content["ngpus"] = 0 @@ -264,8 +264,9 @@ def __init__(self, task_infos, parent_task_id, resource_port): task_infos.content["ngpus"], "trans") task_infos.content["docker"]["command"] = ["-m", self._parent_task_id] - task_infos.content["docker"]["command"].extend(["-ms", "s3_release:"]) - task_infos.content["docker"]["command"].extend(["serve", "-p", self._docker_container_port]) + task_infos.content["docker"]["command"].extend(["-ms", "pn9_model_catalog:"]) + task_infos.content["docker"]["command"].extend(["serve"]) + task_infos.content["docker"]["command"].extend([self._docker_container_port + ':' + str(self._serving_port)]) change_parent_task(task_infos.content["docker"]["command"], parent_task_id) TaskBase.__init__(self, task_infos) @@ -387,7 +388,7 @@ def terminate(redis, task_id, phase): # remove from service queue if it was there service = redis.hget(keyt, "service") if service is not None: - 
redis.lrem('queued:'+service, 0, task_id) + redis.lrem('queued:' + service, 0, task_id) redis.hset(keyt, "message", phase) set_status(redis, keyt, "terminating") @@ -396,49 +397,49 @@ def terminate(redis, task_id, phase): def work_queue(redis, task_id, service=None, delay=0): if service is None: - service = redis.hget('task:'+task_id, 'service') + service = redis.hget('task:' + task_id, 'service') # Queues the task in the work queue with a delay. if delay == 0: with redis.acquire_lock(f'work_queue:{task_id}', acquire_timeout=1, expire_time=10): if task_id not in redis.lrange(f'work:{service}', 0, -1): - redis.lpush('work:'+service, task_id) - redis.delete('queue:'+task_id) + redis.lpush('work:' + service, task_id) + redis.delete('queue:' + task_id) else: - redis.set('queue:'+task_id, delay) - redis.expire('queue:'+task_id, int(delay)) + redis.set('queue:' + task_id, delay) + redis.expire('queue:' + task_id, int(delay)) def work_unqueue(redis, service): """Pop a task from the work queue.""" - return redis.rpop('work:'+service) + return redis.rpop('work:' + service) def service_queue(redis, task_id, service): """Queue the task on the service queue.""" - with redis.acquire_lock('service:'+service): - redis.lrem('queued:'+service, 0, task_id) - redis.lpush('queued:'+service, task_id) - redis.delete('queue:'+task_id) + with redis.acquire_lock('service:' + service): + redis.lrem('queued:' + service, 0, task_id) + redis.lpush('queued:' + service, task_id) + redis.delete('queue:' + task_id) def enable(redis, task_id, service=None): if service is None: - service = redis.hget('task:'+task_id, 'service') + service = redis.hget('task:' + task_id, 'service') # Marks a task as enabled. - redis.sadd("active:"+service, task_id) + redis.sadd("active:" + service, task_id) def disable(redis, task_id, service=None): if service is None: - service = redis.hget('task:'+task_id, 'service') + service = redis.hget('task:' + task_id, 'service') # Marks a task as disabled. - redis.srem("active:"+service, task_id) - redis.delete("beat:"+task_id) + redis.srem("active:" + service, task_id) + redis.delete("beat:" + task_id) def list_active(redis, service): """Returns all active tasks (i.e. 
non stopped).""" - return redis.smembers("active:"+service) + return redis.smembers("active:" + service) def info(redis, taskfile_dir, task_id, fields): @@ -482,8 +483,8 @@ def change(redis, task_id, service, priority, ngpus): disable(redis, task_id, prev_service) enable(redis, task_id, service) redis.hset(keyt, "service", service) - redis.lrem('queued:'+prev_service, 0, task_id) - redis.lpush('queued:'+service, task_id) + redis.lrem('queued:' + prev_service, 0, task_id) + redis.lpush('queued:' + service, task_id) if priority: redis.hset(keyt, "priority", priority) if ngpus: @@ -577,7 +578,7 @@ def set_file(redis, taskfile_dir, task_id, content, filename, limit=None): with open(os.path.join(taskdir, filename), "wb") as fh: content = six.ensure_binary(content) if limit and len(content) >= limit: - content = content[:limit-len(disclaimer)] + disclaimer + content = content[:limit - len(disclaimer)] + disclaimer fh.write(content) return content @@ -593,8 +594,8 @@ def append_file(redis, taskfile_dir, task_id, content, filename, limit=None): if limit and current_size >= limit: return content = six.ensure_binary(content, encoding="utf-8") - if limit and len(content)+current_size >= limit: - content = content[:limit-len(disclaimer)] + disclaimer + if limit and len(content) + current_size >= limit: + content = content[:limit - len(disclaimer)] + disclaimer with open(filepath, "ab") as fh: fh.write(content) diff --git a/server/nmtwizard/worker.py b/server/nmtwizard/worker.py index d73381c6..3dc8a23d 100644 --- a/server/nmtwizard/worker.py +++ b/server/nmtwizard/worker.py @@ -9,6 +9,8 @@ from nmtwizard import task, configuration as config from nmtwizard.capacity import Capacity +from utils.request_utils import RequestUtils +from app import mongo_client def _compatible_resource(resource, request_resource): @@ -210,11 +212,13 @@ def _handle_allocated_task(self, task_id): task.terminate(self._redis, task_id, phase='launch_error') self._logger.info(traceback.format_exc()) return + self._logger.info('%s: task started on %s', task_id, service.name) self._redis.hset(keyt, 'job', json.dumps(data)) task.set_status(self._redis, keyt, 'running') # For services that do not notify their activity, we should # poll the task status more regularly. 
+ task.work_queue(self._redis, task_id, service.name, delay=service.is_notifying_activity and 120 or 30) @@ -230,6 +234,16 @@ def _handle_running_task(self, task_id): task_id, service.name) task.terminate(self._redis, task_id, phase='exited') else: + # check serving status for serve tasks + if task_id.split('_')[-1] == "serve": + model_name = json.loads(self._redis.hget(keyt, 'content'))["docker"]["command"][1] + deployment_info = mongo_client.get_deployment_info_of_model(model_name) + serving_status = RequestUtils.get(f"http://{deployment_info['resource']}:{deployment_info['serving_port']}/status") + if json.loads(serving_status.text)["status"] == "ready": + task.terminate(self._redis, task_id, phase='successful activation') + # update serving status in mongodb + mongo_client.update_document(model_name, {"serving_status": "running"}) + task.work_queue(self._redis, task_id, service.name, delay=service.is_notifying_activity and 600 or 120) except Exception as e: diff --git a/server/services/ssh.py b/server/services/ssh.py index b21ae916..5dc8d778 100644 --- a/server/services/ssh.py +++ b/server/services/ssh.py @@ -1,6 +1,7 @@ import logging import time +from app import mongo_client from nmtwizard import common from nmtwizard.service import Service from nmtwizard.capacity import Capacity @@ -149,6 +150,13 @@ def launch(self, support_statistics): options['server'] = resource params = _get_params(self._config, options) + + # update resource for serve tasks + if task_id.split("_")[-1] == "serve": + model_name = docker_command[1] + mongo_client.update_document(model_name, {"resource": params["host"], "serving_port": + docker_command[-1].split(":")[-1]}) + client = self._get_client(params=params) try: callback_url = self._config.get('callback_url') From bd61ebf1950c74a7385db72baafbeb34fb96070a Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Mon, 1 Feb 2021 15:30:21 +0700 Subject: [PATCH 05/12] refs#55704: fix docker port = 4000 --- server/nmtwizard/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/nmtwizard/common.py b/server/nmtwizard/common.py index 9c415779..5e6d1f1e 100644 --- a/server/nmtwizard/common.py +++ b/server/nmtwizard/common.py @@ -309,9 +309,9 @@ def cmd_docker_run(lxpu, docker_options, task_id, # only for serve tasks if task_id.split("_")[-1] == "serve": - if "port" in server_params: - cmd += "_o_-p_o_{}".format(docker_command[-1]) - docker_command = docker_command[:-1] + serving_port = docker_command[-1] + cmd += "_o_-p_o_4000:{}".format(serving_port) + docker_command = docker_command[:-1] if 'mount' in docker_options: for k in docker_options['mount']: From fb7c5ac569db36ea283ea969e5f33c370e0c50ef Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Mon, 1 Feb 2021 15:43:41 +0700 Subject: [PATCH 06/12] refs#55704: update predict and inactive model feature, add limit deployed model checking to creating feature --- server/app/routes.py | 111 +++++++++++++++++++++-------- server/nmtwizard/mongo_database.py | 49 ++++++++++--- server/nmtwizard/task.py | 4 +- server/nmtwizard/worker.py | 16 ++--- server/services/ssh.py | 7 -- 5 files changed, 127 insertions(+), 60 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 2ca9d0c7..0ba05b4c 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -1182,19 +1182,64 @@ def get_evaluations(): return cust_jsonify(evaluation_catalogs) +def _parse_request_data_for_predict(current_request): + # TODO: Validate request data + request_data = current_request.form + model = 
request_data.get("model") + text = request_data.get("text") + + return { + "model": model, + "text": text + } + + +def _get_deployment_info_of_model(model): + serve_task_id = mongo_client.get_serve_task_id_of_model(model) + task_info = redis_db.hgetall(f"task:{serve_task_id}") + + alloc_resource = task_info["alloc_resource"] + serving_port = json.loads(task_info["content"])["docker"]["command"][-1] + service = task_info["service"] + list_machines = config.get_service_config(mongo_client, service)["variables"]["server_pool"] + host = [machine["host"] for machine in list_machines if machine["name"] == task_info["alloc_resource"]][0] + + return {"host": host, "port": serving_port} + + +def _check_limit_deployed_model(model): + # filter list model of entity + entity_code = mongo_client.get_entity_code_of_model(model) + list_models = mongo_client.get_models_of_entity(entity_code) + + count = 0 + # count number of deployed model + for model in list_models: + tasks = model["tasks"] + for task_id in tasks: + if task_id.split("_")[-1] == "serve": + count += 1 + + if count >= config.get_system_config()["predict"]["limit_activate"]: + return True + + return False + + @app.route("/model/predict", methods=["POST"]) @filter_request("POST/model/predict", "train") def predict(): request_data = _parse_request_data_for_predict(request) if len(request_data["text"]) > config.get_system_config()["predict"]["max_characters"]: - abort(flask.make_response(flask.jsonify(message="your input is too long"), 403)) + abort(flask.make_response(flask.jsonify(message="your input is too long!"), 403)) model = request_data["model"] - deployment_info = mongo_client.get_deployment_info_of_model(model) + deployment_info = _get_deployment_info_of_model(model) if not deployment_info: abort(flask.make_response(flask.jsonify(message="Model has not been deployed: %s" % model), 400)) - request_url = f"http://{deployment_info['resource']}:{deployment_info['serving_port']}/translate" + + request_url = f"http://{deployment_info['host']}:{deployment_info['port']}/translate" request_data = {"src": [{"text": request_data["text"]}]} response = RequestUtils.post(request_url, request_data) return response.json(), response.status_code @@ -1203,20 +1248,24 @@ def predict(): @app.route("/model/inactive", methods=["POST"]) @filter_request("POST/model/inactive", "train") def inactive_model(): - pass + request_data = _parse_request_data_for_deploy_model(request) + serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"]) + if not serve_task_id: + abort(flask.make_response(flask.jsonify(message="The model has not been active!"), 400)) + # terminate task + task.terminate(redis_db, serve_task_id, "inactive") -def _parse_request_data_for_predict(current_request): - # TODO: Validate request data - request_data = current_request.form - model = request_data.get("model") - text = request_data.get("text") + # release port (resource, port) + task_info = redis_db.hgetall(f"task:{serve_task_id}") + alloc_resource = task_info["alloc_resource"] + port = json.loads(task_info["content"])["docker"]["command"][-1] + redis_db.hdel(f"ports:{task_info['service']}:{alloc_resource}", port) - return { - "model": model, - "text": text - } + # remove serve task in mongodb + mongo_client.remove_serving_task_id(request_data["model"], serve_task_id) + return cust_jsonify({"message": "ok"}) @app.route("/model/active", methods=["POST"]) @filter_request("POST/model/active", "train") @@ -1225,19 +1274,26 @@ def active_model_local(): deploy model on local 
machine for prediction (only for model studio lite) """ request_data = _parse_request_data_for_deploy_model(request) - deployment_info = mongo_client.get_deployment_info_of_model(request_data["model"]) - if deployment_info: - abort(flask.make_response(flask.jsonify(message="The model has been active!"), 400)) + + # check limit deployed model + if _check_limit_deployed_model(request_data["model"]): + abort(flask.make_response(flask.jsonify(message="the numbers of deployed model exceeded the limit number"), 403)) + + # check model's serve task existed + serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"]) + if serve_task_id: + abort(flask.make_response(flask.jsonify(message="The model has been active!"), 403)) service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) # TODO: select resource for deployment resource = _select_resource_for_deploy_model(service_config) # TODO: select port for deployment port_to_deploy = 4000 # _select_port_for_deploy_model(resource) + if port_to_deploy is None: - abort(flask.make_response(flask.jsonify(message="Not available port to active"), 400)) + abort(flask.make_response(flask.jsonify(message="Not available port to active"), 404)) - create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"], + _create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"], port_to_deploy, "auto") # resource["host"] return cust_jsonify({"message": "ok"}) @@ -1288,7 +1344,7 @@ def _get_all_busy_port_of_resource(resource): return [] -def create_serve_task(model, service, source_language, target_language, exposed_port, resource): +def _create_serve_task(model, service, source_language, target_language, exposed_port, resource): request_data = { "source": source_language, "target": target_language @@ -1300,8 +1356,13 @@ def create_serve_task(model, service, source_language, target_language, exposed_ serve_task = TaskServe(task_infos, model, exposed_port) + # gen auth token for callback url + _, _ = post_function('POST/task/launch_v2', [serve_task.task_id], [serve_task]) + serve_task.create(redis_db, taskfile_dir) + mongo_client.insert_serving_task(model, serve_task.task_id) + def get_serve_task_config(service, request_data, routes_config): docker_image_info = TaskBase.get_docker_image_info(routes_config, request_data.get("docker_image"), mongo_client) @@ -1323,18 +1384,6 @@ def get_serve_task_config(service, request_data, routes_config): return json.loads(json.dumps(content)) -# Notes: Call this function when deploy success -def create_model_deployment_info(creator, model, resource_host, resource_port): - model_deployment_info = { - "model": model, - "resource": resource_host, - "port": resource_port, - "creator": creator - } - - mongo_client.create_deployment_info(model_deployment_info) - - @app.route("/task/launch/", methods=["POST"]) @filter_request("POST/task/launch", "train") def launch(service): diff --git a/server/nmtwizard/mongo_database.py b/server/nmtwizard/mongo_database.py index 92e5cc20..7aac5783 100644 --- a/server/nmtwizard/mongo_database.py +++ b/server/nmtwizard/mongo_database.py @@ -203,28 +203,57 @@ def create_deployment_info(self, deployment_info): the_table = self.get("serving_models") the_table.insert(deployment_info) - def get_deployment_info_of_model(self, model): + def insert_serving_task(self, model, task_id): the_table = self.get("models") query = { "model": model } - model_info = the_table.find_one(query) + update = { + 
"$push": {"tasks": task_id} + } + the_table.update_one(query, update) - if model_info and model_info.get("resource") is not None: - deployment_info = { - "resource": model_info["resource"], - "serving_port": model_info["serving_port"] - } - return deployment_info + def get_serve_task_id_of_model(self, model): + the_table = self.get("models") + query = { + "model": model + } + model_info = the_table.find_one(query) + tasks = model_info.get("tasks") + for task_id in tasks: + if task_id.split("_")[-1] == "serve": + return task_id return None - def update_document(self, model, data): + def remove_serving_task_id(self, model, task_id): the_table = self.get("models") query = { "model": model } + update = { - "$set": data + "$pull": {"tasks": task_id} } + the_table.update_one(query, update) + + def get_entity_code_of_model(self, model): + the_table = self.get("models") + query = { + "model": model + } + + return the_table.find_one(query)["owner"]["entity_code"] + + def get_models_of_entity(self, entity_code): + """get all models of the entity""" + the_table = self.get("models") + + query = { + "type": None, + "owner": {"entity_code": entity_code} + } + + return the_table.find(query) + diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py index aa67d44d..b0c30423 100644 --- a/server/nmtwizard/task.py +++ b/server/nmtwizard/task.py @@ -251,11 +251,11 @@ def __init__(self, task_infos, parent_task_id, to_translate): class TaskServe(TaskBase): def __init__(self, task_infos, parent_task_id, serving_port): + """we set default 4000 for docker serving port""" self._task_suffix = "serve" self._task_type = "serve" self._parent_task_id = parent_task_id self._serving_port = serving_port - self._docker_container_port = "4000" task_infos.content["priority"] = task_infos.content.get("priority", 0) + 1 task_infos.content["ngpus"] = 0 @@ -266,7 +266,7 @@ def __init__(self, task_infos, parent_task_id, serving_port): task_infos.content["docker"]["command"] = ["-m", self._parent_task_id] task_infos.content["docker"]["command"].extend(["-ms", "pn9_model_catalog:"]) task_infos.content["docker"]["command"].extend(["serve"]) - task_infos.content["docker"]["command"].extend([self._docker_container_port + ':' + str(self._serving_port)]) + task_infos.content["docker"]["command"].extend([str(self._serving_port)]) change_parent_task(task_infos.content["docker"]["command"], parent_task_id) TaskBase.__init__(self, task_infos) diff --git a/server/nmtwizard/worker.py b/server/nmtwizard/worker.py index 3dc8a23d..3f8a97aa 100644 --- a/server/nmtwizard/worker.py +++ b/server/nmtwizard/worker.py @@ -174,6 +174,12 @@ def _handle_allocated_task(self, task_id): if v == task_id: lcpu.append(k) self._redis.hset(keyt, 'alloc_lcpu', ",".join(lcpu)) + + if task_id.split('_')[-1] == "serve": + key_port = "ports:%s:%s" % (service.name, resource) + serving_port = content['docker']['command'][-1] + self._redis.hset(key_port, serving_port, task_id) + data = service.launch( task_id, content['options'], @@ -234,16 +240,6 @@ def _handle_running_task(self, task_id): task_id, service.name) task.terminate(self._redis, task_id, phase='exited') else: - # check serving status for serve tasks - if task_id.split('_')[-1] == "serve": - model_name = json.loads(self._redis.hget(keyt, 'content'))["docker"]["command"][1] - deployment_info = mongo_client.get_deployment_info_of_model(model_name) - serving_status = RequestUtils.get(f"http://{deployment_info['resource']}:{deployment_info['serving_port']}/status") - if 
json.loads(serving_status.text)["status"] == "ready": - task.terminate(self._redis, task_id, phase='successful activation') - # update serving status in mongodb - mongo_client.update_document(model_name, {"serving_status": "running"}) - task.work_queue(self._redis, task_id, service.name, delay=service.is_notifying_activity and 600 or 120) except Exception as e: diff --git a/server/services/ssh.py b/server/services/ssh.py index 5dc8d778..b2052482 100644 --- a/server/services/ssh.py +++ b/server/services/ssh.py @@ -1,7 +1,6 @@ import logging import time -from app import mongo_client from nmtwizard import common from nmtwizard.service import Service from nmtwizard.capacity import Capacity @@ -151,12 +150,6 @@ def launch(self, options['server'] = resource params = _get_params(self._config, options) - # update resource for serve tasks - if task_id.split("_")[-1] == "serve": - model_name = docker_command[1] - mongo_client.update_document(model_name, {"resource": params["host"], "serving_port": - docker_command[-1].split(":")[-1]}) - client = self._get_client(params=params) try: callback_url = self._config.get('callback_url') From 8f90c1e8307afe49bf500075b0745245871c3562 Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Tue, 2 Feb 2021 10:39:59 +0700 Subject: [PATCH 07/12] refs#55704: allocate ports for serve tasks using redis --- server/app/routes.py | 30 +++++++++++++++++------------- server/nmtwizard/common.py | 2 +- server/nmtwizard/task.py | 8 ++++++++ 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 0ba05b4c..a218d881 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -1256,7 +1256,7 @@ def inactive_model(): # terminate task task.terminate(redis_db, serve_task_id, "inactive") - # release port (resource, port) + # release port task_info = redis_db.hgetall(f"task:{serve_task_id}") alloc_resource = task_info["alloc_resource"] port = json.loads(task_info["content"])["docker"]["command"][-1] @@ -1287,8 +1287,7 @@ def active_model_local(): service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) # TODO: select resource for deployment resource = _select_resource_for_deploy_model(service_config) - # TODO: select port for deployment - port_to_deploy = 4000 # _select_port_for_deploy_model(resource) + port_to_deploy = _select_port_for_deployed_model(resource) if port_to_deploy is None: abort(flask.make_response(flask.jsonify(message="Not available port to active"), 404)) @@ -1319,8 +1318,8 @@ def _select_resource_for_deploy_model(service_config): return service_config["variables"]["server_pool"][0] -def _select_port_for_deploy_model(resource_config): - busy_ports = _get_all_busy_port_of_resource(resource_config["host"]) +def _select_port_for_deployed_model(resource_config): + busy_ports = _get_all_busy_port_of_resource(GLOBAL_POOL_NAME, resource_config["name"]) port_range = resource_config["serve_port_range"] port_range_from = port_range.get("from") port_range_to = port_range.get("to") @@ -1328,17 +1327,16 @@ def _select_port_for_deploy_model(resource_config): if len(busy_ports) == 0: return port_range_from - for port in range(port_range_from, port_range_to + 1): - if port not in busy_ports: - return port + port = max(busy_ports) + 1 + if port < port_range_to: + return port return None -def _get_all_busy_port_of_resource(resource): - # TODO: change port type to list - deployment_of_resources = mongo_client.get_all_deployment_of_resource(resource) - if deployment_of_resources: - busy_ports = list(map(lambda 
deployment: deployment["serving_port"], deployment_of_resources)) +def _get_all_busy_port_of_resource(service, resource): + busy_ports = redis_db.hgetall(f"ports:{service}:{resource}") + if busy_ports: + busy_ports = list(map(int, list(busy_ports))) return busy_ports else: return [] @@ -1959,6 +1957,12 @@ def terminate_internal(task_id): return "problem while posting model: %s" % res, current_status task.terminate(redis_db, task_id, phase=phase) + + # release its port if it is a serve task + if task_id.split("_")[-1] == "serve": + alloc_resource, port = task.get_task_deployment_info(redis_db, task_id) + redis_db.hdel(f"ports:{GLOBAL_POOL_NAME}:{alloc_resource}", port) + return "terminating %s" % task_id, current_status diff --git a/server/nmtwizard/common.py b/server/nmtwizard/common.py index 5e6d1f1e..33ed95b5 100644 --- a/server/nmtwizard/common.py +++ b/server/nmtwizard/common.py @@ -310,7 +310,7 @@ def cmd_docker_run(lxpu, docker_options, task_id, # only for serve tasks if task_id.split("_")[-1] == "serve": serving_port = docker_command[-1] - cmd += "_o_-p_o_4000:{}".format(serving_port) + cmd += "_o_-p_o_{}:4000".format(serving_port) docker_command = docker_command[:-1] if 'mount' in docker_options: diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py index b0c30423..9ccef57b 100644 --- a/server/nmtwizard/task.py +++ b/server/nmtwizard/task.py @@ -442,6 +442,14 @@ def list_active(redis, service): return redis.smembers("active:" + service) +def get_task_deployment_info(redis, task_id): + """get deployment info of task: host, port""" + task_info = redis.hgetall(f"task:{task_id}") + alloc_resource = task_info["alloc_resource"] + port = json.loads(task_info["content"])["docker"]["command"][-1] + + return alloc_resource, port + def info(redis, taskfile_dir, task_id, fields): """Gets information on a task.""" keyt = "task:" + task_id From 99275d44f4930d9aa091d6439e254eef82c9f9fe Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Tue, 2 Feb 2021 10:40:39 +0700 Subject: [PATCH 08/12] refs#55704: remove redundant codes --- server/nmtwizard/mongo_database.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/server/nmtwizard/mongo_database.py b/server/nmtwizard/mongo_database.py index 7aac5783..48918e1c 100644 --- a/server/nmtwizard/mongo_database.py +++ b/server/nmtwizard/mongo_database.py @@ -183,22 +183,6 @@ def get_dataset_by_ids(self, dataset_ids): }}) return dataset - def get_all_deployment_of_resource(self, resource): - the_table = self.get("models") - query = { - "resource": resource - } - model_info = the_table.find(query) - - if model_info and model_info.get("resource") is not None: - deployment_info = { - "resource": model_info["resource"], - "serving_port": model_info["serving_port"] - } - return deployment_info - - return None - def create_deployment_info(self, deployment_info): the_table = self.get("serving_models") the_table.insert(deployment_info) From 1a53cacb7d09a3c23f0d934ab0d2e2e9be3134fd Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Tue, 2 Feb 2021 11:23:37 +0700 Subject: [PATCH 09/12] refs#55704: remove serve task id in mongodb when terminating from trainer --- server/app/routes.py | 21 ++++++++++----------- server/nmtwizard/task.py | 10 ++++++---- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index a218d881..59d177e0 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -1196,13 +1196,12 @@ def _parse_request_data_for_predict(current_request): def 
_get_deployment_info_of_model(model): serve_task_id = mongo_client.get_serve_task_id_of_model(model) - task_info = redis_db.hgetall(f"task:{serve_task_id}") - alloc_resource = task_info["alloc_resource"] - serving_port = json.loads(task_info["content"])["docker"]["command"][-1] - service = task_info["service"] + _, alloc_resource, serving_port = task.get_task_deployment_info(redis_db, serve_task_id) + + service = GLOBAL_POOL_NAME list_machines = config.get_service_config(mongo_client, service)["variables"]["server_pool"] - host = [machine["host"] for machine in list_machines if machine["name"] == task_info["alloc_resource"]][0] + host = [machine["host"] for machine in list_machines if machine["name"] == alloc_resource][0] return {"host": host, "port": serving_port} @@ -1248,6 +1247,7 @@ def predict(): @app.route("/model/inactive", methods=["POST"]) @filter_request("POST/model/inactive", "train") def inactive_model(): + service = GLOBAL_POOL_NAME request_data = _parse_request_data_for_deploy_model(request) serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"]) if not serve_task_id: @@ -1257,10 +1257,8 @@ def inactive_model(): task.terminate(redis_db, serve_task_id, "inactive") # release port - task_info = redis_db.hgetall(f"task:{serve_task_id}") - alloc_resource = task_info["alloc_resource"] - port = json.loads(task_info["content"])["docker"]["command"][-1] - redis_db.hdel(f"ports:{task_info['service']}:{alloc_resource}", port) + _, alloc_resource, port = task.get_task_deployment_info(redis_db, serve_task_id) + redis_db.hdel(f"ports:{service}:{alloc_resource}", port) # remove serve task in mongodb mongo_client.remove_serving_task_id(request_data["model"], serve_task_id) @@ -1958,10 +1956,11 @@ def terminate_internal(task_id): task.terminate(redis_db, task_id, phase=phase) - # release its port if it is a serve task + # release its port and serve task in mongodb if it is a serve task if task_id.split("_")[-1] == "serve": - alloc_resource, port = task.get_task_deployment_info(redis_db, task_id) + model, alloc_resource, port = task.get_task_deployment_info(redis_db, task_id) redis_db.hdel(f"ports:{GLOBAL_POOL_NAME}:{alloc_resource}", port) + mongo_client.remove_serving_task_id(model, task_id) return "terminating %s" % task_id, current_status diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py index 9ccef57b..6644521d 100644 --- a/server/nmtwizard/task.py +++ b/server/nmtwizard/task.py @@ -443,12 +443,14 @@ def list_active(redis, service): def get_task_deployment_info(redis, task_id): - """get deployment info of task: host, port""" + """get deployment info of task: model, alloc_resource, port""" task_info = redis.hgetall(f"task:{task_id}") - alloc_resource = task_info["alloc_resource"] - port = json.loads(task_info["content"])["docker"]["command"][-1] + content = json.loads(task_info["content"]) + model = content["docker"]["command"][1] + alloc_resource = task_info["alloc_resource"] # as trainer name, != host + port = content["docker"]["command"][-1] - return alloc_resource, port + return model, alloc_resource, port def info(redis, taskfile_dir, task_id, fields): """Gets information on a task.""" From e7b691070f1a7f6306969bf22f3decf7894ab742 Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Tue, 2 Feb 2021 15:25:18 +0700 Subject: [PATCH 10/12] refs#55704: update port allocation --- server/app/routes.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 59d177e0..180b8e00 100644 
--- a/server/app/routes.py +++ b/server/app/routes.py @@ -31,6 +31,7 @@ from utils.request_utils import RequestUtils GLOBAL_POOL_NAME = "global_pool" +global_service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) logger = logging.getLogger(__name__) logger.addHandler(app.logger) @@ -84,7 +85,7 @@ def __init__(self, flask_global, service): 'entity_code': user.entity.entity_code, 'user_code': user.user_code } - self.service_config = config.get_service_config(mongo_client, service_name=GLOBAL_POOL_NAME) + self.service_config = global_service_config self.entity_storages_config = self._get_entity_storages(self.creator['entity_code']) self.storage_client, self.global_storage_name = StorageUtils.get_storages(GLOBAL_POOL_NAME, mongo_client, @@ -1199,8 +1200,8 @@ def _get_deployment_info_of_model(model): _, alloc_resource, serving_port = task.get_task_deployment_info(redis_db, serve_task_id) - service = GLOBAL_POOL_NAME - list_machines = config.get_service_config(mongo_client, service)["variables"]["server_pool"] + # default global service config + list_machines = global_service_config["variables"]["server_pool"] host = [machine["host"] for machine in list_machines if machine["name"] == alloc_resource][0] return {"host": host, "port": serving_port} @@ -1282,7 +1283,7 @@ def active_model_local(): if serve_task_id: abort(flask.make_response(flask.jsonify(message="The model has been active!"), 403)) - service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) + service_config = global_service_config # TODO: select resource for deployment resource = _select_resource_for_deploy_model(service_config) port_to_deploy = _select_port_for_deployed_model(resource) @@ -1325,9 +1326,9 @@ def _select_port_for_deployed_model(resource_config): if len(busy_ports) == 0: return port_range_from - port = max(busy_ports) + 1 - if port < port_range_to: - return port + for port in range(port_range_from, port_range_to + 1): + if port not in busy_ports: + return port return None From 3d35b331ce4108edc42db73e0a09f3dddb15112d Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Wed, 3 Feb 2021 11:51:01 +0700 Subject: [PATCH 11/12] refs#55704: set port allocation from launcher into worker --- server/app/routes.py | 59 +++++++++----------------------------- server/nmtwizard/task.py | 19 +++++++----- server/nmtwizard/worker.py | 35 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 53 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index 180b8e00..b4892622 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -1198,13 +1198,13 @@ def _parse_request_data_for_predict(current_request): def _get_deployment_info_of_model(model): serve_task_id = mongo_client.get_serve_task_id_of_model(model) - _, alloc_resource, serving_port = task.get_task_deployment_info(redis_db, serve_task_id) + deploy_info = task.get_task_deployment_info(redis_db, serve_task_id) # default global service config list_machines = global_service_config["variables"]["server_pool"] - host = [machine["host"] for machine in list_machines if machine["name"] == alloc_resource][0] + host = [machine["host"] for machine in list_machines if machine["name"] == deploy_info['alloc_resource']][0] - return {"host": host, "port": serving_port} + return {"host": host, "port": deploy_info['port']} def _check_limit_deployed_model(model): @@ -1248,7 +1248,6 @@ def predict(): @app.route("/model/inactive", methods=["POST"]) @filter_request("POST/model/inactive", "train") def inactive_model(): - service = 
From 3d35b331ce4108edc42db73e0a09f3dddb15112d Mon Sep 17 00:00:00 2001
From: Pham Duy Hieu
Date: Wed, 3 Feb 2021 11:51:01 +0700
Subject: [PATCH 11/12] refs#55704: move port allocation from the launcher into the worker

---
 server/app/routes.py       | 59 +++++++++-----------------------
 server/nmtwizard/task.py   | 19 +++++++-----
 server/nmtwizard/worker.py | 35 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 53 deletions(-)

diff --git a/server/app/routes.py b/server/app/routes.py
index 180b8e00..b4892622 100644
--- a/server/app/routes.py
+++ b/server/app/routes.py
@@ -1198,13 +1198,13 @@ def _parse_request_data_for_predict(current_request):
 def _get_deployment_info_of_model(model):
     serve_task_id = mongo_client.get_serve_task_id_of_model(model)
 
-    _, alloc_resource, serving_port = task.get_task_deployment_info(redis_db, serve_task_id)
+    deploy_info = task.get_task_deployment_info(redis_db, serve_task_id)
 
     # default global service config
     list_machines = global_service_config["variables"]["server_pool"]
-    host = [machine["host"] for machine in list_machines if machine["name"] == alloc_resource][0]
+    host = [machine["host"] for machine in list_machines if machine["name"] == deploy_info['alloc_resource']][0]
 
-    return {"host": host, "port": serving_port}
+    return {"host": host, "port": deploy_info['port']}
 
 
 def _check_limit_deployed_model(model):
@@ -1248,7 +1248,6 @@ def predict():
 @app.route("/model/inactive", methods=["POST"])
 @filter_request("POST/model/inactive", "train")
 def inactive_model():
-    service = GLOBAL_POOL_NAME
     request_data = _parse_request_data_for_deploy_model(request)
     serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"])
     if not serve_task_id:
@@ -1258,8 +1257,8 @@ def inactive_model():
     task.terminate(redis_db, serve_task_id, "inactive")
 
     # release port
-    _, alloc_resource, port = task.get_task_deployment_info(redis_db, serve_task_id)
-    redis_db.hdel(f"ports:{service}:{alloc_resource}", port)
+    deploy_info = task.get_task_deployment_info(redis_db, serve_task_id)
+    redis_db.hdel(f"ports:{deploy_info['service']}:{deploy_info['alloc_resource']}", deploy_info['port'])
 
     # remove serve task in mongodb
     mongo_client.remove_serving_task_id(request_data["model"], serve_task_id)
@@ -1276,23 +1275,15 @@ def active_model_local():
 
     # check limit deployed model
     if _check_limit_deployed_model(request_data["model"]):
-        abort(flask.make_response(flask.jsonify(message="the numbers of deployed model exceeded the limit number"), 403))
+        abort(flask.make_response(flask.jsonify(message="The numbers of deployed model exceeded the limit number"), 403))
 
     # check model's serve task existed
     serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"])
     if serve_task_id:
         abort(flask.make_response(flask.jsonify(message="The model has been active!"), 403))
 
-    service_config = global_service_config
-    # TODO: select resource for deployment
-    resource = _select_resource_for_deploy_model(service_config)
-    port_to_deploy = _select_port_for_deployed_model(resource)
-
-    if port_to_deploy is None:
-        abort(flask.make_response(flask.jsonify(message="Not available port to active"), 404))
-
-    _create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"],
-                       port_to_deploy, "auto")  # resource["host"]
+    resource = "auto"
+    _create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"], resource)
 
     return cust_jsonify({"message": "ok"})
@@ -1317,31 +1308,7 @@ def _select_resource_for_deploy_model(service_config):
     return service_config["variables"]["server_pool"][0]
 
 
-def _select_port_for_deployed_model(resource_config):
-    busy_ports = _get_all_busy_port_of_resource(GLOBAL_POOL_NAME, resource_config["name"])
-    port_range = resource_config["serve_port_range"]
-    port_range_from = port_range.get("from")
-    port_range_to = port_range.get("to")
-
-    if len(busy_ports) == 0:
-        return port_range_from
-
-    for port in range(port_range_from, port_range_to + 1):
-        if port not in busy_ports:
-            return port
-    return None
-
-
-def _get_all_busy_port_of_resource(service, resource):
-    busy_ports = redis_db.hgetall(f"ports:{service}:{resource}")
-    if busy_ports:
-        busy_ports = list(map(int, list(busy_ports)))
-        return busy_ports
-    else:
-        return []
-
-
-def _create_serve_task(model, service, source_language, target_language, exposed_port, resource):
+def _create_serve_task(model, service, source_language, target_language, resource):
     request_data = {
         "source": source_language,
         "target": target_language
@@ -1351,7 +1318,7 @@ def _create_serve_task(model, service, source_language, target_language, exposed
     task_infos = TaskInfos(content=content, files={}, request_data=request_data, routes_configuration=routes_config,
                            service=service, resource=resource)
 
-    serve_task = TaskServe(task_infos, model, exposed_port)
+    serve_task = TaskServe(task_infos, model)
 
     # gen auth token for callback url
     _, _ = post_function('POST/task/launch_v2', [serve_task.task_id], [serve_task])
@@ -1959,9 +1926,9 @@ def terminate_internal(task_id):
 
     # release its port and serve task in mongodb if it is a serve task
     if task_id.split("_")[-1] == "serve":
-        model, alloc_resource, port = task.get_task_deployment_info(redis_db, task_id)
-        redis_db.hdel(f"ports:{GLOBAL_POOL_NAME}:{alloc_resource}", port)
-        mongo_client.remove_serving_task_id(model, task_id)
+        deploy_info = task.get_task_deployment_info(redis_db, task_id)
+        redis_db.hdel(f"ports:{deploy_info['service']}:{deploy_info['alloc_resource']}", deploy_info['port'])
+        mongo_client.remove_serving_task_id(deploy_info['model'], task_id)
 
     return "terminating %s" % task_id, current_status
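
Port bookkeeping in this series lives entirely in Redis: every reserved port is a field of the hash ports:<service>:<resource>, so releasing one is a single HDEL, as in inactive_model() and terminate_internal() above. A minimal sketch of that release step, with an illustrative helper name:

    def release_serve_port(redis_client, service, alloc_resource, port):
        """Drop one port reservation from the ports:<service>:<resource> hash."""
        redis_client.hdel(f"ports:{service}:{alloc_resource}", port)

    # e.g. release_serve_port(redis_db, deploy_info['service'],
    #                         deploy_info['alloc_resource'], deploy_info['port'])
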
diff --git a/server/nmtwizard/task.py b/server/nmtwizard/task.py
index 6644521d..de1de52b 100644
--- a/server/nmtwizard/task.py
+++ b/server/nmtwizard/task.py
@@ -250,12 +250,11 @@
 
 
 class TaskServe(TaskBase):
-    def __init__(self, task_infos, parent_task_id, serving_port):
-        """we set default 4000 for docker serving port"""
+    def __init__(self, task_infos, parent_task_id):
+        """we set default 4000 for docker serving port, so we set it when creating cmd"""
         self._task_suffix = "serve"
         self._task_type = "serve"
         self._parent_task_id = parent_task_id
-        self._serving_port = serving_port
 
         task_infos.content["priority"] = task_infos.content.get("priority", 0) + 1
         task_infos.content["ngpus"] = 0
@@ -266,7 +265,7 @@ def __init__(self, task_infos, parent_task_id, serving_port):
         task_infos.content["docker"]["command"] = ["-m", self._parent_task_id]
         task_infos.content["docker"]["command"].extend(["-ms", "pn9_model_catalog:"])
         task_infos.content["docker"]["command"].extend(["serve"])
-        task_infos.content["docker"]["command"].extend([str(self._serving_port)])
+        # task_infos.content["docker"]["command"].extend([str(self._serving_port)])
 
         change_parent_task(task_infos.content["docker"]["command"], parent_task_id)
         TaskBase.__init__(self, task_infos)
@@ -443,14 +442,20 @@ def list_active(redis, service):
 
 
 def get_task_deployment_info(redis, task_id):
-    """get deployment info of task: model, alloc_resource, port"""
+    """get deployment info of task: service pool, model, alloc_resource, port"""
     task_info = redis.hgetall(f"task:{task_id}")
     content = json.loads(task_info["content"])
     model = content["docker"]["command"][1]
     alloc_resource = task_info["alloc_resource"]  # as trainer name, != host
     port = content["docker"]["command"][-1]
-
-    return model, alloc_resource, port
+    service = task_info["service"]
+    result = {
+        "service": service,
+        "model": model,
+        "alloc_resource": alloc_resource,
+        "port": port
+    }
+    return result
 
 
 def info(redis, taskfile_dir, task_id, fields):
     """Gets information on a task."""
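
With the port argument dropped, the launcher now stores an incomplete docker command and leaves the last slot for the worker, which appends the reserved port at allocation time; that is also why get_task_deployment_info can keep reading the model at index 1 and the port at index -1. A rough sketch of the command's lifecycle (the model name and port are illustrative):

    # as stored by TaskServe at creation time, before any port is known:
    command = ["-m", "demo_enfr_release", "-ms", "pn9_model_catalog:", "serve"]

    # appended later by the worker once a machine and a free port are chosen:
    command.append("5001")

    assert command[1] == "demo_enfr_release"   # model, read back as command[1]
    assert command[-1] == "5001"               # port, read back as command[-1]
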
self._redis.hgetall(f"ports:{service}:{resource}") + if busy_ports: + busy_ports = list(map(int, list(busy_ports))) + return busy_ports + else: + return [] + def _allocate_resource(self, task_id, request_resource, service, task_expected_capacity): """Allocates a resource for task_id and returns the name of the resource (or None if none where allocated), and the number of allocated gpus/cpus @@ -678,8 +700,21 @@ def load_machines(self, service_name): request_resource = self._redis.hget(keyt, 'resource') allocated_resource = self._allocate_resource(task_id, request_resource, service, nxpus) if allocated_resource is not None: + # allocate port if it is a serve task + if task_id.split("_")[-1] == "serve": + alloc_resource_config = self._services[self._service]._machines[allocated_resource] + alloc_port = self._select_port_for_deployed_model(alloc_resource_config) + if alloc_port is None: + self._logger.info('%s: there is no port available in resource %s', task_id, allocated_resource) + break + # save port to task content in redis + task_content = json.loads(self._redis.hget(keyt, "content")) + task_content["docker"]["command"].append(alloc_port) + self._redis.hset(keyt, "content", json.dumps(task_content)) + self._logger.info('%s: resource %s reserved %s', task_id, allocated_resource, nxpus) self._redis.hset(keyt, 'alloc_resource', allocated_resource) + task.set_status(self._redis, keyt, 'allocated') task.work_queue(self._redis, task_id, service.name) self._redis.lrem(queue, 0, task_id) From cb9e7e12405f9a37fbbf0344b0d241394fd55a4d Mon Sep 17 00:00:00 2001 From: Pham Duy Hieu Date: Wed, 3 Feb 2021 14:24:14 +0700 Subject: [PATCH 12/12] refs#55704: move predict feature from github to gitlab --- server/app/routes.py | 169 +------------------------------------------ 1 file changed, 1 insertion(+), 168 deletions(-) diff --git a/server/app/routes.py b/server/app/routes.py index b4892622..237e92e6 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -28,10 +28,8 @@ # only for launch() maybe deprecated from nmtwizard.task import TaskBase from utils.storage_utils import StorageUtils -from utils.request_utils import RequestUtils GLOBAL_POOL_NAME = "global_pool" -global_service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) logger = logging.getLogger(__name__) logger.addHandler(app.logger) @@ -85,7 +83,7 @@ def __init__(self, flask_global, service): 'entity_code': user.entity.entity_code, 'user_code': user.user_code } - self.service_config = global_service_config + self.service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME) self.entity_storages_config = self._get_entity_storages(self.creator['entity_code']) self.storage_client, self.global_storage_name = StorageUtils.get_storages(GLOBAL_POOL_NAME, mongo_client, @@ -1183,171 +1181,6 @@ def get_evaluations(): return cust_jsonify(evaluation_catalogs) -def _parse_request_data_for_predict(current_request): - # TODO: Validate request data - request_data = current_request.form - model = request_data.get("model") - text = request_data.get("text") - - return { - "model": model, - "text": text - } - - -def _get_deployment_info_of_model(model): - serve_task_id = mongo_client.get_serve_task_id_of_model(model) - - deploy_info = task.get_task_deployment_info(redis_db, serve_task_id) - - # default global service config - list_machines = global_service_config["variables"]["server_pool"] - host = [machine["host"] for machine in list_machines if machine["name"] == deploy_info['alloc_resource']][0] - - return {"host": 
host, "port": deploy_info['port']} - - -def _check_limit_deployed_model(model): - # filter list model of entity - entity_code = mongo_client.get_entity_code_of_model(model) - list_models = mongo_client.get_models_of_entity(entity_code) - - count = 0 - # count number of deployed model - for model in list_models: - tasks = model["tasks"] - for task_id in tasks: - if task_id.split("_")[-1] == "serve": - count += 1 - - if count >= config.get_system_config()["predict"]["limit_activate"]: - return True - - return False - - -@app.route("/model/predict", methods=["POST"]) -@filter_request("POST/model/predict", "train") -def predict(): - request_data = _parse_request_data_for_predict(request) - - if len(request_data["text"]) > config.get_system_config()["predict"]["max_characters"]: - abort(flask.make_response(flask.jsonify(message="your input is too long!"), 403)) - - model = request_data["model"] - deployment_info = _get_deployment_info_of_model(model) - if not deployment_info: - abort(flask.make_response(flask.jsonify(message="Model has not been deployed: %s" % model), 400)) - - request_url = f"http://{deployment_info['host']}:{deployment_info['port']}/translate" - request_data = {"src": [{"text": request_data["text"]}]} - response = RequestUtils.post(request_url, request_data) - return response.json(), response.status_code - - -@app.route("/model/inactive", methods=["POST"]) -@filter_request("POST/model/inactive", "train") -def inactive_model(): - request_data = _parse_request_data_for_deploy_model(request) - serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"]) - if not serve_task_id: - abort(flask.make_response(flask.jsonify(message="The model has not been active!"), 400)) - - # terminate task - task.terminate(redis_db, serve_task_id, "inactive") - - # release port - deploy_info = task.get_task_deployment_info(redis_db, serve_task_id) - redis_db.hdel(f"ports:{deploy_info['service']}:{deploy_info['alloc_resource']}", deploy_info['port']) - - # remove serve task in mongodb - mongo_client.remove_serving_task_id(request_data["model"], serve_task_id) - - return cust_jsonify({"message": "ok"}) - -@app.route("/model/active", methods=["POST"]) -@filter_request("POST/model/active", "train") -def active_model_local(): - """ - deploy model on local machine for prediction (only for model studio lite) - """ - request_data = _parse_request_data_for_deploy_model(request) - - # check limit deployed model - if _check_limit_deployed_model(request_data["model"]): - abort(flask.make_response(flask.jsonify(message="The numbers of deployed model exceeded the limit number"), 403)) - - # check model's serve task existed - serve_task_id = mongo_client.get_serve_task_id_of_model(request_data["model"]) - if serve_task_id: - abort(flask.make_response(flask.jsonify(message="The model has been active!"), 403)) - - resource = "auto" - _create_serve_task(request_data["model"], GLOBAL_POOL_NAME, request_data["source"], request_data["target"], resource) - - return cust_jsonify({"message": "ok"}) - - -def _parse_request_data_for_deploy_model(current_request): - # TODO: Validate request data - request_data = current_request.form - model = request_data.get("model") - language_pair = model.split('_')[1] - source_language = language_pair[:2] - target_language = language_pair[2:] - - return { - "source": source_language, - "target": target_language, - "model": model - } - - -def _select_resource_for_deploy_model(service_config): - # TODO: Select suitable resource for deploy - return 
service_config["variables"]["server_pool"][0] - - -def _create_serve_task(model, service, source_language, target_language, resource): - request_data = { - "source": source_language, - "target": target_language - } - routes_config = RoutesConfiguration(flask.g, service) - content = get_serve_task_config(service=service, request_data=request_data, routes_config=routes_config) - task_infos = TaskInfos(content=content, files={}, request_data=request_data, routes_configuration=routes_config, - service=service, resource=resource) - - serve_task = TaskServe(task_infos, model) - - # gen auth token for callback url - _, _ = post_function('POST/task/launch_v2', [serve_task.task_id], [serve_task]) - - serve_task.create(redis_db, taskfile_dir) - - mongo_client.insert_serving_task(model, serve_task.task_id) - - -def get_serve_task_config(service, request_data, routes_config): - docker_image_info = TaskBase.get_docker_image_info(routes_config, request_data.get("docker_image"), mongo_client) - - content = { - "service": service, - "docker": {**docker_image_info}, - "wait_after_launch": 2, - "creator": f"{routes_config.entity_owner}{routes_config.creator['user_code']}", - "options": {}, - } - - if request_data.get("ncpus"): - content["ncpus"] = request_data["ncpus"] - if request_data.get("priority"): - content["priority"] = request_data["priority"] - if request_data.get("iterations"): - content["iterations"] = request_data["iterations"] - return json.loads(json.dumps(content)) - - @app.route("/task/launch/", methods=["POST"]) @filter_request("POST/task/launch", "train") def launch(service):