replace production tpv api call with database query #1978

Merged
@@ -28,30 +28,54 @@ tools:
from galaxy.jobs.mapper import JobNotReadyException
raise JobNotReadyException()
rank: | # TODO: rank function needs overhaul to consider memory
import requests
import random
grafana_destinations = { # dict of destinations where the key from the stats api call differs from the destination id
'Galaxy-Main': 'slurm',
'pulsar-high-mem-1': 'pulsar-high-mem1',
'pulsar-qld-himem-0': 'pulsar-qld-high-mem0',
'pulsar-qld-himem-1': 'pulsar-qld-high-mem1',
'pulsar-qld-himem-2': 'pulsar-qld-high-mem2',
}
params = {
'pretty': 'true',
'db': 'queues',
'q': 'SELECT last("percent_allocated") from "sinfo" group by "host"'
}
if len(candidate_destinations) > 1:
if len(candidate_destinations) <= 1:
log.info("++ ---tpv rank debug: 1 or fewer destinations: returning candidate_destinations")
final_destinations = candidate_destinations
else:
import random
from sqlalchemy import text

raw_sql_query = """
select destination_id, state, count(id), sum(cores), sum(mem)
from (
select id,
CAST((REGEXP_MATCHES(encode(destination_params, 'escape'),'ntasks=(\d+)'))[1] as INTEGER) as cores,
CAST((REGEXP_MATCHES(encode(destination_params, 'escape'),'mem=(\d+)'))[1] as INTEGER) as mem,
Contributor:
We should be able to avoid the regex if we merge this: galaxyproject/tpv-shared-database#61

Collaborator Author:
eventually yes, there should be a more pleasant query, even one that can be expressed in sqlalchemy. For now, this works
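A minimal sketch of what such a SQLAlchemy version of the aggregation might look like, assuming the per-job core and memory figures were queryable as plain columns; the `cores` and `mem` columns below are hypothetical stand-ins for whatever galaxyproject/tpv-shared-database#61 would expose, and today those values still have to be regex-parsed out of `destination_params` as in the raw SQL above:

```python
# Sketch only (SQLAlchemy 1.4+ select() style). `cores` and `mem` are
# hypothetical columns that do not exist in the current job table.
from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

metadata = MetaData()
job = Table(
    "job", metadata,
    Column("id", Integer, primary_key=True),
    Column("state", String),
    Column("destination_id", String),
    Column("cores", Integer),   # hypothetical
    Column("mem", Integer),     # hypothetical
)

stmt = (
    select(
        job.c.destination_id,
        job.c.state,
        func.count(job.c.id),
        func.sum(job.c.cores),
        func.sum(job.c.mem),
    )
    .where(job.c.state.in_(["queued", "running"]))
    .group_by(job.c.destination_id, job.c.state)
)

# `app.model.context` is the session object already used by the rank function below.
results = app.model.context.execute(stmt)
```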

state,
destination_id
from job
where state='queued' or state='running'
order by destination_id
) as foo
group by destination_id, state;
"""

try:
response = requests.get('https://stats.usegalaxy.org.au:8086/query', auth=('grafana', '{{ vault_influx_grafana_password }}'), params=params)
data = response.json()
cpu_by_destination = {grafana_destinations.get(s['tags']['host'], s['tags']['host']):s['values'][0][1] for s in data.get('results')[0].get('series', [])}
# sort by cpu usage
candidate_destinations.sort(key=lambda d: (cpu_by_destination.get(d.id), random.randint(0,9)))
results = app.model.context.execute(text(raw_sql_query))
db_queue_info = {}
for row in results:
# log.info(f"++ ---tpv rank debug: row returned by db query: {str(row)}")
destination_id, state, count_id, sum_cores, sum_mem = row
if destination_id not in db_queue_info:
db_queue_info[destination_id] = {
'queued': {'sum_cores': 0, 'sum_mem': 0.0, 'job_count': 0},
'running': {'sum_cores': 0, 'sum_mem': 0.0, 'job_count': 0},
}
db_queue_info[destination_id][state] = {'sum_cores': sum_cores, 'sum_mem': sum_mem, 'job_count': count_id}

def destination_usage_proportion(destination):
if not destination.context.get('destination_total_mem') or not destination.context.get('destination_total_cores'):
raise Exception(f"++ ---tpv rank debug: at least one of destination_total_mem, destination_total_cores is unavailable for destination {destination}")
destination_total_cores = destination.context.get('destination_total_cores')
return_value = sum([db_queue_info.get(destination.id, {}).get(state, {}).get('sum_cores', 0) for state in ['queued', 'running']])/destination_total_cores
log.info(f"++ ---tpv rank debug: returning usage proportion value for destination {destination}: {str(return_value)}")
return return_value

# Sort by CPU usage as with the previous method; this time queued CPU commitment also counts towards usage
candidate_destinations.sort(key=lambda d: (destination_usage_proportion(d), random.randint(0,9)))
final_destinations = candidate_destinations
except Exception:
log.exception("An error occurred while querying influxdb. Using a weighted random candidate destination")
log.exception("An error occurred with database query and/or surrounding logic. Using a weighted random candidate destination")
final_destinations = helpers.weighted_random_sampling(candidate_destinations)
else:
final_destinations = candidate_destinations
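To make the new ranking concrete, here is a small self-contained sketch of the same bookkeeping; the sample rows, destination names and core totals below are made up for illustration (the real rows come from the query above and the totals from each destination's `destination_total_cores` context value):

```python
import random

# Made-up rows shaped like the query output:
# (destination_id, state, count(id), sum(cores), sum(mem))
sample_rows = [
    ("slurm", "running", 40, 100, 400.0),
    ("slurm", "queued", 10, 20, 80.0),
    ("pulsar-mel3", "running", 5, 30, 120.0),
]

db_queue_info = {}
for destination_id, state, count_id, sum_cores, sum_mem in sample_rows:
    if destination_id not in db_queue_info:
        db_queue_info[destination_id] = {
            "queued": {"sum_cores": 0, "sum_mem": 0.0, "job_count": 0},
            "running": {"sum_cores": 0, "sum_mem": 0.0, "job_count": 0},
        }
    db_queue_info[destination_id][state] = {
        "sum_cores": sum_cores, "sum_mem": sum_mem, "job_count": count_id,
    }

# Hypothetical totals standing in for each destination's context values.
destination_total_cores = {"slurm": 192, "pulsar-mel3": 144}

def usage_proportion(destination_id):
    committed = sum(
        db_queue_info.get(destination_id, {}).get(state, {}).get("sum_cores", 0)
        for state in ("queued", "running")
    )
    return committed / destination_total_cores[destination_id]

# slurm: (100 + 20) / 192 = 0.625; pulsar-mel3: 30 / 144 ~= 0.21,
# so pulsar-mel3 sorts first; the random element only breaks ties on equal load.
candidates = ["slurm", "pulsar-mel3"]
candidates.sort(key=lambda d: (usage_proportion(d), random.randint(0, 9)))
print(candidates)  # ['pulsar-mel3', 'slurm']
```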
@@ -2,6 +2,9 @@ destinations:
default:
runner: slurm
abstract: true
context:
destination_total_cores: null
destination_total_mem: null
# params: # commented out 3/4/23 to check whether this helps with load on nfs
# tmp_dir: True # TODO: check that this works
scheduling:
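A rough sketch of why the abstract default carries null placeholders; the `FakeDestination` class below is a stand-in for a TPV destination object, not real TPV code. A destination whose totals are left at null hits the guard in `destination_usage_proportion`, and the surrounding try/except then falls back to `helpers.weighted_random_sampling`:

```python
# Stand-in object for illustration; real TPV destinations expose `id` and `context`.
class FakeDestination:
    def __init__(self, dest_id, context):
        self.id = dest_id
        self.context = context

dest = FakeDestination("default", {"destination_total_cores": None,
                                   "destination_total_mem": None})

# Mirrors the guard in the rank function: a null (falsy) total raises,
# which the caller's try/except converts into the weighted-random fallback.
try:
    if not dest.context.get("destination_total_mem") or not dest.context.get("destination_total_cores"):
        raise Exception(f"destination totals unavailable for destination {dest.id}")
except Exception:
    print("falling back to weighted random sampling of candidate destinations")
```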
@@ -64,6 +67,9 @@ destinations:
runner: slurm
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['slurm'].tags }}
scheduling:
accept:
@@ -75,6 +81,9 @@ destinations:
runner: slurm
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['slurm-training'].tags }}
scheduling:
accept:
@@ -86,6 +95,9 @@ destinations:
interactive_pulsar:
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
runner: pulsar_embedded
tags: {{ job_conf_limits.environments['interactive_pulsar'].tags }}
params:
@@ -107,6 +119,9 @@ destinations:
runner: pulsar_mel2_runner
max_accepted_cores: 8
max_accepted_mem: 31.45
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['pulsar-mel2'].tags }}
scheduling:
accept:
@@ -118,6 +133,9 @@ destinations:
runner: pulsar-mel3_runner
max_accepted_cores: 32
max_accepted_mem: 63 # reduced from 122.86 to assist with scheduling
context:
destination_total_cores: 144
destination_total_mem: 553
tags: {{ job_conf_limits.environments['pulsar-mel3'].tags }}
scheduling:
accept:
@@ -131,6 +149,9 @@ destinations:
runner: pulsar-high-mem1_runner
max_accepted_cores: 126
max_accepted_mem: 3845.1
context:
destination_total_cores: 126
destination_total_mem: 3845.1
min_accepted_mem: 62.51
tags: {{ job_conf_limits.environments['pulsar-high-mem1'].tags }}
scheduling:
@@ -143,6 +164,9 @@ destinations:
runner: pulsar-high-mem2_runner
max_accepted_cores: 126
max_accepted_mem: 1922.49
context:
destination_total_cores: 126
destination_total_mem: 1922.49
# min_accepted_mem: 62.51 # 2/10/23 accept all job sizes
tags: {{ job_conf_limits.environments['pulsar-high-mem2'].tags }}
scheduling:
@@ -155,6 +179,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem0'].tags }}
scheduling:
accept:
@@ -167,6 +194,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem1'].tags }}
scheduling:
accept:
@@ -180,6 +210,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem2'].tags }}
scheduling:
accept:
@@ -191,6 +224,9 @@ destinations:
runner: pulsar-nci-training_runner
max_accepted_cores: 16
max_accepted_mem: 47.07
context:
destination_total_cores: 192
destination_total_mem: 565
tags: {{ job_conf_limits.environments['pulsar-nci-training'].tags }}
scheduling:
accept:
@@ -205,6 +241,9 @@ destinations:
runner: pulsar-qld-blast_runner
max_accepted_cores: 60
max_accepted_mem: 200
context:
destination_total_cores: 60
destination_total_mem: 200
tags: {{ job_conf_limits.environments['pulsar-qld-blast'].tags }}
scheduling:
accept:
@@ -217,6 +256,9 @@ destinations:
runner: pulsar-QLD_runner
max_accepted_cores: 16
max_accepted_mem: 62.72
context:
destination_total_cores: 112
destination_total_mem: 439
tags: {{ job_conf_limits.environments['pulsar-QLD'].tags }}
scheduling:
accept:
@@ -229,6 +271,9 @@ destinations:
runner: pulsar_azure_0_runner
max_accepted_cores: 24
max_accepted_mem: 209
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure'].tags }}
scheduling:
require:
@@ -240,6 +285,9 @@ destinations:
max_accepted_cores: 24
max_accepted_mem: 209
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure-gpu'].tags }}
scheduling:
require:
@@ -258,6 +306,9 @@ destinations:
max_accepted_cores: 24
max_accepted_mem: 209
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure-1-gpu'].tags }}
scheduling:
require:
@@ -276,6 +327,9 @@ destinations:
max_accepted_cores: 64
max_accepted_mem: 582
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 582
scheduling:
require:
- pulsar