This repository has been archived by the owner on Sep 19, 2023. It is now read-only.

Merge predict feature into model studio lite #131

Open
wants to merge 13 commits into
base: dev/model_studio_lite
16 changes: 11 additions & 5 deletions server/app/routes.py
@@ -24,7 +24,7 @@
get_cpu_count, get_params, boolean_param, change_parent_task, remove_config_option, model_name_analysis
from nmtwizard.capacity import Capacity
from nmtwizard.task import TaskEnum, TaskInfos, TasksCreationInfos, TaskPreprocess, TaskTrain, TaskTranslate, \
TaskScoring, TASK_RELEASE_TYPE
TaskScoring, TaskServe, TASK_RELEASE_TYPE
# only for launch() maybe deprecated
from nmtwizard.task import TaskBase
from utils.storage_utils import StorageUtils
@@ -83,7 +83,7 @@ def __init__(self, flask_global, service):
'entity_code': user.entity.entity_code,
'user_code': user.user_code
}
self.service_config = config.get_service_config(mongo_client, service_name=GLOBAL_POOL_NAME)
self.service_config = config.get_service_config(mongo_client, GLOBAL_POOL_NAME)
self.entity_storages_config = self._get_entity_storages(self.creator['entity_code'])
self.storage_client, self.global_storage_name = StorageUtils.get_storages(GLOBAL_POOL_NAME,
mongo_client,
@@ -849,7 +849,6 @@ def get_translate_score_corpus(testing_data_infos, request_data, routes_config,
source = request_data["source"]
target = request_data["target"]
default_test_data = get_default_test_data(routes_config.storage_client, source, target) if with_default_test else []

to_translate_corpus = []
to_score_corpus = []
for corpus in testing_data_infos:
@@ -1542,7 +1541,7 @@ def launch(service):
"registry": get_registry(service_module, image_score),
"tag": "latest",
"command": ["score", "-o"] + oref["output"] + ["-r"] + oref["ref"] +
option_lang + ['-f', "launcher:scores"]
option_lang + ['-f', "launcher:scores"]
}

score_task_id, explicit_name = build_task_id(content_score, xxyy, "score", parent_task_id)
@@ -1597,7 +1596,7 @@ def launch(service):
"registry": get_registry(service_module, image_score),
"tag": "latest",
"command": ["tuminer", "--tumode", "score", "--srcfile"] + in_out["infile"] + ["--tgtfile"] +
in_out["outfile"] + ["--output"] + in_out["scorefile"]
in_out["outfile"] + ["--output"] + in_out["scorefile"]
}

tuminer_task_id, explicit_name = build_task_id(content_tuminer, xxyy, "tuminer", parent_task_id)
@@ -1800,6 +1799,13 @@ def terminate_internal(task_id):
return "problem while posting model: %s" % res, current_status

task.terminate(redis_db, task_id, phase=phase)

# if this is a serve task, release its port and remove the serving task id from mongodb
if task_id.split("_")[-1] == "serve":
deploy_info = task.get_task_deployment_info(redis_db, task_id)
redis_db.hdel(f"ports:{deploy_info['service']}:{deploy_info['alloc_resource']}", deploy_info['port'])
mongo_client.remove_serving_task_id(deploy_info['model'], task_id)

return "terminating %s" % task_id, current_status


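For context, the serve-task cleanup added to terminate_internal above can be read as a small helper. The sketch below restates it with comments; it assumes the ports:<service>:<alloc_resource> Redis hash maps each published host port to the task holding it (the value stored in that hash is not visible in this diff), and the helper name itself is hypothetical.

def release_serving_resources(redis_db, mongo_client, task, task_id):
    # serve tasks are identified by the "_serve" suffix of their task id
    if task_id.split("_")[-1] != "serve":
        return
    # get_task_deployment_info (added in task.py below) returns the service pool,
    # parent model, allocated resource and published host port of the serve task
    deploy_info = task.get_task_deployment_info(redis_db, task_id)
    # free the port slot tracked per service/resource in Redis
    redis_db.hdel("ports:%s:%s" % (deploy_info['service'], deploy_info['alloc_resource']),
                  deploy_info['port'])
    # drop the serve task id from the model document in MongoDB
    mongo_client.remove_serving_task_id(deploy_info['model'], task_id)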
27 changes: 18 additions & 9 deletions server/nmtwizard/common.py
@@ -18,7 +18,7 @@ def displaycmd(lst):
while p != -1:
q = t.find("]]", p)
if q != -1:
t = t[0:p] + t[q+2:]
t = t[0:p] + t[q + 2:]
else:
t = t[0:p]
p = t.find("[[private:", p)
@@ -45,10 +45,10 @@ def rmprivate(lst):
if isinstance(t, six.string_types):
p = t.find("[[private:")
while p != -1:
t = t[0:p] + t[p+10:]
t = t[0:p] + t[p + 10:]
q = t.find("]]")
if q != -1:
t = t[0:q] + t[q+2:]
t = t[0:q] + t[q + 2:]
p = t.find("[[private:", q)
else:
p = -1
@@ -165,7 +165,7 @@ def ssh_connect_with_retry(hostname,
pkey=pkey,
key_filename=key_filename,
look_for_keys=False)
logger.info("Connection to %s successful (%f)", hostname, time.time()-start)
logger.info("Connection to %s successful (%f)", hostname, time.time() - start)
if login_cmd is not None:
if not run_and_check_command(client, login_cmd):
raise RuntimeError("failed to run login command")
@@ -267,9 +267,9 @@ def cmd_connect_private_registry(docker_registry):
if docker_registry['type'] == "aws":
return ('$(AWS_ACCESS_KEY_ID=%s AWS_SECRET_ACCESS_KEY=%s '
'aws ecr get-login --no-include-email --region %s)') % (
docker_registry['credentials']['AWS_ACCESS_KEY_ID'],
docker_registry['credentials']['AWS_SECRET_ACCESS_KEY'],
docker_registry['region'])
docker_registry['credentials']['AWS_ACCESS_KEY_ID'],
docker_registry['credentials']['AWS_SECRET_ACCESS_KEY'],
docker_registry['region'])
username = docker_registry['credentials']['username']
password = docker_registry['credentials']['password']
return 'docker login --username %s --password %s' % (username, password)
@@ -291,8 +291,8 @@ def cmd_docker_run(lxpu, docker_options, task_id,
if nbgpu == 0 or (nbgpu == 1 and lgpu[0] == 0):
gpu_id = '0'
else:
env['NV_GPU'] = ",".join([str(int(g)-1) for g in lgpu])
gpu_id = ",".join([str(v) for v in range(1, nbgpu+1)])
env['NV_GPU'] = ",".join([str(int(g) - 1) for g in lgpu])
gpu_id = ",".join([str(v) for v in range(1, nbgpu + 1)])

if docker_options.get('dev') == 1:
return "sleep 35"
@@ -310,8 +310,16 @@

# launch the task
cmd = '%s_o_run_o_-i_o_--rm' % docker_cmd

# only for serve tasks
if task_id.split("_")[-1] == "serve":
serving_port = docker_command[-1]
cmd += "_o_-p_o_{}:4000".format(serving_port)
docker_command = docker_command[:-1]

if need_expose_gpus:
cmd += '_o_--gpus=all'

if 'mount' in docker_options:
for k in docker_options['mount']:
cmd += '_o_-v_o_%s' % k
@@ -416,6 +424,7 @@ def launch_task(task_id,
* `callback_url`: server to callback for beat of activity
* `callback_interval`: time between 2 beats
"""

(lgpu, lcpu) = lxpu
gpu_id = ",".join(lgpu)
logger.info("launching task - %s / %s", task_id, gpu_id)
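To illustrate the new port-publishing branch in cmd_docker_run: when a task id ends in "serve", the last element of docker_command is taken as the host port and mapped onto the container's fixed serving port 4000. The sketch below mirrors that logic with a plain argument list instead of the "_o_"-joined command string; the sample task id, command and port are made up.

def publish_serving_port(task_id, docker_command):
    # returns (extra docker args, remaining docker command) for serve tasks,
    # and leaves every other task untouched
    if task_id.split("_")[-1] != "serve":
        return [], docker_command
    serving_port = docker_command[-1]  # host port appended when the serve task is launched
    # map the host port onto the container's default serving port 4000
    return ["-p", "%s:4000" % serving_port], docker_command[:-1]

# hypothetical example:
# publish_serving_port("xxyy_mymodel_serve", ["-m", "xxyy_mymodel_train", "serve", "59001"])
# -> (["-p", "59001:4000"], ["-m", "xxyy_mymodel_train", "serve"])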
62 changes: 61 additions & 1 deletion server/nmtwizard/mongo_database.py
@@ -6,7 +6,8 @@
"tags": "pn9-tag",
"dockers": "pn9-docker",
"evaluations": "pn9-evaluation",
"dataset": "pn9-dataset"
"dataset": "pn9-dataset",
"models": "pn9-model"
}


@@ -181,3 +182,62 @@ def get_dataset_by_ids(self, dataset_ids):
"$in": dataset_ids
}})
return dataset

def create_deployment_info(self, deployment_info):
the_table = self.get("serving_models")
the_table.insert(deployment_info)

def insert_serving_task(self, model, task_id):
the_table = self.get("models")
query = {
"model": model
}
update = {
"$push": {"tasks": task_id}
}
the_table.update_one(query, update)

def get_serve_task_id_of_model(self, model):
the_table = self.get("models")
query = {
"model": model
}
model_info = the_table.find_one(query)
tasks = model_info.get("tasks")
for task_id in tasks:
if task_id.split("_")[-1] == "serve":
return task_id

return None

def remove_serving_task_id(self, model, task_id):
the_table = self.get("models")
query = {
"model": model
}

update = {
"$pull": {"tasks": task_id}
}

the_table.update_one(query, update)

def get_entity_code_of_model(self, model):
the_table = self.get("models")
query = {
"model": model
}

return the_table.find_one(query)["owner"]["entity_code"]

def get_models_of_entity(self, entity_code):
"""get all models of the entity"""
the_table = self.get("models")

query = {
"type": None,
"owner": {"entity_code": entity_code}
}

return the_table.find(query)
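A short usage sketch tying the new model helpers together; the client construction is not shown in this diff, and the model and task names below are placeholders.

# assumes an already constructed MongoDatabase-style client exposing the methods added above
model = "enfr_example_model"                       # placeholder model name
task_id = "xxyy_enfr_example_model_serve"          # placeholder serve task id

mongo_client.insert_serving_task(model, task_id)            # $push the id into the model's "tasks" list
serving = mongo_client.get_serve_task_id_of_model(model)    # first "*_serve" entry, or None
entity = mongo_client.get_entity_code_of_model(model)       # owner.entity_code of the model document
mongo_client.remove_serving_task_id(model, task_id)         # $pull the id again once serving stops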

84 changes: 61 additions & 23 deletions server/nmtwizard/task.py
@@ -248,6 +248,28 @@ def __init__(self, task_infos, parent_task_id, to_translate):
TaskBase.__init__(self, task_infos)


class TaskServe(TaskBase):
def __init__(self, task_infos, parent_task_id):
"""we set default 4000 for docker serving port, so we set it when creating cmd"""
self._task_suffix = "serve"
self._task_type = "serve"
self._parent_task_id = parent_task_id

task_infos.content["priority"] = task_infos.content.get("priority", 0) + 1
task_infos.content["ngpus"] = 0
if "ncpus" not in task_infos.content:
task_infos.content["ncpus"] = get_cpu_count(task_infos.routes_configuration.service_config,
task_infos.content["ngpus"], "trans")

task_infos.content["docker"]["command"] = ["-m", self._parent_task_id]
task_infos.content["docker"]["command"].extend(["-ms", "pn9_model_catalog:"])
task_infos.content["docker"]["command"].extend(["serve"])
# task_infos.content["docker"]["command"].extend([str(self._serving_port)])
change_parent_task(task_infos.content["docker"]["command"], parent_task_id)

TaskBase.__init__(self, task_infos)


class TaskScoring(TaskBase):
def __init__(self, task_infos, parent_task_id, model, to_score):
self._task_suffix = "score"
@@ -364,7 +386,7 @@ def terminate(redis, task_id, phase):
# remove from service queue if it was there
service = redis.hget(keyt, "service")
if service is not None:
redis.lrem('queued:'+service, 0, task_id)
redis.lrem('queued:' + service, 0, task_id)

redis.hset(keyt, "message", phase)
set_status(redis, keyt, "terminating")
@@ -373,50 +395,66 @@

def work_queue(redis, task_id, service=None, delay=0):
if service is None:
service = redis.hget('task:'+task_id, 'service')
service = redis.hget('task:' + task_id, 'service')
# Queues the task in the work queue with a delay.
if delay == 0:
with redis.acquire_lock(f'work_queue:{task_id}', acquire_timeout=1, expire_time=10):
if task_id not in redis.lrange(f'work:{service}', 0, -1):
redis.lpush('work:'+service, task_id)
redis.delete('queue:'+task_id)
redis.lpush('work:' + service, task_id)
redis.delete('queue:' + task_id)
else:
redis.set('queue:'+task_id, delay)
redis.expire('queue:'+task_id, int(delay))
redis.set('queue:' + task_id, delay)
redis.expire('queue:' + task_id, int(delay))


def work_unqueue(redis, service):
"""Pop a task from the work queue."""
return redis.rpop('work:'+service)
return redis.rpop('work:' + service)


def service_queue(redis, task_id, service):
"""Queue the task on the service queue."""
with redis.acquire_lock('service:'+service):
redis.lrem('queued:'+service, 0, task_id)
redis.lpush('queued:'+service, task_id)
redis.delete('queue:'+task_id)
with redis.acquire_lock('service:' + service):
redis.lrem('queued:' + service, 0, task_id)
redis.lpush('queued:' + service, task_id)
redis.delete('queue:' + task_id)


def enable(redis, task_id, service=None):
if service is None:
service = redis.hget('task:'+task_id, 'service')
service = redis.hget('task:' + task_id, 'service')
# Marks a task as enabled.
redis.sadd("active:"+service, task_id)
redis.sadd("active:" + service, task_id)


def disable(redis, task_id, service=None):
if service is None:
service = redis.hget('task:'+task_id, 'service')
service = redis.hget('task:' + task_id, 'service')
# Marks a task as disabled.
redis.srem("active:"+service, task_id)
redis.delete("beat:"+task_id)
redis.srem("active:" + service, task_id)
redis.delete("beat:" + task_id)


def list_active(redis, service):
"""Returns all active tasks (i.e. non stopped)."""
return redis.smembers("active:"+service)

return redis.smembers("active:" + service)


def get_task_deployment_info(redis, task_id):
"""get deployment info of task: service pool, model, alloc_resource, port"""
task_info = redis.hgetall(f"task:{task_id}")
content = json.loads(task_info["content"])
model = content["docker"]["command"][1]
alloc_resource = task_info["alloc_resource"] # as trainer name, != host
port = content["docker"]["command"][-1]
service = task_info["service"]
result = {
"service": service,
"model": model,
"alloc_resource": alloc_resource,
"port": port
}
return result

def info(redis, taskfile_dir, task_id, fields):
"""Gets information on a task."""
@@ -459,8 +497,8 @@ def change(redis, task_id, service, priority, ngpus):
disable(redis, task_id, prev_service)
enable(redis, task_id, service)
redis.hset(keyt, "service", service)
redis.lrem('queued:'+prev_service, 0, task_id)
redis.lpush('queued:'+service, task_id)
redis.lrem('queued:' + prev_service, 0, task_id)
redis.lpush('queued:' + service, task_id)
if priority:
redis.hset(keyt, "priority", priority)
if ngpus:
@@ -554,7 +592,7 @@ def set_file(redis, taskfile_dir, task_id, content, filename, limit=None):
with open(os.path.join(taskdir, filename), "wb") as fh:
content = six.ensure_binary(content)
if limit and len(content) >= limit:
content = content[:limit-len(disclaimer)] + disclaimer
content = content[:limit - len(disclaimer)] + disclaimer
fh.write(content)
return content

@@ -570,8 +608,8 @@ def append_file(redis, taskfile_dir, task_id, content, filename, limit=None):
if limit and current_size >= limit:
return
content = six.ensure_binary(content, encoding="utf-8")
if limit and len(content)+current_size >= limit:
content = content[:limit-len(disclaimer)] + disclaimer
if limit and len(content) + current_size >= limit:
content = content[:limit - len(disclaimer)] + disclaimer
with open(filepath, "ab") as fh:
fh.write(content)

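For reference, a minimal, self-contained check of the Redis layout that get_task_deployment_info relies on; the hash is faked with a dict and all values are illustrative only.

import json

# fake task:<task_id> hash as stored for a serve task (illustrative values)
fake_task_hash = {
    "service": "gpu_pool",                  # service pool the task belongs to
    "alloc_resource": "trainer01",          # allocated resource (trainer name, not the host)
    "content": json.dumps({
        "docker": {"command": ["-m", "xxyy_enfr_model_train", "-ms", "pn9_model_catalog:", "serve", "59001"]}
    }),
}

content = json.loads(fake_task_hash["content"])
deploy_info = {
    "service": fake_task_hash["service"],
    "model": content["docker"]["command"][1],   # element after "-m" is the parent model
    "alloc_resource": fake_task_hash["alloc_resource"],
    "port": content["docker"]["command"][-1],   # last element is the published host port
}
assert deploy_info["port"] == "59001"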