Skip to content

Commit

Permalink
change api
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 committed Nov 7, 2021
1 parent e5c00c8 commit 7562f8e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 27 deletions.
30 changes: 21 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ Table of Contents
* [setUseTLS](#setUseTLS)
* [registerSendMessageHook](#registerSendMessageHook)
* [registerEndTransactionHook](#registerEndTransactionHook)
* [send](#send)
* [sendMessageInTransaction](#sendMessageInTransaction)
* [start](#start)
* [produce](#produce)
* [stop](#stop)
* [resty.rocketmq.admin](#restyrocketmqadmin)
* [Methods](#methods)
Expand Down Expand Up @@ -78,7 +79,7 @@ Synopsis
-- use tls mode
p:setUseTLS(true)

local res, err = p:produce("TopicTest", message)
local res, err = p:send("TopicTest", message)
if not res then
ngx.say("send err:", err)
return
Expand Down Expand Up @@ -176,18 +177,29 @@ there is an acl hook provided, usage is:
- transactionState
- fromTransactionCheck

#### start
`syntax: p:start()`

note that if you don't call p:start() before sending messages, messages will be sent successfully, but the trace is not send.


#### produce
`syntax: res, err = p:produce(topic, message, tags, keys, waitStoreMsgOk)`
#### send
`syntax: res, err = p:send(topic, message, tags, keys, waitStoreMsgOk)`

In case of success, returns the a table of results.
In case of errors, returns `nil` with a string describing the error.

#### setTransactionListener
`syntax: res, err = p:setTransactionListener(transactionListener)`

`transactionListener` is a table that contains two functions as follows:

- `executeLocalTransaction(self, msg, arg)`
- `checkLocalTransaction(self, msg)`

#### sendMessageInTransaction
`syntax: res, err = p:sendMessageInTransaction(topic, arg, message, tags, keys, waitStoreMsgOk)`

#### start
`syntax: p:start()`

note that if you don't call p:start() before sending messages, messages will be sent successfully, but the trace is not send.

#### stop
`syntax: p:stop()`

Expand Down
29 changes: 14 additions & 15 deletions lib/resty/rocketmq/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local client = require("resty.rocketmq.client")
local utils = require("resty.rocketmq.utils")
local trace = require("resty.rocketmq.trace")
local RESPONSE_CODE = core.RESPONSE_CODE
local cjson_safe = require("cjson.safe")

---@class producer
local _M = {}
Expand Down Expand Up @@ -69,16 +70,7 @@ local function sendHeartbeatToAllBroker(self)
local heartbeatData = {
clientID = '' .. ngx.worker.pid(),
producerDataSet = { { groupName = self.groupName } },
consumerDataSet = {
{
groupName = self.groupName,
consumerType = self.groupName,
messageModel = self.groupName,
consumerFromWhere = self.groupName,
subscriptionDataSet = self.groupName,
unitMode = self.groupName,
}
}
consumerDataSet = setmetatable({}, cjson_safe.empty_array_mt)
}
for brokerName, brokers in pairs(self.client.brokerAddrTable) do
local addr = brokers[0]
Expand Down Expand Up @@ -186,7 +178,7 @@ local function produce(self, msg)
return h
end

function _M:produce(topic, message, tags, keys, waitStoreMsgOk)
function _M:send(topic, message, tags, keys, waitStoreMsgOk)
return produce(self, {
producerGroup = self.groupName,
topic = topic,
Expand All @@ -209,11 +201,18 @@ function _M:produce(topic, message, tags, keys, waitStoreMsgOk)
})
end

function _M:setTransactionListener(transactionListener)
if type(transactionListener.executeLocalTransaction) ~= 'function' then
return nil, 'invalid callback'
end
self.transactionListener = transactionListener
end


-- todo add check callback
function _M:transactionProduce(topic, execute, arg, message, tags, keys, waitStoreMsgOk)
if type(execute) ~= 'function' then
return nil, 'invalid callback'
function _M:sendMessageInTransaction(topic, arg, message, tags, keys, waitStoreMsgOk)
if not self.transactionListener then
return nil, "TransactionListener is null"
end
local msg = {
producerGroup = self.groupName,
Expand Down Expand Up @@ -245,7 +244,7 @@ function _M:transactionProduce(topic, execute, arg, message, tags, keys, waitSto
if h.code == RESPONSE_CODE.SUCCESS then
msg.properties.__transationId__ = h.transationId
msg.transationId = msg.properties.UNIQ_KEY
localTransactionState = execute(msg, arg)
localTransactionState = self.transactionListener:executeLocalTransaction(msg, arg)
else
localTransactionState = core.TRANSACTION_ROLLBACK_TYPE
end
Expand Down
2 changes: 1 addition & 1 deletion lib/resty/rocketmq/trace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ local function sendTraceDataByMQ(self, keySet, data, topic)
for k in pairs(keySet) do
table.insert(keys, k)
end
local res, err = self.producer:produce(core.RMQ_SYS_TRACE_TOPIC, data, "", table.concat(keys, ' '))
local res, err = self.producer:send(core.RMQ_SYS_TRACE_TOPIC, data, "", table.concat(keys, ' '))
if err then
ngx.log(ngx.WARN, 'send msg trace fail, ', err)
end
Expand Down
2 changes: 1 addition & 1 deletion t/producer.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ok
return
end

local res, err = p:produce("TopicTest", message)
local res, err = p:send("TopicTest", message)
if not res then
ngx.say("send err:", err)
return
Expand Down
2 changes: 1 addition & 1 deletion t/trace.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ __DATA__
return
end
p:start()
local res, err = p:produce("TopicTest", "halo world", "tags", "keys")
local res, err = p:send("TopicTest", "halo world", "tags", "keys")
if not res then
ngx.say("send err:", err)
return
Expand Down

0 comments on commit 7562f8e

Please sign in to comment.