Skip to content

Commit

Permalink
feat(dao): add ability to select and delete expired entities (#13296)
Browse files Browse the repository at this point in the history
  • Loading branch information
raoxiaoyan authored Jul 3, 2024
1 parent 4eeae00 commit ef81e6f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 10 deletions.
21 changes: 15 additions & 6 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local workspaces = require "kong.workspaces"
local new_tab = require "table.new"
local DAO_MAX_TTL = require("kong.constants").DATABASE.DAO_MAX_TTL
local is_valid_uuid = require("kong.tools.uuid").is_valid_uuid
local deep_copy = require("kong.tools.table").deep_copy

local setmetatable = setmetatable
local tostring = tostring
Expand Down Expand Up @@ -290,6 +291,12 @@ local function validate_options_value(self, options)
end
end

if options.skip_ttl ~= nil then
if type(options.skip_ttl) ~= "boolean" then
errors.skip_ttl = "must be a boolean"
end
end

if next(errors) then
return nil, errors
end
Expand Down Expand Up @@ -896,8 +903,9 @@ local function generate_foreign_key_methods(schema)
return nil, err, err_t
end

local show_ws_id = { show_ws_id = true }
local entity, err, err_t = self["select_by_" .. name](self, unique_value, show_ws_id)
local select_options = deep_copy(options or {})
select_options["show_ws_id"] = true
local entity, err, err_t = self["select_by_" .. name](self, unique_value, select_options)
if err then
return nil, err, err_t
end
Expand All @@ -906,7 +914,7 @@ local function generate_foreign_key_methods(schema)
return true
end

local cascade_entries = find_cascade_delete_entities(self, entity, show_ws_id)
local cascade_entries = find_cascade_delete_entities(self, entity, select_options)

local ok, err_t = run_hook("dao:delete_by:pre",
entity,
Expand Down Expand Up @@ -1293,8 +1301,9 @@ function DAO:delete(pk_or_entity, options)
return nil, tostring(err_t), err_t
end

local show_ws_id = { show_ws_id = true }
local entity, err, err_t = self:select(primary_key, show_ws_id)
local select_options = deep_copy(options or {})
select_options["show_ws_id"] = true
local entity, err, err_t = self:select(primary_key, select_options)
if err then
return nil, err, err_t
end
Expand All @@ -1311,7 +1320,7 @@ function DAO:delete(pk_or_entity, options)
end
end

local cascade_entries = find_cascade_delete_entities(self, primary_key, show_ws_id)
local cascade_entries = find_cascade_delete_entities(self, primary_key, select_options)

local ws_id = entity.ws_id
local _
Expand Down
80 changes: 76 additions & 4 deletions kong/db/strategies/postgres/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ local function execute(strategy, statement_name, attributes, options)

local is_update = options and options.update
local has_ttl = strategy.schema.ttl

local skip_ttl = options and options.skip_ttl
if has_ws_id then
assert(ws_id == nil or type(ws_id) == "string")
argv[0] = escape_literal(connector, ws_id, "ws_id")
Expand All @@ -433,7 +433,7 @@ local function execute(strategy, statement_name, attributes, options)
for i = 1, argc do
local name = argn[i]
local value
if has_ttl and name == "ttl" then
if has_ttl and name == "ttl" and not skip_ttl then
value = (options and options.ttl)
and get_ttl_value(strategy, attributes, options)

Expand Down Expand Up @@ -576,7 +576,12 @@ end


function _mt:select(primary_key, options)
local res, err = execute(self, "select", self.collapse(primary_key), options)
local statement_name = "select"
if self.schema.ttl and options and options.skip_ttl then
statement_name = "select_skip_ttl"
end

local res, err = execute(self, statement_name, self.collapse(primary_key), options)
if res then
local row = res[1]
if row then
Expand All @@ -592,6 +597,11 @@ end

function _mt:select_by_field(field_name, unique_value, options)
local statement_name = "select_by_" .. field_name

if self.schema.ttl and options and options.skip_ttl then
statement_name = statement_name .. "_skip_ttl"
end

local filter = {
[field_name] = unique_value,
}
Expand Down Expand Up @@ -695,7 +705,11 @@ end


function _mt:delete(primary_key, options)
local res, err = execute(self, "delete", self.collapse(primary_key), options)
local statement_name = "delete"
if self.schema.ttl and options and options.skip_ttl then
statement_name = "delete_skip_ttl"
end
local res, err = execute(self, statement_name, self.collapse(primary_key), options)
if res then
if res.affected_rows == 0 then
return nil, nil
Expand All @@ -710,6 +724,9 @@ end

function _mt:delete_by_field(field_name, unique_value, options)
local statement_name = "delete_by_" .. field_name
if self.schema.ttl and options and options.skip_ttl then
statement_name = statement_name .. "_skip_ttl"
end
local filter = {
[field_name] = unique_value,
}
Expand Down Expand Up @@ -1189,6 +1206,19 @@ function _M.new(connector, schema, errors)
}
})

add_statement("delete_skip_ttl", {
operation = "write",
argn = primary_key_names,
argv = primary_key_args,
code = {
"DELETE\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", "(" .. pk_escaped .. ") = (" .. primary_key_placeholders .. ")",
ws_id_select_where), ";"
}
})

add_statement("select", {
operation = "read",
expr = select_expressions,
Expand All @@ -1205,6 +1235,21 @@ function _M.new(connector, schema, errors)
}
})

add_statement("select_skip_ttl", {
operation = "read",
expr = select_expressions,
argn = primary_key_names,
argv = primary_key_args,
code = {
"SELECT ", select_expressions, "\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", "(" .. pk_escaped .. ") = (" .. primary_key_placeholders .. ")",
ws_id_select_where),
" LIMIT 1;"
}
})

add_statement_for_export("page_first", {
operation = "read",
argn = { LIMIT },
Expand Down Expand Up @@ -1387,6 +1432,20 @@ function _M.new(connector, schema, errors)
},
})

add_statement("select_by_" .. field_name .. "_skip_ttl", {
operation = "read",
argn = single_names,
argv = single_args,
code = {
"SELECT ", select_expressions, "\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", unique_escaped .. " = $1",
ws_id_select_where),
" LIMIT 1;"
},
})

local update_by_args_names = {}
for _, update_name in ipairs(update_names) do
insert(update_by_args_names, update_name)
Expand Down Expand Up @@ -1442,6 +1501,19 @@ function _M.new(connector, schema, errors)
ws_id_select_where), ";"
}
})

add_statement("delete_by_" .. field_name .. "_skip_ttl", {
operation = "write",
argn = single_names,
argv = single_args,
code = {
"DELETE\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", unique_escaped .. " = $1",
ws_id_select_where), ";"
}
})
end
end

Expand Down
32 changes: 32 additions & 0 deletions spec/02-integration/03-db/14-dao_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ for _, strategy in helpers.all_strategies() do
"services",
"consumers",
"acls",
"keyauth_credentials",
})
_G.kong.db = db

Expand Down Expand Up @@ -98,6 +99,7 @@ for _, strategy in helpers.all_strategies() do
db.consumers:truncate()
db.plugins:truncate()
db.services:truncate()
db.keyauth_credentials:truncate()
end)

it("select_by_cache_key()", function()
Expand Down Expand Up @@ -185,6 +187,36 @@ for _, strategy in helpers.all_strategies() do
assert.same(new_plugin_config.config.redis.host, read_plugin.config.redis.host)
assert.same(new_plugin_config.config.redis.host, read_plugin.config.redis_host) -- legacy field is included
end)

it("keyauth_credentials can be deleted or selected before run ttl cleanup in background timer", function()
local key = uuid()
local original_keyauth_credentials = bp.keyauth_credentials:insert({
consumer = { id = consumer.id },
key = key,
}, { ttl = 5 })

-- wait for 5 seconds.
ngx.sleep(5)

-- select or delete keyauth_credentials after ttl expired.
local expired_keyauth_credentials
helpers.wait_until(function()
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key)
return not expired_keyauth_credentials
end, 1)
assert.is_nil(expired_keyauth_credentials)
kong.db.keyauth_credentials:delete_by_key(key)

-- select or delete keyauth_credentials with skip_ttl=true after ttl expired.
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key, { skip_ttl = true })
assert.not_nil(expired_keyauth_credentials)
assert.same(expired_keyauth_credentials.id, original_keyauth_credentials.id)
kong.db.keyauth_credentials:delete_by_key(key, { skip_ttl = true })

-- check again
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key, { skip_ttl = true })
assert.is_nil(expired_keyauth_credentials)
end)
end)
end

1 comment on commit ef81e6f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:ef81e6fb8df2523e83982f30a704c4671422d29c
Artifacts available https://github.com/Kong/kong/actions/runs/9774671285

Please sign in to comment.