Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add failing test for edge case max_batch_size=max_entries #11378

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
[#11342](https://github.com/Kong/kong/pull/11342)
- When the worker is in shutdown mode and more data is immediately available without waiting for `max_coalescing_delay`, queues are now cleared in batches.
[#11376](https://github.com/Kong/kong/pull/11376)
- A race condition in the plugin queue could potentially crash the worker when `max_entries` was set to `max_batch_size`.
[#11378](https://github.com/Kong/kong/pull/11378)
- **AWS-Lambda**: fix an issue that the AWS-Lambda plugin cannot extract a json encoded proxy integration response.
[#11413](https://github.com/Kong/kong/pull/11413)

Expand Down
20 changes: 10 additions & 10 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,20 @@ function Queue:process_once()
end
end

local batch = {unpack(self.entries, self.front, self.front + entry_count - 1)}
for _ = 1, entry_count do
self:delete_frontmost_entry()
end
if self.queue_full then
self:log_info('queue resumed processing')
self.queue_full = false
end

local start_time = now()
local retry_count = 0
while true do
self:log_debug("passing %d entries to handler", entry_count)
ok, err = self.handler(self.handler_conf, {unpack(self.entries, self.front, self.front + entry_count - 1)})
ok, err = self.handler(self.handler_conf, batch)
if ok then
self:log_debug("handler processed %d entries sucessfully", entry_count)
break
Expand All @@ -283,15 +292,6 @@ function Queue:process_once()
sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay))
retry_count = retry_count + 1
end

-- Guard against queue shrinkage during handler invocation by using math.min below.
for _ = 1, math.min(entry_count, self:count()) do
self:delete_frontmost_entry()
end
if self.queue_full then
self:log_info('queue resumed processing')
self.queue_full = false
end
end


Expand Down
66 changes: 53 additions & 13 deletions spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,24 @@ describe("plugin queue", function()
end)

it("gives up sending after retrying", function()
Queue.enqueue(
queue_conf({
name = "retry-give-up",
max_batch_size = 1,
max_retry_time = 1,
max_coalescing_delay = 0.1,
}),
function()
return false, "FAIL FAIL FAIL"
end,
nil,
"Hello"
)
local function enqueue(entry)
Queue.enqueue(
queue_conf({
name = "retry-give-up",
max_batch_size = 1,
max_retry_time = 1,
max_coalescing_delay = 0.1,
}),
function()
return false, "FAIL FAIL FAIL"
end,
nil,
entry
)
end

enqueue("Hello")
enqueue("another value")
wait_until_queue_done("retry-give-up")
assert.match_re(log_messages, 'WARN .* handler could not process entries: FAIL FAIL FAIL')
assert.match_re(log_messages, 'ERR .*1 queue entries were lost')
Expand Down Expand Up @@ -405,6 +410,41 @@ describe("plugin queue", function()
assert.match_re(log_messages, "INFO .*queue resumed processing")
end)

it("queue does not fail for max batch size = max entries", function()
  -- Flipped to false once the queue has been filled, letting the handler drain it.
  local should_fail = true

  -- Enqueue one entry on a queue whose capacity (max_entries) equals its
  -- batch size.  The handler sleeps to keep a batch in flight, and fails
  -- while `should_fail` is set so entries accumulate behind it.
  local function add(entry)
    local conf = queue_conf({
      name = "capacity-exceeded",
      max_batch_size = 2,
      max_entries = 2,
      max_coalescing_delay = 0.1,
    })
    local handler = function(_, _)
      ngx.sleep(1)
      if not should_fail then
        return true
      end
      return false, "FAIL FAIL FAIL"
    end
    Queue.enqueue(conf, handler, nil, entry)
  end

  -- enqueue 2 entries, enough for the first batch
  for n = 1, 2 do
    add("initial batch: " .. tostring(n))
  end
  -- wait for max_coalescing_delay so the first batch gets picked up (it will
  -- be stuck in the retry loop, since the handler fails while `should_fail` is set)
  ngx.sleep(0.1)
  -- fill the queue back up to capacity while the first batch is in flight
  for n = 1, 2 do
    add("fill up: " .. tostring(n))
  end
  should_fail = false
  wait_until_queue_done("capacity-exceeded")
end)

it("drops entries when it reaches its max_bytes", function()
local processed
local function enqueue(entry)
Expand Down
Loading