diff --git a/internal/trace.go b/internal/trace.go index f7cea8de..011e239b 100644 --- a/internal/trace.go +++ b/internal/trace.go @@ -21,6 +21,8 @@ import ( "bytes" "context" "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/producer" "runtime" "strconv" "strings" @@ -237,8 +239,8 @@ type traceDispatcher struct { // support deliver trace message to other cluster. namesrvs *namesrvs // round robin index - rrindex int32 - cli RMQClient + rrindex int32 + traceProducer rocketmq.Producer } func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher { @@ -273,28 +275,30 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher { srvs.SetCredentials(traceCfg.Credentials) } - cliOp := DefaultClientOptions() - cliOp.GroupName = traceCfg.GroupName - cliOp.NameServerAddrs = traceCfg.NamesrvAddrs - cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT" - cliOp.RetryTimes = 0 - cliOp.Namesrv = srvs - cliOp.Credentials = traceCfg.Credentials - cli := GetOrNewRocketMQClient(cliOp, nil) - if cli == nil { + traceProducer, err := producer.NewDefaultProducer( + producer.WithGroupName(traceCfg.GroupName), + producer.WithNameServerAddrs(traceCfg.NamesrvAddrs), + producer.WithInstanceName("INNER_TRACE_CLIENT_DEFAULT"), + producer.WithRetry(0), + producer.WithNamesrv(srvs), + producer.WithCredentials(traceCfg.Credentials), + ) + if err != nil { + panic(errors.Wrap(err, "new producer failed.")) + } + if traceProducer == nil { return nil } - cliOp.Namesrv = cli.GetNameSrv() return &traceDispatcher{ ctx: ctx, cancel: cancel, - traceTopic: t, - access: traceCfg.Access, - input: make(chan TraceContext, 1024), - batchCh: make(chan []*TraceContext, 2048), - cli: cli, - namesrvs: srvs, + traceTopic: t, + access: traceCfg.Access, + input: make(chan TraceContext, 1024), + batchCh: make(chan []*TraceContext, 2048), + namesrvs: srvs, + traceProducer: traceProducer, } } @@ -304,7 +308,7 @@ func (td *traceDispatcher) GetTraceTopicName() string { func (td *traceDispatcher) Start() { td.running = true - td.cli.Start() + td.traceProducer.Start() maxWaitDuration := 5 * time.Millisecond td.ticker = time.NewTicker(maxWaitDuration) maxWaitTime := maxWaitDuration.Nanoseconds() @@ -462,28 +466,20 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat msg := primitive.NewMessage(traceTopic, []byte(data)) msg.WithKeys(keySet.slice()) - mq, addr := td.findMq(regionID) - if mq == nil { - return - } - - var req = td.buildSendRequest(mq, msg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) { + err := td.traceProducer.SendAsync(ctx, func(ctx context.Context, resp *primitive.SendResult, err error) { cancel() - resp := primitive.NewSendResult() - if e != nil { + if err != nil { rlog.Info("send trace data error.", map[string]interface{}{ "traceData": data, }) } else { - td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg) rlog.Debug("send trace data success:", map[string]interface{}{ "SendResult": resp, "traceData": data, }) } - }) + }, msg) if err != nil { cancel() rlog.Info("send trace data error when invoke", map[string]interface{}{ diff --git a/producer/option.go b/producer/option.go index c3a0dc42..90c38ee5 100644 --- a/producer/option.go +++ b/producer/option.go @@ -65,6 +65,18 @@ func WithGroupName(group string) Option { } } +func WithNameServerAddrs(nameServerAddrs primitive.NamesrvAddr) Option { + return func(opts *producerOptions) { + opts.NameServerAddrs = nameServerAddrs + } +} + +func WithNamesrv(namesrv internal.Namesrvs) Option { + return func(opts *producerOptions) { + opts.Namesrv = namesrv + } +} + func WithInstanceName(name string) Option { return func(opts *producerOptions) { opts.InstanceName = name