Skip to content

Commit

Permalink
Merge pull request #1978 from cat-bro/production_replace_tpv_api_call_with_database_query
Browse files Browse the repository at this point in the history

replace production tpv api call with database query
  • Loading branch information
cat-bro authored May 29, 2024
2 parents 24d8ac2 + 50019cd commit df45620
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,53 @@ tools:
from galaxy.jobs.mapper import JobNotReadyException
raise JobNotReadyException()
rank: | # TODO: rank function needs overhaul to consider memory
import requests
import random
grafana_destinations = { # dict of destinations where the key from the stats api call differs from the destination id
'Galaxy-Main': 'slurm',
'pulsar-high-mem-1': 'pulsar-high-mem1',
'pulsar-qld-himem-0': 'pulsar-qld-high-mem0',
'pulsar-qld-himem-1': 'pulsar-qld-high-mem1',
'pulsar-qld-himem-2': 'pulsar-qld-high-mem2',
}
params = {
'pretty': 'true',
'db': 'queues',
'q': 'SELECT last("percent_allocated") from "sinfo" group by "host"'
}
if len(candidate_destinations) > 1:
if len(candidate_destinations) <= 1:
log.info("++ ---tpv rank debug: 1 or fewer destinations: returning candidate_destinations")
final_destinations = candidate_destinations
else:
import random
from sqlalchemy import text

raw_sql_query = """
select destination_id, state, count(id), sum(cores), sum(mem)
from (
select id,
CAST((REGEXP_MATCHES(encode(destination_params, 'escape'),'ntasks=(\d+)'))[1] as INTEGER) as cores,
CAST((REGEXP_MATCHES(encode(destination_params, 'escape'),'mem=(\d+)'))[1] as INTEGER) as mem,
state,
destination_id
from job
where state='queued' or state='running'
order by destination_id
) as foo
group by destination_id, state;
"""

try:
response = requests.get('https://stats.usegalaxy.org.au:8086/query', auth=('grafana', '{{ vault_influx_grafana_password }}'), params=params)
data = response.json()
cpu_by_destination = {grafana_destinations.get(s['tags']['host'], s['tags']['host']):s['values'][0][1] for s in data.get('results')[0].get('series', [])}
# sort by cpu usage
candidate_destinations.sort(key=lambda d: (cpu_by_destination.get(d.id), random.randint(0,9)))
results = app.model.context.execute(text(raw_sql_query))
db_queue_info = {}
for row in results:
# log.info(f"++ ---tpv rank debug: row returned by db query: {str(row)}")
destination_id, state, count_id, sum_cores, sum_mem = row
if not destination_id in db_queue_info:
db_queue_info[destination_id] = {
'queued': {'sum_cores': 0, 'sum_mem': 0.0, 'job_count': 0},
'running': {'sum_cores': 0, 'sum_mem': 0.0, 'job_count': 0},
}
db_queue_info[destination_id][state] = {'sum_cores': sum_cores, 'sum_mem': sum_mem, 'job_count': count_id}

def destination_usage_proportion(destination):
if not destination.context.get('destination_total_mem') or not destination.context.get('destination_total_cores'):
raise Exception(f"++ ---tpv rank debug: At least one of destination_total_mem, destination_total_cores is unavailable")
destination_total_cores = destination.context.get('destination_total_cores')
return_value = sum([db_queue_info.get(destination.id, {}).get(state, {}).get('sum_cores', 0) for state in ['queued', 'running']])/destination_total_cores
log.info(f"++ ---tpv rank debug: returning usage proportion value for destination {destination}: {str(return_value)}")
return return_value

# Sort by cpu usage as with previous method. This time queued cpu commitment counts towards CPU usage
candidate_destinations.sort(key=lambda d: (destination_usage_proportion(d.id), random.randint(0,9)))
final_destinations = candidate_destinations
except Exception:
log.exception("An error occurred while querying influxdb. Using a weighted random candidate destination")
log.exception("++ ---tpv rank debug: An error occurred with database query and/or surrounding logic. Using a weighted random candidate destination")
final_destinations = helpers.weighted_random_sampling(candidate_destinations)
else:
final_destinations = candidate_destinations
final_destinations
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ destinations:
default:
runner: slurm
abstract: true
context:
destination_total_cores: null
destination_total_mem: null
# params: # commented out 3/4/23 to check whether this helps with load on nfs
# tmp_dir: True # TODO: check that this works
scheduling:
Expand Down Expand Up @@ -64,6 +67,9 @@ destinations:
runner: slurm
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['slurm'].tags }}
scheduling:
accept:
Expand All @@ -75,6 +81,9 @@ destinations:
runner: slurm
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['slurm-training'].tags }}
scheduling:
accept:
Expand All @@ -86,6 +95,9 @@ destinations:
interactive_pulsar:
max_accepted_cores: 32
max_accepted_mem: 122.86
context:
destination_total_cores: 192
destination_total_mem: 737
runner: pulsar_embedded
tags: {{ job_conf_limits.environments['interactive_pulsar'].tags }}
params:
Expand All @@ -107,6 +119,9 @@ destinations:
runner: pulsar_mel2_runner
max_accepted_cores: 8
max_accepted_mem: 31.45
context:
destination_total_cores: 192
destination_total_mem: 737
tags: {{ job_conf_limits.environments['pulsar-mel2'].tags }}
scheduling:
accept:
Expand All @@ -118,6 +133,9 @@ destinations:
runner: pulsar-mel3_runner
max_accepted_cores: 32
max_accepted_mem: 63 # reduced from 122.86 to assist with scheduling
context:
destination_total_cores: 144
destination_total_mem: 553
tags: {{ job_conf_limits.environments['pulsar-mel3'].tags }}
scheduling:
accept:
Expand All @@ -131,6 +149,9 @@ destinations:
runner: pulsar-high-mem1_runner
max_accepted_cores: 126
max_accepted_mem: 3845.1
context:
destination_total_cores: 126
destination_total_mem: 3845.1
min_accepted_mem: 62.51
tags: {{ job_conf_limits.environments['pulsar-high-mem1'].tags }}
scheduling:
Expand All @@ -143,6 +164,9 @@ destinations:
runner: pulsar-high-mem2_runner
max_accepted_cores: 126
max_accepted_mem: 1922.49
context:
destination_total_cores: 126
destination_total_mem: 1922.49
# min_accepted_mem: 62.51 # 2/10/23 accept all job sizes
tags: {{ job_conf_limits.environments['pulsar-high-mem2'].tags }}
scheduling:
Expand All @@ -155,6 +179,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem0'].tags }}
scheduling:
accept:
Expand All @@ -167,6 +194,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem1'].tags }}
scheduling:
accept:
Expand All @@ -180,6 +210,9 @@ destinations:
max_accepted_cores: 240
max_accepted_mem: 3845.07
min_accepted_mem: 62.51
context:
destination_total_cores: 240
destination_total_mem: 3845.07
tags: {{ job_conf_limits.environments['pulsar-qld-high-mem2'].tags }}
scheduling:
accept:
Expand All @@ -191,6 +224,9 @@ destinations:
runner: pulsar-nci-training_runner
max_accepted_cores: 16
max_accepted_mem: 47.07
context:
destination_total_cores: 192
destination_total_mem: 565
tags: {{ job_conf_limits.environments['pulsar-nci-training'].tags }}
scheduling:
accept:
Expand All @@ -205,6 +241,9 @@ destinations:
runner: pulsar-qld-blast_runner
max_accepted_cores: 60
max_accepted_mem: 200
context:
destination_total_cores: 60
destination_total_mem: 200
tags: {{ job_conf_limits.environments['pulsar-qld-blast'].tags }}
scheduling:
accept:
Expand All @@ -218,6 +257,9 @@ destinations:
runner: pulsar-QLD_runner
max_accepted_cores: 16
max_accepted_mem: 62.72
context:
destination_total_cores: 112
destination_total_mem: 439
tags: {{ job_conf_limits.environments['pulsar-QLD'].tags }}
scheduling:
accept:
Expand All @@ -230,6 +272,9 @@ destinations:
runner: pulsar_azure_0_runner
max_accepted_cores: 24
max_accepted_mem: 209
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure'].tags }}
scheduling:
require:
Expand All @@ -241,6 +286,9 @@ destinations:
max_accepted_cores: 24
max_accepted_mem: 209
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure-gpu'].tags }}
scheduling:
require:
Expand All @@ -259,6 +307,9 @@ destinations:
max_accepted_cores: 24
max_accepted_mem: 209
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 558
tags: {{ job_conf_limits.environments['pulsar-azure-1-gpu'].tags }}
scheduling:
require:
Expand All @@ -277,6 +328,9 @@ destinations:
max_accepted_cores: 64
max_accepted_mem: 582
max_accepted_gpus: 1
context:
destination_total_cores: 64
destination_total_mem: 582
scheduling:
require:
- pulsar
Expand Down

0 comments on commit df45620

Please sign in to comment.