diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 34d1b7b1aa0..676864e3c38 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -195,6 +195,17 @@ function _M:process_rpc_msg(payload, collection) end else + -- may be some error message for peer + if not payload_id then + if payload.error then + ngx_log(ngx.ERR, "[rpc] RPC failed, code: ", + payload.error.code, ", err: ", + payload.error.message) + end + + return true + end + -- response, don't care about `collection` local interest_cb = self.interest[payload_id] self.interest[payload_id] = nil -- edge trigger only once @@ -271,6 +282,16 @@ function _M:start() local payload = decompress_payload(data) + if not payload then + local res, err = self:push_response( + new_error(nil, jsonrpc.PARSE_ERROR)) + if not res then + return nil, err + end + + goto continue + end + -- single rpc call if not isarray(payload) then local ok, err = self:process_rpc_msg(payload) diff --git a/spec/02-integration/18-hybrid_rpc/02-error_spec.lua b/spec/02-integration/18-hybrid_rpc/02-error_spec.lua new file mode 100644 index 00000000000..1b7a07ae6f4 --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/02-error_spec.lua @@ -0,0 +1,68 @@ +local helpers = require "spec.helpers" + + +-- register a test rpc service in custom plugin rpc-error-test +for _, strategy in helpers.each_strategy() do + describe("Hybrid Mode RPC #" .. strategy, function() + + lazy_setup(function() + helpers.get_db_utils(strategy, { + "clustering_data_planes", + }) -- runs migrations + + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "bundled,rpc-error-test", + nginx_worker_processes = 4, -- multiple workers + cluster_rpc = "on", -- enable rpc + cluster_rpc_sync = "off", -- disable rpc sync + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + proxy_listen = "0.0.0.0:9002", + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "bundled,rpc-error-test", + nginx_worker_processes = 4, -- multiple workers + cluster_rpc = "on", -- enable rpc + cluster_rpc_sync = "off", -- disable rpc sync + })) + end) + + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) + + describe("rpc errors", function() + it("in custom plugin", function() + local name = "servroot2/logs/error.log" + + -- dp logs + assert.logfile(name).has.line( + "test #1 ok", true, 10) + + -- dp logs + assert.logfile(name).has.line( + "[rpc] RPC failed, code: -32600, err: empty batch array", true, 10) + assert.logfile(name).has.line( + "[rpc] RPC failed, code: -32600, err: not a valid object", true, 10) + assert.logfile(name).has.line( + "test #2 ok", true, 10) + + assert.logfile(name).has.no.line( + "assertion failed", true, 0) + end) + end) + end) +end -- for _, strategy diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/handler.lua new file mode 100644 index 00000000000..e044784c4de --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/handler.lua @@ -0,0 +1,59 @@ +local cjson = require("cjson") + + +local RpcErrorTestHandler = { + VERSION = "1.0", + PRIORITY = 1000, +} + + +function RpcErrorTestHandler:init_worker() + kong.rpc.callbacks:register("kong.test.exception", function(node_id) + return nil -- no error message, default jsonrpc.SERVER_ERROR + end) + + kong.rpc.callbacks:register("kong.test.error", function(node_id) + return nil, "kong.test.error" + end) + + local worker_events = assert(kong.worker_events) + local node_id = "control_plane" + + -- if rpc is ready we will start to call + worker_events.register(function(capabilities_list) + local res, err = kong.rpc:call(node_id, "kong.test.not_exist") + assert(not res) + assert(err == "Method not found") + + local res, err = kong.rpc:call(node_id, "kong.test.exception") + assert(not res) + assert(err == "Server error") + + local res, err = kong.rpc:call(node_id, "kong.test.error") + assert(not res) + assert(err == "kong.test.error") + + ngx.log(ngx.DEBUG, "test #1 ok") + + end, "clustering:jsonrpc", "connected") + + -- if rpc is ready we will start to send raw msg + worker_events.register(function(capabilities_list) + local s = next(kong.rpc.clients[node_id]) + + -- send an empty array + local msg = setmetatable({}, cjson.array_mt) + assert(s:push_request(msg)) + + -- send an invalid msg + local msg = setmetatable({"invalid_request"}, cjson.array_mt) + assert(s:push_request(msg)) + + ngx.log(ngx.DEBUG, "test #2 ok") + + end, "clustering:jsonrpc", "connected") + +end + + +return RpcErrorTestHandler diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/schema.lua new file mode 100644 index 00000000000..ba3266fda13 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-error-test/schema.lua @@ -0,0 +1,12 @@ +return { + name = "rpc-error-test", + fields = { + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +}