Skip to content

Commit

Permalink
feat(clustering/sync): validate deltas when syncing (#14127)
Browse files Browse the repository at this point in the history
### Summary
1. validate entity schema
   * call `kong.db[entity].validate(entity)` over all the entities from deltas
2. validate entity's references (foreign references)
   * re-implemented logic: traverse delta -> try to find delta's foreign entity in config & LMDB
   * TODO: decouple this logic completely from declarative config logic: https://konghq.atlassian.net/browse/KAG-6231
3. TODO: report error if deleting delta operation could not find its associated entity in LMDB and deltas: https://konghq.atlassian.net/browse/KAG-6238

KAG-5897
  • Loading branch information
chobits authored Jan 22, 2025
1 parent 1d0f212 commit c85bc75
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 11 deletions.
4 changes: 4 additions & 0 deletions bin/busted
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ if not os.getenv("KONG_BUSTED_RESPAWNED") then
-- create shared dict
resty_flags = resty_flags .. require("spec.fixtures.shared_dict")

-- create lmdb environment
local lmdb_env = os.tmpname()
resty_flags = resty_flags .. string.format(' --main-conf "lmdb_environment_path %s;" ', lmdb_env)

if resty_flags then
table.insert(cmd, cmd_prefix_count+1, resty_flags)
end
Expand Down
1 change: 1 addition & 0 deletions kong-3.10.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ build = {
["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua",
["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua",
["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua",
["kong.clustering.services.sync.validate"] = "kong/clustering/services/sync/validate.lua",
["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
Expand Down
9 changes: 8 additions & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ local events = require("kong.runloop.events")
local EMPTY = require("kong.tools.table").EMPTY


local validate_deltas = require("kong.clustering.services.sync.validate").validate_deltas
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
Expand Down Expand Up @@ -267,6 +268,7 @@ local function do_sync()
return nil, "default namespace does not exist inside params"
end

local wipe = ns_delta.wipe
local deltas = ns_delta.deltas

if not deltas then
Expand All @@ -292,9 +294,14 @@ local function do_sync()
end
assert(type(kong.default_workspace) == "string")

-- validate deltas
local ok, err = validate_deltas(deltas, wipe)
if not ok then
return nil, err
end

local t = txn.begin(512)

local wipe = ns_delta.wipe
if wipe then
ngx_log(ngx_INFO, "[kong.sync.v2] full sync begins")

Expand Down
69 changes: 69 additions & 0 deletions kong/clustering/services/sync/validate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
local declarative = require("kong.db.declarative")
local declarative_config = require("kong.db.schema.others.declarative_config")


local null = ngx.null
local pk_string = declarative_config.pk_string
local validate_references_sync = declarative_config.validate_references_sync
local pretty_print_error = declarative.pretty_print_error


local function validate_deltas(deltas, is_full_sync)

local errs = {}

-- generate deltas table mapping primary key string to entity item
local deltas_map = {}

local db = kong.db

for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_entity = delta.entity

if delta_entity ~= nil and delta_entity ~= null then
-- table: primary key string -> entity
local schema = db[delta_type].schema
local pk = schema:extract_pk_values(delta_entity)
local pks = pk_string(schema, pk)

deltas_map[pks] = delta_entity

-- validate entity
local dao = kong.db[delta_type]
if dao then
-- CP will insert ws_id into the entity, which will be validated as an
-- unknown field.
-- TODO: On the CP side, remove ws_id from the entity and set it only
-- in the delta.
local ws_id = delta_entity.ws_id
delta_entity.ws_id = nil -- clear ws_id

local ok, err_t = dao.schema:validate(delta_entity)

delta_entity.ws_id = ws_id -- restore ws_id

if not ok then
errs[#errs + 1] = { [delta_type] = err_t }
end
end
end
end

if next(errs) then
return nil, pretty_print_error(errs, "deltas")
end

-- validate references
local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync)
if not ok then
return nil, pretty_print_error(err_t)
end

return true
end


return {
validate_deltas = validate_deltas,
}
3 changes: 3 additions & 0 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -263,5 +263,8 @@ _M.delete_entity_for_txn = declarative_import.delete_entity_for_txn
_M.workspace_id = declarative_import.workspace_id
_M.GLOBAL_WORKSPACE_TAG = declarative_import.GLOBAL_WORKSPACE_TAG

-- helpful function
_M.pretty_print_error = pretty_print_error


return _M
72 changes: 69 additions & 3 deletions kong/db/schema/others/declarative_config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local null = ngx.null
local type = type
local next = next
local pairs = pairs
local fmt = string.format
local yield = require("kong.tools.yield").yield
local ipairs = ipairs
local insert = table.insert
Expand Down Expand Up @@ -331,7 +332,7 @@ end


local function ws_id_for(item)
if item.ws_id == nil or item.ws_id == ngx.null then
if item.ws_id == nil or item.ws_id == null then
return "*"
end
return item.ws_id
Expand Down Expand Up @@ -413,7 +414,7 @@ local function populate_references(input, known_entities, by_id, by_key, expecte
local key = use_key and item[endpoint_key]

local failed = false
if key and key ~= ngx.null then
if key and key ~= null then
local ok = add_to_by_key(by_key, entity_schema, item, entity, key)
if not ok then
add_error(errs, parent_entity, parent_idx, entity, i,
Expand Down Expand Up @@ -506,6 +507,71 @@ local function validate_references(self, input)
end


-- TODO: Completely implement validate_references_sync without associating it
-- to declarative config. Currently, we will use the dc-generated
-- foreign_references table to accelerate iterating over entity foreign keys.
function DeclarativeConfig.validate_references_sync(deltas, deltas_map, is_full_sync)
local errs = {}

for _, delta in ipairs(deltas) do
local item_type = delta.type
local item = delta.entity
local ws_id = delta.ws_id or kong.default_workspace

local foreign_refs = foreign_references[item_type]

if not item or item == null or not foreign_refs then
goto continue
end

for k, v in pairs(item) do

-- Try to check if item's some foreign key exists in the deltas or LMDB.
-- For example, `item[k]` could be `<router_entity>["service"]`, we need
-- to find the referenced foreign service entity for this router entity.

local foreign_entity = foreign_refs[k]

if foreign_entity and v ~= null then -- k is foreign key

local dao = kong.db[foreign_entity]

-- try to find it in deltas
local pks = DeclarativeConfig.pk_string(dao.schema, v)
local fvalue = deltas_map[pks]

-- try to find it in DB (LMDB)
if not fvalue and not is_full_sync then
fvalue = dao:select(v, { workspace = ws_id })
end

-- record an error if not finding its foreign reference
if not fvalue then
errs[item_type] = errs[item_type] or {}
errs[item_type][foreign_entity] = errs[item_type][foreign_entity] or {}

local msg = fmt("could not find %s's foreign refrences %s (%s)",
item_type, foreign_entity,
type(v) == "string" and v or cjson_encode(v))

insert(errs[item_type][foreign_entity], msg)
end
end -- if foreign_entity and v ~= null

end -- for k, v in pairs(item)

::continue::
end -- for _, delta in ipairs(deltas)


if next(errs) then
return nil, errs
end

return true
end


-- This is a best-effort generation of a cache-key-like identifier
-- to feed the hash when generating deterministic UUIDs.
-- We do not use the actual `cache_key` function from the DAO because
Expand Down Expand Up @@ -860,7 +926,7 @@ local function flatten(self, input)

if field.unique then
local flat_value = flat_entry[name]
if flat_value and flat_value ~= ngx.null then
if flat_value and flat_value ~= null then
local unique_key = get_unique_key(schema, entry, field, flat_value)
uniques[name] = uniques[name] or {}
if uniques[name][unique_key] then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local write_node_id = [[
local function get_http_node_id()
local client = helpers.proxy_client(nil, 9002)
finally(function() client:close() end)

helpers.wait_until(function()
local res = client:get("/request", {
headers = { host = "http.node-id.test" },
Expand Down Expand Up @@ -87,7 +88,7 @@ for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do
local rpc, rpc_sync = v[1], v[2]

for _, strategy in helpers.each_strategy() do
describe("node id persistence " .. " rpc_sync=" .. rpc_sync, function()
describe("node id persistence rpc_sync = " .. rpc_sync, function()

local control_plane_config = {
role = "control_plane",
Expand Down
Loading

1 comment on commit c85bc75

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong-dev:c85bc752073ee4a39d22b2c724a1ede41a6ce3c5
Artifacts available https://github.com/Kong/kong/actions/runs/12901689878

Please sign in to comment.