From ef6b562836f7732994b87239d4e02e5d1d6e0bdd Mon Sep 17 00:00:00 2001
From: reugn <reugpro@gmail.com>
Date: Tue, 17 Dec 2024 16:57:23 +0200
Subject: [PATCH 1/2] fix: wrap errors using %w to preserve context

---
 oauth2/auth.go                               |  2 +-
 oauth2/cache/cache.go                        | 10 +++---
 oauth2/store/keyring.go                      | 13 ++++----
 oauth2/store/store.go                        |  2 +-
 pulsar/consumer_partition.go                 | 20 ++++--------
 pulsar/crypto/default_message_crypto.go      | 14 ++++----
 pulsar/internal/commands.go                  |  2 +-
 pulsar/internal/connection.go                |  2 +-
 pulsar/internal/crypto/producer_encryptor.go |  6 ++--
 pulsar/primitiveSerDe.go                     |  4 +--
 pulsar/producer_impl.go                      |  2 +-
 pulsar/table_view_impl.go                    | 34 ++++++++++----------
 12 files changed, 52 insertions(+), 59 deletions(-)

diff --git a/oauth2/auth.go b/oauth2/auth.go
index 9f4293b5b2..a4e72d2f19 100644
--- a/oauth2/auth.go
+++ b/oauth2/auth.go
@@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
 	p := jwt.Parser{}
 	claims := jwt.MapClaims{}
 	if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
-		return "", fmt.Errorf("unable to decode the access token: %v", err)
+		return "", fmt.Errorf("unable to decode the access token: %w", err)
 	}
 	username, ok := claims[ClaimNameUserName]
 	if !ok {
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
index b7f89b2127..e2279843e0 100644
--- a/oauth2/cache/cache.go
+++ b/oauth2/cache/cache.go
@@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
 	// load from the store and use the access token if it isn't expired
 	grant, err := t.store.LoadGrant(t.audience)
 	if err != nil {
-		return nil, fmt.Errorf("LoadGrant: %v", err)
+		return nil, fmt.Errorf("LoadGrant: %w", err)
 	}
 	t.token = grant.Token
 	if t.token != nil && t.validateAccessToken(*t.token) {
@@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
 	// obtain and cache a fresh access token
 	grant, err = t.refresher.Refresh(grant)
 	if err != nil {
-		return nil, fmt.Errorf("RefreshGrant: %v", err)
+		return nil, fmt.Errorf("RefreshGrant: %w", err)
 	}
 	t.token = grant.Token
 	err = t.store.SaveGrant(t.audience, *grant)
 	if err != nil {
 		// TODO log rather than throw
-		return nil, fmt.Errorf("SaveGrant: %v", err)
+		return nil, fmt.Errorf("SaveGrant: %w", err)
 	}
 
 	return t.token, nil
@@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
 	}
 	grant, err := t.store.LoadGrant(t.audience)
 	if err != nil {
-		return fmt.Errorf("LoadGrant: %v", err)
+		return fmt.Errorf("LoadGrant: %w", err)
 	}
 	if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
 		grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
 		err = t.store.SaveGrant(t.audience, *grant)
 		if err != nil {
 			// TODO log rather than throw
-			return fmt.Errorf("SaveGrant: %v", err)
+			return fmt.Errorf("SaveGrant: %w", err)
 		}
 	}
 	return nil
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
index 8a024f5920..70fba5b0b7 100644
--- a/oauth2/store/keyring.go
+++ b/oauth2/store/keyring.go
@@ -20,6 +20,7 @@ package store
 import (
 	"crypto/sha1"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"sync"
 
@@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, e
 
 	item, err := f.getItem(audience)
 	if err != nil {
-		if err == keyring.ErrKeyNotFound {
+		if errors.Is(err, keyring.ErrKeyNotFound) {
 			return nil, ErrNoAuthenticationData
 		}
 		return nil, err
@@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) {
 	key := hashKeyringKey(audience)
 	authItem, err := f.kr.Get(key)
 	if err != nil {
-		if err == keyring.ErrKeyNotFound {
+		if errors.Is(err, keyring.ErrKeyNotFound) {
 			return "", ErrNoAuthenticationData
 		}
-		return "", fmt.Errorf("unable to get information from the keyring: %v", err)
+		return "", fmt.Errorf("unable to get information from the keyring: %w", err)
 	}
 	return authItem.Label, nil
 }
@@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
 	var err error
 	keys, err := f.kr.Keys()
 	if err != nil {
-		return fmt.Errorf("unable to get information from the keyring: %v", err)
+		return fmt.Errorf("unable to get information from the keyring: %w", err)
 	}
 	for _, key := range keys {
 		err = f.kr.Remove(key)
 	}
 	if err != nil {
-		return fmt.Errorf("unable to update the keyring: %v", err)
+		return fmt.Errorf("unable to update the keyring: %w", err)
 	}
 	return nil
 }
@@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
 	}
 	err = f.kr.Set(i)
 	if err != nil {
-		return fmt.Errorf("unable to update the keyring: %v", err)
+		return fmt.Errorf("unable to update the keyring: %w", err)
 	}
 	return nil
 }
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
index 55d4c9ec38..5e916920e0 100644
--- a/oauth2/store/store.go
+++ b/oauth2/store/store.go
@@ -26,7 +26,7 @@ import (
 // ErrNoAuthenticationData indicates that stored authentication data is not available
 var ErrNoAuthenticationData = errors.New("authentication data is not available")
 
-// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
+// ErrUnsupportedAuthData indicates that stored authentication data is unusable
 var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
 
 // Store is responsible for persisting authorization grants
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 98848d71e4..bbddb594e2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	// error decrypting the payload
 	if err != nil {
 		// default crypto failure action
-		crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
+		cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
 		if pc.options.decryption != nil {
-			crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
+			cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
 		}
 
-		switch crypToFailureAction {
+		switch cryptoFailureAction {
 		case crypto.ConsumerCryptoFailureActionFail:
-			pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+			pc.log.Errorf("consuming message failed due to decryption err: %v", err)
 			pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
 			return err
 		case crypto.ConsumerCryptoFailureActionDiscard:
 			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
-			return fmt.Errorf("discarding message on decryption error :%v", err)
+			return fmt.Errorf("discarding message on decryption error: %w", err)
 		case crypto.ConsumerCryptoFailureActionConsume:
-			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+			pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
 			messages := []*message{
 				{
 					publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
@@ -1775,14 +1775,6 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 		return
 	}
 
-	if state == consumerClosed || state == consumerClosing {
-		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
-		if pc.nackTracker != nil {
-			pc.nackTracker.Close()
-		}
-		return
-	}
-
 	pc.setConsumerState(consumerClosing)
 	pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go
index 2239fa394c..c0a38f53d0 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyR
 	d.cipherLock.Lock()
 	defer d.cipherLock.Unlock()
 	if keyName == "" || keyReader == nil {
-		return fmt.Errorf("keyname or keyreader is null")
+		return fmt.Errorf("keyname or keyreader is nil")
 	}
 
 	// read the public key and its info using keyReader
@@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
 func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
 	payload []byte,
 	keyReader KeyReader) ([]byte, error) {
-	// if data key is present, attempt to derypt using the existing key
+	// if data key is present, attempt to decrypt using the existing key
 	if d.dataKey != nil {
 		decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
 		if err != nil {
@@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey,
 
 // read the public key into RSA key
 func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
-	var publickKey gocrypto.PublicKey
+	var publicKey gocrypto.PublicKey
 
 	pubPem, _ := pem.Decode(key)
 	if pubPem == nil {
-		return publickKey, fmt.Errorf("failed to decode public key")
+		return publicKey, fmt.Errorf("failed to decode public key")
 	}
 
 	genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
 	if err != nil {
-		return publickKey, err
+		return publicKey, err
 	}
-	publickKey = genericPublicKey
+	publicKey = genericPublicKey
 
-	return publickKey, nil
+	return publicKey, nil
 }
 
 func generateDataKey() ([]byte, error) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7471ee0d46..b9f462341e 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
 	encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
 	if err != nil {
 		// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
-		return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+		return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
 	}
 
 	cmdSize := uint32(proto.Size(cmdSend))
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 84c4323d9c..1faccc918e 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -947,7 +947,7 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
 	resourceID := commandTopicMigrated.GetResourceId()
 	migratedBrokerServiceURL := c.getMigratedBrokerServiceURL(commandTopicMigrated)
 	if migratedBrokerServiceURL == "" {
-		c.log.Warnf("Failed to find the migrated broker url for resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
+		c.log.Warnf("Failed to find the migrated broker url for resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
 			resourceID,
 			commandTopicMigrated.GetBrokerServiceUrl(),
 			commandTopicMigrated.GetBrokerServiceUrlTls())
diff --git a/pulsar/internal/crypto/producer_encryptor.go b/pulsar/internal/crypto/producer_encryptor.go
index a5b972da10..01dc5f83ce 100644
--- a/pulsar/internal/crypto/producer_encryptor.go
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -55,11 +55,11 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
 		crypto.NewMessageMetadataSupplier(msgMetadata),
 		payload)
 
-	// error encryping the payload
+	// error encrypting the payload
 	if err != nil {
 		// error occurred in encrypting the payload
 		// crypto ProducerCryptoFailureAction is set to send
-		// send unencrypted message
+		// unencrypted message
 		if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
 			e.logger.
 				WithError(err).
@@ -67,7 +67,7 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
 			return payload, nil
 		}
 
-		return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
+		return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload: %w", err)
 	}
 	return encryptedPayload, nil
 }
diff --git a/pulsar/primitiveSerDe.go b/pulsar/primitiveSerDe.go
index 0d53aa1c36..da3d8491ad 100644
--- a/pulsar/primitiveSerDe.go
+++ b/pulsar/primitiveSerDe.go
@@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64,
 
 func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
 	if len(buf) < 8 {
-		return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
+		return 0, fmt.Errorf("cannot decode binary double: %w", io.ErrShortBuffer)
 	}
 	return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
 }
 
 func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
 	if len(buf) < 4 {
-		return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
+		return 0, fmt.Errorf("cannot decode binary float: %w", io.ErrShortBuffer)
 	}
 	return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ca923108fe..8e970d28fe 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -143,7 +143,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 				true,
 				client.log.SubLogger(log.Fields{"topic": p.topic}))
 			if err != nil {
-				return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
+				return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
 			}
 			p.options.Encryption.MessageCrypto = messageCrypto
 		}
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 60b66e33d0..5f7f4e2b34 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -41,8 +41,8 @@ type TableViewImpl struct {
 	dataMu sync.Mutex
 	data   map[string]interface{}
 
-	readersMu    sync.Mutex
-	cancelRaders map[string]cancelReader
+	readersMu     sync.Mutex
+	cancelReaders map[string]cancelReader
 
 	listenersMu sync.Mutex
 	listeners   []func(string, interface{}) error
@@ -73,12 +73,12 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
 	}
 
 	tv := TableViewImpl{
-		client:       client,
-		options:      options,
-		data:         make(map[string]interface{}),
-		cancelRaders: make(map[string]cancelReader),
-		logger:       logger,
-		closedCh:     make(chan struct{}),
+		client:        client,
+		options:       options,
+		data:          make(map[string]interface{}),
+		cancelReaders: make(map[string]cancelReader),
+		logger:        logger,
+		closedCh:      make(chan struct{}),
 	}
 
 	// Do an initial round of partition update check to make sure we can populate the partition readers
@@ -104,16 +104,16 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
 	tv.readersMu.Lock()
 	defer tv.readersMu.Unlock()
 
-	for partition, cancelReader := range tv.cancelRaders {
+	for partition, cancelReader := range tv.cancelReaders {
 		if _, ok := partitions[partition]; !ok {
 			cancelReader.cancelFunc()
 			cancelReader.reader.Close()
-			delete(tv.cancelRaders, partition)
+			delete(tv.cancelReaders, partition)
 		}
 	}
 
 	for partition := range partitions {
-		if _, ok := tv.cancelRaders[partition]; !ok {
+		if _, ok := tv.cancelReaders[partition]; !ok {
 			reader, err := newReader(tv.client, ReaderOptions{
 				Topic:          partition,
 				StartMessageID: EarliestMessageID(),
@@ -127,14 +127,14 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
 			for reader.HasNext() {
 				msg, err := reader.Next(context.Background())
 				if err != nil {
-					tv.logger.Errorf("read next message failed for %s: %w", partition, err)
+					tv.logger.Errorf("read next message failed for %s: %v", partition, err)
 				}
 				if msg != nil {
 					tv.handleMessage(msg)
 				}
 			}
 			ctx, cancelFunc := context.WithCancel(context.Background())
-			tv.cancelRaders[partition] = cancelReader{
+			tv.cancelReaders[partition] = cancelReader{
 				reader:     reader,
 				cancelFunc: cancelFunc,
 			}
@@ -148,7 +148,7 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
 func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
 	for {
 		if err := tv.partitionUpdateCheck(); err != nil {
-			tv.logger.Errorf("failed to check for changes in number of partitions: %w", err)
+			tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
 		}
 		select {
 		case <-tv.closedCh:
@@ -236,7 +236,7 @@ func (tv *TableViewImpl) Close() {
 
 	if !tv.closed {
 		tv.closed = true
-		for _, cancelReader := range tv.cancelRaders {
+		for _, cancelReader := range tv.cancelReaders {
 			cancelReader.reader.Close()
 		}
 		close(tv.closedCh)
@@ -259,7 +259,7 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
 
 	for _, listener := range tv.listeners {
 		if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
-			tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
+			tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
 		}
 	}
 }
@@ -268,7 +268,7 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader R
 	for {
 		msg, err := reader.Next(ctx)
 		if err != nil {
-			tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err)
+			tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
 		}
 		var e *Error
 		if (errors.As(err, &e) && e.Result() == ConsumerClosed) || errors.Is(err, context.Canceled) {

From b4e75bd561ac187a0ad634eeaa252163586ae339 Mon Sep 17 00:00:00 2001
From: reugn <reugpro@gmail.com>
Date: Wed, 18 Dec 2024 21:30:42 +0200
Subject: [PATCH 2/2] move the consumer state check

---
 pulsar/consumer_partition.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bbddb594e2..520d9e8dd3 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1767,6 +1767,14 @@ func (pc *partitionConsumer) runEventsLoop() {
 func (pc *partitionConsumer) internalClose(req *closeRequest) {
 	defer close(req.doneCh)
 	state := pc.getConsumerState()
+	if state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
+		if pc.nackTracker != nil {
+			pc.nackTracker.Close()
+		}
+		return
+	}
+
 	if state != consumerReady {
 		// this might be redundant but to ensure nack tracker is closed
 		if pc.nackTracker != nil {