Skip to content

Commit

Permalink
support use tls
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 committed Jun 19, 2021
1 parent a8c5295 commit e0cd67d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Table of Contents
* [Methods](#methods)
* [new](#new)
* [addRPCHook](#addRPCHook)
* [setUseTLS](#setUseTLS)
* [produce](#produce)
* [Installation](#installation)
* [See Also](#see-also)
Expand Down Expand Up @@ -51,6 +52,14 @@ Synopsis
local message = "halo world"

local p = producer.new(nameservers, "produce_group")

-- set acl
local aclHook = require("resty.rocketmq.acl_rpchook").new("RocketMQ","123456781")
p:addRPCHook(aclHook)

-- use tls mode
p:setUseTLS(true)

local res, err = p:produce("TopicTest", message)
if not res then
ngx.say("send err:", err)
Expand Down Expand Up @@ -106,6 +115,12 @@ there is an acl hook provided, usage is:
p:addRPCHook(aclHook)
```

#### setUseTLS

`syntax: p:setUseTLS(useTLS)`

`useTLS` is a boolean

#### produce
`syntax: res, err = p:produce(topic, message, tags, keys, waitStoreMsgOk)`

Expand Down
21 changes: 11 additions & 10 deletions lib/resty/rocketmq/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function _M.new(nameservers)
nameservers = nameservers_parsed,
current_nameserver = 1,
RPCHook = {},
useTLS = false,
}, _M)
end

Expand All @@ -40,6 +41,10 @@ function _M.addRPCHook(self, hook)
end
end

function _M.setUseTLS(self, useTLS)
self.useTLS = useTLS
end

function _M:chooseNameserver()
local nameserver = self.nameservers[self.current_nameserver]
self.current_nameserver = self.current_nameserver + 1
Expand All @@ -49,14 +54,10 @@ function _M:chooseNameserver()
return nameserver
end

function _M:requestNameserver(send)
local nameserver = self:chooseNameserver()
return core.doReqeust(nameserver.ip, nameserver.port, send)
end

function _M:getTopicRouteInfoFromNameserver(topic)
local send = core.encode(REQUEST_CODE.GET_ROUTEINFO_BY_TOPIC, { topic = topic })
return self:requestNameserver(send)
local nameserver = self:chooseNameserver()
local addr = nameserver.ip .. ':' .. nameserver.port
return core.request(REQUEST_CODE.GET_ROUTEINFO_BY_TOPIC, addr, { topic = topic }, nil, false, self.RPCHook, self.useTLS)
end

local function messageProperties2String(properties)
Expand All @@ -82,15 +83,15 @@ function _M:sendMessage(brokerAddr, msg)
k = msg.unitMode,
l = msg.maxReconsumeTimes,
m = msg.batch,
}, msg.body, false, self.RPCHook)
}, msg.body, false, self.RPCHook, self.useTLS)
end

function _M:endTransactionOneway(brokerAddr, msg)
return core.request(REQUEST_CODE.END_TRANSACTION, brokerAddr, msg, nil, true, self.RPCHook)
return core.request(REQUEST_CODE.END_TRANSACTION, brokerAddr, msg, nil, true, self.RPCHook, self.useTLS)
end

function _M:sendHeartbeat(brokerAddr, heartbeatData)
return core.request(REQUEST_CODE.HEART_BEAT, brokerAddr, {}, cjson_safe.encode(heartbeatData), false, self.RPCHook)
return core.request(REQUEST_CODE.HEART_BEAT, brokerAddr, {}, cjson_safe.encode(heartbeatData), false, self.RPCHook, self.useTLS)
end

return _M
19 changes: 15 additions & 4 deletions lib/resty/rocketmq/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,18 @@ local function encode(code, h, body, oneway)
end
_M.encode = encode

local function doReqeust(ip, port, send, oneway)
local function doReqeust(ip, port, send, oneway, useTLS)
local sock = ngx_socket_tcp()
local res, err = sock:connect(ip, port)
if not res then
return nil, nil, ('connect %s:%s fail:%s'):format(ip, port, err)
end
if useTLS then
local ok, err = sock:sslhandshake(nil, nil, false)
if not ok then
return ok, nil, "failed to do ssl handshake: " .. err
end
end
local ok, err = sock:send(send)
if not ok then
return nil, nil, err
Expand Down Expand Up @@ -265,7 +271,7 @@ local function doReqeust(ip, port, send, oneway)
end
_M.doReqeust = doReqeust

local function request(code, addr, header, body, oneway, RPCHook)
local function request(code, addr, header, body, oneway, RPCHook, useTLS)
if RPCHook then
for _, hook in ipairs(RPCHook) do
hook.doBeforeRequest(addr, header, body)
Expand All @@ -274,8 +280,13 @@ local function request(code, addr, header, body, oneway, RPCHook)
ngx.log(ngx.DEBUG, ('\27[33msend:%s\27[0m %s %s'):format(codeName(code), cjson_safe.encode(header), body))
local send = encode(code, header, body, oneway)
local ip, port = unpack(split(addr, ':'))
local respHeader, respBody, err = doReqeust(ip, port, send, oneway)
ngx.log(ngx.DEBUG, ('\27[34mrecv:%s\27[0m %s %s'):format(respCodeName(respHeader.code), respHeader.remark or '', respBody))
local respHeader, respBody, err = doReqeust(ip, port, send, oneway, useTLS)
if err then
return nil, nil, err
end
if not oneway then
ngx.log(ngx.DEBUG, ('\27[34mrecv:%s\27[0m %s %s'):format(respCodeName(respHeader.code), respHeader.remark or '', respBody))
end
if not oneway and RPCHook then
for _, hook in ipairs(RPCHook) do
hook.doAfterResponse(addr, header, body, respHeader, respBody)
Expand Down
4 changes: 4 additions & 0 deletions lib/resty/rocketmq/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ function _M.addRPCHook(self, hook)
self.client:addRPCHook(hook)
end

function _M.setUseTLS(self, useTLS)
self.client:setUseTLS(useTLS)
end

local function topicRouteData2TopicPublishInfo(topic, route)
local info = {
topicRouteData = route,
Expand Down

0 comments on commit e0cd67d

Please sign in to comment.