Flush free_list if it's stale (#814)
Add an additional per-CPU flag, EBPF_EPOCH_PER_CPU_STALE.
This flag is accessed by ebpf_epoch_exit and by the flush timer.

The flush timer sets this flag when the CPU's free_list is not empty.
ebpf_epoch_exit clears this flag (if it is set).

If the flush timer finds this flag still set and the free_list still not empty, it schedules an _ebpf_epoch_stale_worker DPC on that CPU (this DPC calls ebpf_epoch_enter/ebpf_epoch_exit).
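
To make the interaction concrete, here is a rough sketch (not part of this commit) of the decision made on each flush-timer pass; the sketch_* names are hypothetical stand-ins for the per-CPU state in ebpf_epoch.c:

    // Illustrative sketch only; the committed logic lives in
    // _ebpf_epoch_get_release_epoch and ebpf_epoch_exit in libs/platform/ebpf_epoch.c.
    #include <stdbool.h>

    typedef struct _sketch_cpu_state
    {
        bool free_list_empty; // Does this CPU still hold entries waiting to be freed?
        bool stale;           // Stands in for EBPF_EPOCH_PER_CPU_STALE.
    } sketch_cpu_state_t;

    // Called once per CPU on each flush-timer pass.
    // Returns true when a stale-worker DPC should be queued on that CPU.
    static bool
    sketch_flush_timer_check(sketch_cpu_state_t* cpu)
    {
        if (cpu->free_list_empty) {
            return false; // Nothing pending on this CPU.
        }
        if (cpu->stale) {
            // Second consecutive pass with pending entries and no epoch activity:
            // queue the DPC that calls ebpf_epoch_enter/ebpf_epoch_exit.
            return true;
        }
        // First pass: mark the CPU as potentially stale and wait for the next timer.
        cpu->stale = true;
        return false;
    }

    // Any epoch activity on the CPU (ebpf_epoch_exit) proves it is not stale.
    static void
    sketch_epoch_exit(sketch_cpu_state_t* cpu)
    {
        cpu->stale = false;
    }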

Resolves: #813

Signed-off-by: Alan Jowett <alanjo@microsoft.com>
Alan-Jowett authored Mar 17, 2022
1 parent fc2f504 commit 2709dac
Showing 1 changed file: libs/platform/ebpf_epoch.c (72 additions, 9 deletions)
@@ -34,6 +34,8 @@
// _ebpf_flush_timer:
// Calls ebpf_flush and clears the _ebpf_flush_timer_set flag.
//
// _ebpf_epoch_stale_worker:
// Called if a CPU has items to be freed, but hasn't run anything in EBPF_EPOCH_FLUSH_DELAY_IN_MICROSECONDS.

// Delay after the _ebpf_flush_timer is set before it runs.
#define EBPF_EPOCH_FLUSH_DELAY_IN_MICROSECONDS 1000
@@ -52,11 +54,8 @@ typedef struct _ebpf_epoch_cpu_entry
_Requires_lock_held_(lock) ebpf_epoch_state_t cpu_epoch_state;
_Requires_lock_held_(lock) ebpf_list_entry_t free_list;
_Requires_lock_held_(lock) ebpf_hash_table_t* thread_table;
struct
{
int timer_armed : 1;
} flags;
uintptr_t padding;
int32_t flags;
ebpf_non_preemptible_work_item_t* stale_worker;
} ebpf_epoch_cpu_entry_t;

C_ASSERT(sizeof(ebpf_epoch_cpu_entry_t) % EBPF_CACHE_LINE_SIZE == 0);
@@ -122,9 +121,48 @@ _ebpf_epoch_get_release_epoch(_Out_ int64_t* released_epoch);
static void
_ebpf_flush_worker(_In_ void* context);

static void
_ebpf_epoch_stale_worker(_In_ void* work_item_context, _In_ void* parameter_1)
{
UNREFERENCED_PARAMETER(work_item_context);
UNREFERENCED_PARAMETER(parameter_1);
ebpf_epoch_enter();
ebpf_epoch_exit();
}

ebpf_result_t
_ebpf_epoch_update_thread_state(uint32_t cpu_id, uintptr_t thread_id, int64_t current_epoch, bool enter);

typedef enum _ebpf_epoch_per_cpu_flags
{
EBPF_EPOCH_PER_CPU_TIMER_ARMED = (1 << 0),
EBPF_EPOCH_PER_CPU_STALE = (1 << 1),
} ebpf_epoch_per_cpu_flags_t;

static bool
_ebpf_get_per_cpu_flag(_In_ const ebpf_epoch_cpu_entry_t* cpu_entry, ebpf_epoch_per_cpu_flags_t flag)
{
return (cpu_entry->flags & flag) != 0;
}

static void
_ebpf_set_per_cpu_flag(_In_ ebpf_epoch_cpu_entry_t* cpu_entry, ebpf_epoch_per_cpu_flags_t flag, bool state)
{
for (;;) {
int32_t old_flag_value = cpu_entry->flags;
int32_t new_flag_value = old_flag_value;
if (state) {
new_flag_value |= flag;
} else {
new_flag_value &= ~flag;
}
if (ebpf_interlocked_compare_exchange_int32(&cpu_entry->flags, new_flag_value, old_flag_value) ==
old_flag_value) {
break;
}
}
}

ebpf_result_t
ebpf_epoch_initiate()
{
@@ -196,6 +234,7 @@ ebpf_epoch_terminate()
ebpf_lock_destroy(&_ebpf_epoch_cpu_table[cpu_id].lock);
ebpf_hash_table_destroy(_ebpf_epoch_cpu_table[cpu_id].thread_table);
_ebpf_epoch_cpu_table[cpu_id].thread_table = NULL;
ebpf_free_non_preemptible_work_item(_ebpf_epoch_cpu_table[cpu_id].stale_worker);
}
_ebpf_epoch_cpu_count = 0;

@@ -231,6 +270,10 @@ ebpf_epoch_exit()
return;
}

if (_ebpf_get_per_cpu_flag(&_ebpf_epoch_cpu_table[current_cpu], EBPF_EPOCH_PER_CPU_STALE)) {
_ebpf_set_per_cpu_flag(&_ebpf_epoch_cpu_table[current_cpu], EBPF_EPOCH_PER_CPU_STALE, false);
}

if (ebpf_is_preemptible()) {
_ebpf_epoch_update_thread_state(current_cpu, ebpf_get_current_thread_id(), _ebpf_current_epoch, false);
} else {
@@ -386,10 +429,11 @@ _ebpf_epoch_release_free_list(_In_ ebpf_epoch_cpu_entry_t* cpu_entry, int64_t re
}
}
// If there are still items in the free list, schedule a timer to reap them in the future.
if (!ebpf_list_is_empty(&cpu_entry->free_list) && !cpu_entry->flags.timer_armed) {
if (!ebpf_list_is_empty(&cpu_entry->free_list) &&
!_ebpf_get_per_cpu_flag(cpu_entry, EBPF_EPOCH_PER_CPU_TIMER_ARMED)) {
// We will arm the timer once per CPU that sees entries it can't release.
// That's acceptable as arming the timer is idempotent.
cpu_entry->flags.timer_armed = true;
_ebpf_set_per_cpu_flag(cpu_entry, EBPF_EPOCH_PER_CPU_TIMER_ARMED, true);
ebpf_schedule_timer_work_item(_ebpf_flush_timer, EBPF_EPOCH_FLUSH_DELAY_IN_MICROSECONDS);
}

@@ -431,6 +475,7 @@ _ebpf_epoch_get_release_epoch(_Out_ int64_t* release_epoch)
uint32_t cpu_id;
ebpf_lock_state_t lock_state;
ebpf_result_t return_value;
bool stale_items = false;
EBPF_LOG_MESSAGE_UINT64(
EBPF_TRACELOG_LEVEL_VERBOSE,
EBPF_TRACELOG_KEYWORD_EPOCH,
@@ -445,7 +490,23 @@ _ebpf_epoch_get_release_epoch(_Out_ int64_t* release_epoch)
lock_state = ebpf_lock_lock(&_ebpf_epoch_cpu_table[cpu_id].lock);

// Clear the flush timer flag.
_ebpf_epoch_cpu_table[cpu_id].flags.timer_armed = false;
_ebpf_set_per_cpu_flag(&_ebpf_epoch_cpu_table[cpu_id], EBPF_EPOCH_PER_CPU_TIMER_ARMED, false);

if (!ebpf_list_is_empty(&_ebpf_epoch_cpu_table[cpu_id].free_list)) {
if (_ebpf_get_per_cpu_flag(&_ebpf_epoch_cpu_table[cpu_id], EBPF_EPOCH_PER_CPU_STALE)) {
// Schedule DPC
if (!_ebpf_epoch_cpu_table[cpu_id].stale_worker) {
ebpf_allocate_non_preemptible_work_item(
&_ebpf_epoch_cpu_table[cpu_id].stale_worker, cpu_id, _ebpf_epoch_stale_worker, NULL);
}
if (_ebpf_epoch_cpu_table[cpu_id].stale_worker) {
ebpf_queue_non_preemptible_work_item(_ebpf_epoch_cpu_table[cpu_id].stale_worker, NULL);
}
} else {
_ebpf_set_per_cpu_flag(&_ebpf_epoch_cpu_table[cpu_id], EBPF_EPOCH_PER_CPU_STALE, true);
}
stale_items = true;
}

if (_ebpf_epoch_cpu_table[cpu_id].cpu_epoch_state.active) {
lowest_epoch = min(lowest_epoch, _ebpf_epoch_cpu_table[cpu_id].cpu_epoch_state.epoch);
@@ -478,7 +539,9 @@ _ebpf_epoch_get_release_epoch(_Out_ int64_t* release_epoch)
return_value = EBPF_SUCCESS;

Exit:

if (stale_items) {
ebpf_schedule_timer_work_item(_ebpf_flush_timer, EBPF_EPOCH_FLUSH_DELAY_IN_MICROSECONDS);
}
*release_epoch = lowest_epoch - 1;
return return_value;
}
