Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1007] fix trace use producer instead client underlay apis. #1095

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"bytes"
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/producer"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -237,8 +239,8 @@ type traceDispatcher struct {
// support deliver trace message to other cluster.
namesrvs *namesrvs
// round robin index
rrindex int32
cli RMQClient
rrindex int32
traceProducer rocketmq.Producer
}

func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
Expand Down Expand Up @@ -273,28 +275,30 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
srvs.SetCredentials(traceCfg.Credentials)
}

cliOp := DefaultClientOptions()
cliOp.GroupName = traceCfg.GroupName
cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
cliOp.RetryTimes = 0
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
cli := GetOrNewRocketMQClient(cliOp, nil)
if cli == nil {
traceProducer, err := producer.NewDefaultProducer(
producer.WithGroupName(traceCfg.GroupName),
producer.WithNameServerAddrs(traceCfg.NamesrvAddrs),
producer.WithInstanceName("INNER_TRACE_CLIENT_DEFAULT"),
producer.WithRetry(0),
producer.WithNamesrv(srvs),
producer.WithCredentials(traceCfg.Credentials),
)
if err != nil {
panic(errors.Wrap(err, "new producer failed."))
}
if traceProducer == nil {
return nil
}
cliOp.Namesrv = cli.GetNameSrv()
return &traceDispatcher{
ctx: ctx,
cancel: cancel,

traceTopic: t,
access: traceCfg.Access,
input: make(chan TraceContext, 1024),
batchCh: make(chan []*TraceContext, 2048),
cli: cli,
namesrvs: srvs,
traceTopic: t,
access: traceCfg.Access,
input: make(chan TraceContext, 1024),
batchCh: make(chan []*TraceContext, 2048),
namesrvs: srvs,
traceProducer: traceProducer,
}
}

Expand All @@ -304,7 +308,7 @@ func (td *traceDispatcher) GetTraceTopicName() string {

func (td *traceDispatcher) Start() {
td.running = true
td.cli.Start()
td.traceProducer.Start()
maxWaitDuration := 5 * time.Millisecond
td.ticker = time.NewTicker(maxWaitDuration)
maxWaitTime := maxWaitDuration.Nanoseconds()
Expand Down Expand Up @@ -462,28 +466,20 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
msg := primitive.NewMessage(traceTopic, []byte(data))
msg.WithKeys(keySet.slice())

mq, addr := td.findMq(regionID)
if mq == nil {
return
}

var req = td.buildSendRequest(mq, msg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
err := td.traceProducer.SendAsync(ctx, func(ctx context.Context, resp *primitive.SendResult, err error) {
cancel()
resp := primitive.NewSendResult()
if e != nil {
if err != nil {
rlog.Info("send trace data error.", map[string]interface{}{
"traceData": data,
})
} else {
td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
rlog.Debug("send trace data success:", map[string]interface{}{
"SendResult": resp,
"traceData": data,
})
}
})
}, msg)
if err != nil {
cancel()
rlog.Info("send trace data error when invoke", map[string]interface{}{
Expand Down
12 changes: 12 additions & 0 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ func WithGroupName(group string) Option {
}
}

func WithNameServerAddrs(nameServerAddrs primitive.NamesrvAddr) Option {
return func(opts *producerOptions) {
opts.NameServerAddrs = nameServerAddrs
}
}

func WithNamesrv(namesrv internal.Namesrvs) Option {
return func(opts *producerOptions) {
opts.Namesrv = namesrv
}
}

func WithInstanceName(name string) Option {
return func(opts *producerOptions) {
opts.InstanceName = name
Expand Down