Skip to content

Commit

Permalink
Merge branch 'main' of github.com:mochi-hpc/mochi-margo
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Oct 21, 2024
2 parents 80795f5 + 8e7ddf1 commit ac07b7d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 77 deletions.
4 changes: 4 additions & 0 deletions src/margo-config.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ char* margo_get_config_opt(margo_instance_id mid, int options)
json_object_object_add_ex(
root, "progress_timeout_ub_msec",
json_object_new_uint64(mid->hg_progress_timeout_ub), flags);
// progress_spindown_msec
json_object_object_add_ex(
root, "progress_spindown_msec",
json_object_new_uint64(mid->hg_progress_spindown_msec), flags);
// handle_cache_size
json_object_object_add_ex(root, "handle_cache_size",
json_object_new_uint64(mid->handle_cache_size),
Expand Down
104 changes: 67 additions & 37 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,8 @@ void __margo_hg_progress_fn(void* foo)
unsigned int hg_progress_timeout;
double next_timer_exp;
unsigned int pending;
int spin_flag = 0;
double spin_start_ts = 0;

while (!mid->hg_progress_shutdown_flag) {
do {
Expand All @@ -1965,48 +1967,76 @@ void __margo_hg_progress_fn(void* foo)
ret = margo_internal_trigger(mid, 0, 1, &actual_count);
} while ((ret == HG_SUCCESS) && actual_count
&& !mid->hg_progress_shutdown_flag);
/* once we have processed callbacks, give the ES an opportunity to
* run other ULTs if it needs to.

/* Yield now to give an opportunity for this ES to either a) run other
* ULTs that are eligible in this pool or b) check for runnable ULTs
* in other pools that the ES is associated with.
*/
ABT_thread_yield();

/* Check to see if there are any runnable ULTs in the pool now. If
* so, then we yield here to allow them a chance to execute.
* We check here because new ULTs may now be elegible as a result of
* being spawned by the trigger, but existing ones also may have been
* activated by an external event.
*
* NOTE: the output size value does not count the calling ULT itself,
* because it is not technically in the pool as a runnable thread at
* the moment.
*/
ABT_pool progress_pool = MARGO_PROGRESS_POOL(mid);
ABT_pool_get_size(progress_pool, &size);
if (size) ABT_thread_yield();

/* Are there any other threads in this pool that *might* need to
* execute at some point in the future? If so, then it's not
* necessarily safe for Mercury to sleep here in progress. It
* doesn't matter whether they are runnable now or not, because an
* external event could resume them.
*
* NOTE: we use ABT_pool_get_total_size() rather than
* ABT_pool_get_size() in order to include suspended ULTs in our
* count. Note that this function *does* count the caller, so it
* will always be at least one, unlike ABT_pool_get_size().
*/
ABT_pool_get_total_size(progress_pool, &size);
if (spin_flag) {
/* We used a zero progress timeout (busy spinning) on the last
* iteration. See if spindown time has elapsed yet.
*/
if (((ABT_get_wtime() - spin_start_ts)*1000)
< (double)mid->hg_progress_spindown_msec) {
/* We are still in the spindown window; continue spinning
* regardless of current conditions.
*/
spin_flag = 1;
} else {
/* This spindown window has elapsed; clear flag and timestep
* so that we can make a new policy decision.
*/
spin_flag = 0;
spin_start_ts = 0;
}
}

/* Are there any RPCs in flight, regardless of what pool they were
* issued to? If so, then we also cannot block in Mercury, because
* they may issue self forward() calls that cannot complete until we
* get through this progress/trigger cycle
*/
ABT_mutex_lock(mid->pending_operations_mtx);
pending = mid->pending_operations;
ABT_mutex_unlock(mid->pending_operations_mtx);
if (mid->hg_progress_spindown_msec && !spin_flag) {
/* Determine if it is reasonably safe to briefly block on
* Mercury progress or if we should enter spin mode. We check
* two conditions: are there any RPCs currently being processed
* (i.e. pending_operations) or are there any other threads
* assicated with the current pool that might become runnable
* while this thread is blocked? If either condition is met,
* then we use a zero timeout to Mercury to avoid blocking this
* ULT for too long.
*
* Note that there is no easy way to determine if this ES is
* expected to also execute work in other pools, so we may
* still introduce hg_progress_timeout_ub of latency in that
* configuration scenario. Latency-sensitive use cases
* should avoid running the Margo progress function in pools
* that share execution streams with other pools.
*/
ABT_mutex_lock(mid->pending_operations_mtx);
pending = mid->pending_operations;
ABT_mutex_unlock(mid->pending_operations_mtx);

/*
* Note that we intentionally use get_total_size() rather
* than get_size() to make sure that we count suspended
* ULTs, not just currently runnable ULTs. The resulting
* count includes this ULT so we look for a count > 1
* instead of a count > 0.
*/
ABT_pool_get_total_size(MARGO_PROGRESS_POOL(mid), &size);

if (pending || size > 1) {
/* entering spin mode; record timestamp so that we can
* track how long we have been in this mode
*/
spin_flag = 1;
spin_start_ts = ABT_get_wtime();
} else {
/* Block on Mercury progress to release CPU */
spin_flag = 0;
spin_start_ts = 0;
}
}

if (pending || size > 1) {
if (spin_flag) {
hg_progress_timeout = 0;
} else {
hg_progress_timeout = mid->hg_progress_timeout_ub;
Expand Down
10 changes: 10 additions & 0 deletions src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ margo_instance_id margo_init_ext(const char* address,
goto error;
}

mid->hg_progress_spindown_msec
= json_object_object_get_int_or(config, "progress_spindown_msec", 10);
int progress_timeout_ub = json_object_object_get_int_or(
config, "progress_timeout_ub_msec", 100);
int handle_cache_size
Expand Down Expand Up @@ -412,6 +414,7 @@ static bool __margo_validate_json(struct json_object* _margo,
/* Fields:
- [required] mercury: object
- [optional] argobots: object
- [optional] progress_spindown_msec: integer >= 0 (default 10)
- [optional] progress_timeout_ub_msec: integer >= 0 (default 100)
- [optional] handle_cache_size: integer >= 0 (default 32)
- [optional] use_progress_thread: bool (default false)
Expand All @@ -434,6 +437,13 @@ static bool __margo_validate_json(struct json_object* _margo,
struct json_object* _argobots = json_object_object_get(_margo, "argobots");
if (!__margo_abt_validate_json(_argobots)) { return false; }

// check "progress_spindown_msec" field
ASSERT_CONFIG_HAS_OPTIONAL(_margo, "progress_spindown_msec", int, "margo");
if (CONFIG_HAS(_margo, "progress_spindown_msec", ignore)) {
CONFIG_INTEGER_MUST_BE_POSITIVE(_margo, "progress_spindown_msec",
"progress_spindown_msec");
}

// check "progress_timeout_ub_msec" field
ASSERT_CONFIG_HAS_OPTIONAL(_margo, "progress_timeout_ub_msec", int,
"margo");
Expand Down
12 changes: 6 additions & 6 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct margo_instance {
ABT_thread hg_progress_tid;
_Atomic int hg_progress_shutdown_flag;
_Atomic unsigned hg_progress_timeout_ub;
_Atomic unsigned hg_progress_spindown_msec;

uint16_t num_registered_rpcs; /* number of registered rpc's by all providers
on this instance */
Expand Down Expand Up @@ -130,8 +131,7 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind
{
typedef enum margo_request_kind {
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;
Expand Down Expand Up @@ -159,10 +159,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down
Loading

0 comments on commit ac07b7d

Please sign in to comment.