From e0cd67d957bb4acf4347840eeb377324f0be6912 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Sat, 19 Jun 2021 10:56:25 +0800 Subject: [PATCH] support use tls --- README.md | 15 +++++++++++++++ lib/resty/rocketmq/client.lua | 21 +++++++++++---------- lib/resty/rocketmq/core.lua | 19 +++++++++++++++---- lib/resty/rocketmq/producer.lua | 4 ++++ 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 606fca7..3c76e53 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Table of Contents * [Methods](#methods) * [new](#new) * [addRPCHook](#addRPCHook) + * [setUseTLS](#setUseTLS) * [produce](#produce) * [Installation](#installation) * [See Also](#see-also) @@ -51,6 +52,14 @@ Synopsis local message = "halo world" local p = producer.new(nameservers, "produce_group") + + -- set acl + local aclHook = require("resty.rocketmq.acl_rpchook").new("RocketMQ","123456781") + p:addRPCHook(aclHook) + + -- use tls mode + p:setUseTLS(true) + local res, err = p:produce("TopicTest", message) if not res then ngx.say("send err:", err) @@ -106,6 +115,12 @@ there is an acl hook provided, usage is: p:addRPCHook(aclHook) ``` +#### setUseTLS + +`syntax: p:setUseTLS(useTLS)` + +`useTLS` is a boolean + #### produce `syntax: res, err = p:produce(topic, message, tags, keys, waitStoreMsgOk)` diff --git a/lib/resty/rocketmq/client.lua b/lib/resty/rocketmq/client.lua index f2e5681..049c3cb 100644 --- a/lib/resty/rocketmq/client.lua +++ b/lib/resty/rocketmq/client.lua @@ -29,6 +29,7 @@ function _M.new(nameservers) nameservers = nameservers_parsed, current_nameserver = 1, RPCHook = {}, + useTLS = false, }, _M) end @@ -40,6 +41,10 @@ function _M.addRPCHook(self, hook) end end +function _M.setUseTLS(self, useTLS) + self.useTLS = useTLS +end + function _M:chooseNameserver() local nameserver = self.nameservers[self.current_nameserver] self.current_nameserver = self.current_nameserver + 1 @@ -49,14 +54,10 @@ function _M:chooseNameserver() return nameserver end -function _M:requestNameserver(send) - local nameserver = self:chooseNameserver() - return core.doReqeust(nameserver.ip, nameserver.port, send) -end - function _M:getTopicRouteInfoFromNameserver(topic) - local send = core.encode(REQUEST_CODE.GET_ROUTEINFO_BY_TOPIC, { topic = topic }) - return self:requestNameserver(send) + local nameserver = self:chooseNameserver() + local addr = nameserver.ip .. ':' .. nameserver.port + return core.request(REQUEST_CODE.GET_ROUTEINFO_BY_TOPIC, addr, { topic = topic }, nil, false, self.RPCHook, self.useTLS) end local function messageProperties2String(properties) @@ -82,15 +83,15 @@ function _M:sendMessage(brokerAddr, msg) k = msg.unitMode, l = msg.maxReconsumeTimes, m = msg.batch, - }, msg.body, false, self.RPCHook) + }, msg.body, false, self.RPCHook, self.useTLS) end function _M:endTransactionOneway(brokerAddr, msg) - return core.request(REQUEST_CODE.END_TRANSACTION, brokerAddr, msg, nil, true, self.RPCHook) + return core.request(REQUEST_CODE.END_TRANSACTION, brokerAddr, msg, nil, true, self.RPCHook, self.useTLS) end function _M:sendHeartbeat(brokerAddr, heartbeatData) - return core.request(REQUEST_CODE.HEART_BEAT, brokerAddr, {}, cjson_safe.encode(heartbeatData), false, self.RPCHook) + return core.request(REQUEST_CODE.HEART_BEAT, brokerAddr, {}, cjson_safe.encode(heartbeatData), false, self.RPCHook, self.useTLS) end return _M diff --git a/lib/resty/rocketmq/core.lua b/lib/resty/rocketmq/core.lua index 945caf0..03dbc0f 100644 --- a/lib/resty/rocketmq/core.lua +++ b/lib/resty/rocketmq/core.lua @@ -230,12 +230,18 @@ local function encode(code, h, body, oneway) end _M.encode = encode -local function doReqeust(ip, port, send, oneway) +local function doReqeust(ip, port, send, oneway, useTLS) local sock = ngx_socket_tcp() local res, err = sock:connect(ip, port) if not res then return nil, nil, ('connect %s:%s fail:%s'):format(ip, port, err) end + if useTLS then + local ok, err = sock:sslhandshake(nil, nil, false) + if not ok then + return ok, nil, "failed to do ssl handshake: " .. err + end + end local ok, err = sock:send(send) if not ok then return nil, nil, err @@ -265,7 +271,7 @@ local function doReqeust(ip, port, send, oneway) end _M.doReqeust = doReqeust -local function request(code, addr, header, body, oneway, RPCHook) +local function request(code, addr, header, body, oneway, RPCHook, useTLS) if RPCHook then for _, hook in ipairs(RPCHook) do hook.doBeforeRequest(addr, header, body) @@ -274,8 +280,13 @@ local function request(code, addr, header, body, oneway, RPCHook) ngx.log(ngx.DEBUG, ('\27[33msend:%s\27[0m %s %s'):format(codeName(code), cjson_safe.encode(header), body)) local send = encode(code, header, body, oneway) local ip, port = unpack(split(addr, ':')) - local respHeader, respBody, err = doReqeust(ip, port, send, oneway) - ngx.log(ngx.DEBUG, ('\27[34mrecv:%s\27[0m %s %s'):format(respCodeName(respHeader.code), respHeader.remark or '', respBody)) + local respHeader, respBody, err = doReqeust(ip, port, send, oneway, useTLS) + if err then + return nil, nil, err + end + if not oneway then + ngx.log(ngx.DEBUG, ('\27[34mrecv:%s\27[0m %s %s'):format(respCodeName(respHeader.code), respHeader.remark or '', respBody)) + end if not oneway and RPCHook then for _, hook in ipairs(RPCHook) do hook.doAfterResponse(addr, header, body, respHeader, respBody) diff --git a/lib/resty/rocketmq/producer.lua b/lib/resty/rocketmq/producer.lua index c27aeb8..f54aa72 100644 --- a/lib/resty/rocketmq/producer.lua +++ b/lib/resty/rocketmq/producer.lua @@ -28,6 +28,10 @@ function _M.addRPCHook(self, hook) self.client:addRPCHook(hook) end +function _M.setUseTLS(self, useTLS) + self.client:setUseTLS(useTLS) +end + local function topicRouteData2TopicPublishInfo(topic, route) local info = { topicRouteData = route,