diff --git a/go.mod b/go.mod index f88d6ad576..52bfe43a96 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,8 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect diff --git a/go.sum b/go.sum index 00a44917d8..e8a9e76be7 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 33382cff99..8b983d0d19 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) { Payload: createTestMessagePayload(1), }) // Producer can't send message - assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull)) + assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull)) } func TestMultiConsumerMemoryLimit(t *testing.T) { diff --git a/pulsar/error.go b/pulsar/error.go index 25498cfba4..f03799342a 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -21,6 +21,7 @@ import ( "fmt" proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/hashicorp/go-multierror" ) // Result used to represent pulsar processing is an alias of type int. @@ -245,3 +246,10 @@ func getErrorFromServerError(serverError *proto.ServerError) error { return newError(UnknownError, serverError.String()) } } + +// joinErrors can join multiple errors into one error, and the returned error can be tested by errors.Is() +// we use github.com/hashicorp/go-multierror instead of errors.Join() of Go 1.20 so that we can compile pulsar +// go client with go versions that newer than go 1.13 +func joinErrors(errs ...error) error { + return multierror.Append(nil, errs...) +} diff --git a/pulsar/error_test.go b/pulsar/error_test.go new file mode 100644 index 0000000000..5403effc2d --- /dev/null +++ b/pulsar/error_test.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_joinErrors(t *testing.T) { + err1 := errors.New("err1") + err2 := errors.New("err2") + err3 := errors.New("err3") + err := joinErrors(ErrInvalidMessage, err1, err2) + assert.True(t, errors.Is(err, ErrInvalidMessage)) + assert.True(t, errors.Is(err, err1)) + assert.True(t, errors.Is(err, err2)) + assert.False(t, errors.Is(err, err3)) +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 46167d0cf1..1b79053e38 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -51,14 +51,21 @@ const ( ) var ( - errFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed") - errSendTimeout = newError(TimeoutError, "message send timeout") - errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") - errContextExpired = newError(TimeoutError, "message send context expired") - errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") - errMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize") - errProducerClosed = newError(ProducerClosed, "producer already been closed") - errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full") + ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed") + ErrSendTimeout = newError(TimeoutError, "message send timeout") + ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") + ErrContextExpired = newError(TimeoutError, "message send context expired") + ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize") + ErrProducerClosed = newError(ProducerClosed, "producer already been closed") + ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full") + ErrSchema = newError(SchemaFailure, "schema error") + ErrTransaction = errors.New("transaction error") + ErrInvalidMessage = newError(InvalidMessage, "invalid message") + ErrTopicNotfound = newError(TopicNotFound, "topic not found") + ErrTopicTerminated = newError(TopicTerminated, "topic terminated") + ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked") + ErrProducerFenced = newError(ProducerFenced, "producer fenced") buffersPool sync.Pool sendRequestPool *sync.Pool @@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. p.log.Warn("Topic not found, stop reconnecting, close the producer") - p.doClose(newError(TopicNotFound, err.Error())) + p.doClose(joinErrors(ErrTopicNotfound, err)) break } if strings.Contains(errMsg, errMsgTopicTerminated) { p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer") - p.doClose(newError(TopicTerminated, err.Error())) + p.doClose(joinErrors(ErrTopicTerminated, err)) break } if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) { p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting") - p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error())) + p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err)) break } if strings.Contains(errMsg, errMsgProducerFenced) { p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") - p.doClose(newError(ProducerFenced, err.Error())) + p.doClose(joinErrors(ErrProducerFenced, err)) break } @@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { p.log.WithField("size", sr.uncompressedSize). WithField("properties", sr.msg.Properties). Error("unable to add message to batch") - sr.done(nil, errFailAddToBatch) + sr.done(nil, ErrFailAddToBatch) return } } @@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() { } if errors.Is(err, internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err) + p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err) } return @@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - sr.done(nil, errSendTimeout) + sr.done(nil, ErrSendTimeout) } // flag the sending has completed with error, flush make no effect - pi.done(errSendTimeout) + pi.done(ErrSendTimeout) pi.Unlock() // finally reached the last view item, current iteration ends @@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i]) + p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i]) return } @@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) validateMsg(msg *ProducerMessage) error { if msg == nil { - return newError(InvalidMessage, "Message is nil") + return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil")) } if msg.Value != nil && msg.Payload != nil { - return newError(InvalidMessage, "Can not set Value and Payload both") + return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both")) } if p.options.DisableMultiSchema { if msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) - return fmt.Errorf("msg schema can not match with producer schema") + return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema")) } } @@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { if txn.state != TxnOpen { p.log.WithField("state", txn.state).Error("Failed to send message" + " by a non-open transaction.") - return newError(InvalidStatus, "Failed to send message by a non-open transaction.") + return joinErrors(ErrTransaction, + fmt.Errorf("failed to send message by a non-open transaction")) } if err := txn.registerProducerTopic(p.topic); err != nil { - return err + return joinErrors(ErrTransaction, err) } if err := txn.registerSendOrAckOp(); err != nil { - return err + return joinErrors(ErrTransaction, err) } sr.transaction = txn @@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error { if schemaVersion == nil { schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) if err != nil { - return fmt.Errorf("get schema version fail, err: %w", err) + return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err)) } p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) } @@ -1097,7 +1105,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error { if sr.msg.Value != nil { if sr.schema == nil { p.log.Errorf("Schema encode message failed %s", sr.msg.Value) - return newError(SchemaFailure, "set schema value without setting schema") + return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema")) } // payload and schema are mutually exclusive @@ -1105,7 +1113,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error { schemaPayload, err := sr.schema.Encode(sr.msg.Value) if err != nil { p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value) - return newError(SchemaFailure, err.Error()) + return joinErrors(ErrSchema, err) } sr.uncompressedPayload = schemaPayload @@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { // if msg is too large and chunking is disabled if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking { - p.log.WithError(errMessageTooLarge). + p.log.WithError(ErrMessageTooLarge). WithField("size", checkSize). WithField("properties", sr.msg.Properties). Errorf("MaxMessageSize %d", sr.maxMessageSize) - return errMessageTooLarge + return ErrMessageTooLarge } if sr.sendAsBatch || !p.options.EnableChunking { @@ -1173,11 +1181,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { } else { sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm) if sr.payloadChunkSize <= 0 { - p.log.WithError(errMetaTooLarge). + p.log.WithError(ErrMetaTooLarge). WithField("metadata size", proto.Size(sr.mm)). WithField("properties", sr.msg.Properties). Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize())) - return errMetaTooLarge + return ErrMetaTooLarge } // set ChunkMaxMessageSize if p.options.ChunkMaxMessageSize != 0 { @@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync( } if p.getProducerState() != producerReady { - sr.done(nil, errProducerClosed) + sr.done(nil, ErrProducerClosed) return } @@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) func (p *partitionProducer) internalClose(req *closeProducer) { defer close(req.doneCh) - p.doClose(errProducerClosed) + p.doClose(ErrProducerClosed) } func (p *partitionProducer) doClose(reason error) { @@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error) { WithField("properties", sr.msg.Properties) } - if errors.Is(err, errSendTimeout) { + if errors.Is(err, ErrSendTimeout) { sr.producer.metrics.PublishErrorsTimeout.Inc() } - if errors.Is(err, errMessageTooLarge) { + if errors.Is(err, ErrMessageTooLarge) { sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() } @@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { for i := 0; i < sr.totalChunks; i++ { if p.blockIfQueueFull() { if !p.publishSemaphore.Acquire(sr.ctx) { - return errContextExpired + return ErrContextExpired } // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case @@ -1564,7 +1572,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { p.metrics.MessagesPending.Inc() } else { if !p.publishSemaphore.TryAcquire() { - return errSendQueueIsFull + return ErrSendQueueIsFull } // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case @@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error { if p.blockIfQueueFull() { if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) { - return errContextExpired + return ErrContextExpired } } else { if !p.client.memLimit.TryReserveMemory(requiredMem) { - return errMemoryBufferIsFull + return ErrMemoryBufferIsFull } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0f89069243..0d74cdeefe 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1061,8 +1061,8 @@ func TestMaxMessageSize(t *testing.T) { assert.NoError(t, err) defer client.Close() - // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge - // being masked by an earlier errFailAddToBatch + // Need to set BatchingMaxSize > serverMaxMessageSize to avoid ErrMessageTooLarge + // being masked by an earlier ErrFailAddToBatch producer, err := client.CreateProducer(ProducerOptions{ Topic: newTopicName(), BatchingMaxSize: uint(2 * serverMaxMessageSize), @@ -1088,7 +1088,7 @@ func TestMaxMessageSize(t *testing.T) { // So when bias <= 0, the uncompressed payload will not exceed maxMessageSize, // but encryptedPayloadSize exceeds maxMessageSize, Send() will return an internal error. // When bias = 1, the first check of maxMessageSize (for uncompressed payload) is valid, - // Send() will return errMessageTooLarge + // Send() will return ErrMessageTooLarge for bias := -1; bias <= 1; bias++ { payload := make([]byte, serverMaxMessageSize+bias) ID, err := producer.Send(context.Background(), &ProducerMessage{ @@ -1098,7 +1098,7 @@ func TestMaxMessageSize(t *testing.T) { assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize)) assert.Nil(t, ID) } else { - assert.Equal(t, errMessageTooLarge, err) + assert.True(t, errors.Is(err, ErrMessageTooLarge)) } } @@ -1111,7 +1111,7 @@ func TestMaxMessageSize(t *testing.T) { assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize)) assert.Nil(t, ID) } else { - assert.Equal(t, errMessageTooLarge, err) + assert.True(t, errors.Is(err, ErrMessageTooLarge)) } } } @@ -1190,8 +1190,7 @@ func TestTopicTermination(t *testing.T) { Payload: make([]byte, 1024), }) if err != nil { - e := err.(*Error) - if e.result == TopicTerminated || err == errProducerClosed { + if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) { terminatedChan <- true } else { terminatedChan <- false @@ -2348,7 +2347,7 @@ func TestFailPendingMessageWithClose(t *testing.T) { Payload: make([]byte, 1024), }, func(id MessageID, message *ProducerMessage, e error) { if e != nil { - assert.Equal(t, errProducerClosed, e) + assert.True(t, errors.Is(e, ErrProducerClosed)) } }) }