Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering): change sync data structure #13794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,16 @@ end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(row)

assert(schema:validate_primary_key(pk))

local deltas = {
{
type = name,
id = row.id,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
},
Expand Down
2 changes: 1 addition & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ local function do_sync()

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
if err then
return nil, err
end
Expand Down
8 changes: 4 additions & 4 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,21 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
for _, d in ipairs(deltas) do
buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(d.id),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.row)))
end
Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/024_370_to_380.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ return {
CREATE TABLE IF NOT EXISTS clustering_sync_delta (
"version" INT NOT NULL,
"type" TEXT NOT NULL,
"id" UUID NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"row" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("database migration", function()
assert.database_has_relation("clustering_sync_delta")
assert.table_has_column("clustering_sync_delta", "version", "integer")
assert.table_has_column("clustering_sync_delta", "type", "text")
assert.table_has_column("clustering_sync_delta", "id", "uuid")
assert.table_has_column("clustering_sync_delta", "pk", "json")
assert.table_has_column("clustering_sync_delta", "ws_id", "uuid")
assert.table_has_column("clustering_sync_delta", "row", "json")
end)
Expand Down
Loading