From ca35465b4c1896f64e327854add01c638b1dd850 Mon Sep 17 00:00:00 2001
From: Jens Erat
Date: Wed, 9 Aug 2023 15:10:46 +0200
Subject: [PATCH 1/4] fix(queue): add failing test for edge case
 max_batch_size=max_entries

With the edge-case configuration max_batch_size = max_entries, the queue
can fail with an assertion error when removing the frontmost element.
This happens especially when the callback fails repeatedly (e.g. an
unavailable backend system that is supposed to receive the data).

What happens:

1. we add max_batch_size elements, all of which "post" resources
2. the batch queue consumes all of those resources in `process_once` by
   `wait()`ing for them, but gets stuck processing/sending the batch
3. as `process_once` is stuck until `max_retry_time` has passed, the
   function never runs `delete_frontmost_entry()` and thus never
   actually advances the `front` reference
4. when enqueuing the next item, the queue tries to drop the oldest
   entry, but triggers the assertion in queue.lua because no resources
   are left
---
 spec/01-unit/27-queue_spec.lua | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua
index 8c071686ea19..506c318863cc 100644
--- a/spec/01-unit/27-queue_spec.lua
+++ b/spec/01-unit/27-queue_spec.lua
@@ -405,6 +405,37 @@ describe("plugin queue", function()
     assert.match_re(log_messages, "INFO .*queue resumed processing")
   end)
 
+  it("queue does not fail for max batch size = max entries", function()
+    local function enqueue(entry)
+      Queue.enqueue(
+        queue_conf({
+          name = "capacity-exceeded",
+          max_batch_size = 2,
+          max_entries = 2,
+          max_coalescing_delay = 0.1,
+        }),
+        function(_, batch)
+          ngx.sleep(1)
+          return false, "FAIL FAIL FAIL"
+        end,
+        nil,
+        entry
+      )
+    end
+
+    -- enqueue 2 entries, enough for first batch
+    for i = 1, 2 do
+      enqueue("initial batch: " .. tostring(i))
+    end
+    -- wait for max_coalescing_delay such that the first batch is processed (and will be stuck in retry loop, as our handler always fails)
+    ngx.sleep(0.1)
+    -- fill in some more entries
+    for i = 1, 2 do
+      enqueue("fill up: " .. tostring(i))
+    end
+    wait_until_queue_done("capacity-exceeded")
+  end)
+
   it("drops entries when it reaches its max_bytes", function()
     local processed
     local function enqueue(entry)

From 131e1816fb2738466f3f4b8a3db5386f8f035bac Mon Sep 17 00:00:00 2001
From: Jens Erat
Date: Wed, 9 Aug 2023 16:31:52 +0200
Subject: [PATCH 2/4] potential fix for race condition

This commit might fix #11377 by taking the currently processed elements
out of the race condition window: the batch is copied and removed from
the queue before the handler is invoked.

Two tests needed changes:

1. "gives up sending after retrying" needed another (otherwise ignored)
   value, so that we can wait long enough in `wait_until_queue_done`
   (there might be a more elegant solution here)
2. the new test needed the handler to start succeeding again so that
   the queue finally clears

Why do I think this works?

- immediately after the last call to `semaphore:wait()`, we start
  actually removing items from `entries`
- the code cannot be interrupted by other light threads before we
  actually start the handler

These assumptions strongly need verification by some Lua experts!
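To make the intended ordering easier to review, here is a minimal,
self-contained sketch in plain Lua (invented names, not the actual
`Queue` implementation) of what the reordering amounts to: the batch is
copied and its queue slots are released before the possibly
long-retrying handler runs.

    -- illustrative model only, not kong/tools/queue.lua itself
    local entries, front, back = {}, 1, 0

    local function enqueue(item)
      back = back + 1
      entries[back] = item
    end

    local function delete_frontmost_entry()
      assert(entries[front] ~= nil)  -- stands in for the assertion described in #11377
      entries[front] = nil
      front = front + 1
    end

    local function process_batch(handler, batch_size)
      -- copy the batch out of the shared queue state ...
      local batch = {}
      for i = 1, batch_size do
        batch[i] = entries[front + i - 1]
      end
      -- ... and free the slots immediately, so a concurrent enqueue that
      -- wants to drop the oldest entry no longer races with a handler
      -- that is stuck retrying for up to max_retry_time
      for _ = 1, batch_size do
        delete_frontmost_entry()
      end
      return handler(batch)  -- retries may now take arbitrarily long
    end

    enqueue("a"); enqueue("b")
    print(process_batch(function(batch) return #batch end, 2))  --> prints 2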
---
 kong/tools/queue.lua           | 21 +++++++++---------
 spec/01-unit/27-queue_spec.lua | 39 +++++++++++++++++++++-------------
 2 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua
index 96efe1d95aad..204ed0f43cf4 100644
--- a/kong/tools/queue.lua
+++ b/kong/tools/queue.lua
@@ -254,11 +254,21 @@ function Queue:process_once()
     end
   end
 
+  local batch = {unpack(self.entries, self.front, self.front + entry_count - 1)}
+  -- Guard against queue shrinkage during handler invocation by using math.min below.
+  for _ = 1, math.min(entry_count, self:count()) do
+    self:delete_frontmost_entry()
+  end
+  if self.queue_full then
+    self:log_info('queue resumed processing')
+    self.queue_full = false
+  end
+
   local start_time = now()
   local retry_count = 0
   while true do
     self:log_debug("passing %d entries to handler", entry_count)
-    ok, err = self.handler(self.handler_conf, {unpack(self.entries, self.front, self.front + entry_count - 1)})
+    ok, err = self.handler(self.handler_conf, batch)
     if ok then
       self:log_debug("handler processed %d entries sucessfully", entry_count)
       break
@@ -283,15 +293,6 @@ function Queue:process_once()
     sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay))
     retry_count = retry_count + 1
   end
-
-  -- Guard against queue shrinkage during handler invocation by using math.min below.
-  for _ = 1, math.min(entry_count, self:count()) do
-    self:delete_frontmost_entry()
-  end
-  if self.queue_full then
-    self:log_info('queue resumed processing')
-    self.queue_full = false
-  end
 end
 
 
diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua
index 506c318863cc..c85954d87ca5 100644
--- a/spec/01-unit/27-queue_spec.lua
+++ b/spec/01-unit/27-queue_spec.lua
@@ -321,19 +321,24 @@ describe("plugin queue", function()
   end)
 
   it("gives up sending after retrying", function()
-    Queue.enqueue(
-      queue_conf({
-        name = "retry-give-up",
-        max_batch_size = 1,
-        max_retry_time = 1,
-        max_coalescing_delay = 0.1,
-      }),
-      function()
-        return false, "FAIL FAIL FAIL"
-      end,
-      nil,
-      "Hello"
-    )
+    local function enqueue(entry)
+      Queue.enqueue(
+        queue_conf({
+          name = "retry-give-up",
+          max_batch_size = 1,
+          max_retry_time = 1,
+          max_coalescing_delay = 0.1,
+        }),
+        function()
+          return false, "FAIL FAIL FAIL"
+        end,
+        nil,
+        entry
+      )
+    end
+
+    enqueue("Hello")
+    enqueue("another value")
     wait_until_queue_done("retry-give-up")
     assert.match_re(log_messages, 'WARN .* handler could not process entries: FAIL FAIL FAIL')
     assert.match_re(log_messages, 'ERR .*1 queue entries were lost')
@@ -406,6 +411,7 @@ describe("plugin queue", function()
   end)
 
   it("queue does not fail for max batch size = max entries", function()
+    local fail_process = true
     local function enqueue(entry)
       Queue.enqueue(
         queue_conf({
@@ -416,13 +422,15 @@ describe("plugin queue", function()
         }),
         function(_, batch)
           ngx.sleep(1)
-          return false, "FAIL FAIL FAIL"
+          if fail_process then
+            return false, "FAIL FAIL FAIL"
+          end
+          return true
         end,
         nil,
         entry
       )
     end
-
     -- enqueue 2 entries, enough for first batch
     for i = 1, 2 do
       enqueue("initial batch: " .. tostring(i))
@@ -433,6 +441,7 @@ describe("plugin queue", function()
     for i = 1, 2 do
       enqueue("fill up: " .. tostring(i))
     end
+    fail_process = false
     wait_until_queue_done("capacity-exceeded")
   end)
 

From 7432cea4188ed0cf91434c9130be3ea04e5f63c9 Mon Sep 17 00:00:00 2001
From: Jens Erat
Date: Mon, 21 Aug 2023 10:31:17 +0200
Subject: [PATCH 3/4] remove shrinking queue workaround

---
 kong/tools/queue.lua | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua
index 204ed0f43cf4..8f20327dde5c 100644
--- a/kong/tools/queue.lua
+++ b/kong/tools/queue.lua
@@ -255,8 +255,7 @@ function Queue:process_once()
   end
 
   local batch = {unpack(self.entries, self.front, self.front + entry_count - 1)}
-  -- Guard against queue shrinkage during handler invocation by using math.min below.
-  for _ = 1, math.min(entry_count, self:count()) do
+  for _ = 1, entry_count do
     self:delete_frontmost_entry()
   end
   if self.queue_full then

From 2885cbd78fded8023a2d1c26623c2b90adcd18fd Mon Sep 17 00:00:00 2001
From: Jens Erat
Date: Mon, 21 Aug 2023 11:06:18 +0200
Subject: [PATCH 4/4] update changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 25a9ff291305..5f6e25e26876 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -43,6 +43,8 @@
   [#11342](https://github.com/Kong/kong/pull/11342)
 - When the worker is in shutdown mode and more data is immediately available without waiting for `max_coalescing_delay`, queues are now cleared in batches.
   [#11376](https://github.com/Kong/kong/pull/11376)
+- Fixed a race condition in the plugin queue that could crash the worker when `max_entries` was set to `max_batch_size`.
+  [#11378](https://github.com/Kong/kong/pull/11378)
 - **AWS-Lambda**: fix an issue that the AWS-Lambda plugin cannot extract a json encoded proxy integration response.
   [#11413](https://github.com/Kong/kong/pull/11413)