Skip to content

Commit

Permalink
1.Pull Consumer Support Assign Subscription Type.
Browse files Browse the repository at this point in the history
2.add several apis for Assign Sub Type: SeekOffset/Assign/OffsetForTimestamp/GetTopicRouteInfo
  • Loading branch information
muyun.cyt committed Dec 2, 2024
1 parent 3538144 commit 4fb3f58
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 129 deletions.
11 changes: 11 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,18 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
type PullConsumer interface {
// Start the PullConsumer for consuming message
Start() error
// GetTopicRouteInfo get topic route info
GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error)

// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector) error

// Unsubscribe a topic
Unsubscribe(topic string) error

// Assign assign message queue to consumer
Assign(topic string, mqs []*primitive.MessageQueue) error

// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
Shutdown() error

Expand All @@ -104,6 +109,12 @@ type PullConsumer interface {
// PullFrom pull messages of queue from the offset to offset + numbers
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

// SeekOffset seek offset for specific queue
SeekOffset(queue *primitive.MessageQueue, offset int64)

// OffsetForTimestamp get offset of specific queue with timestamp
OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64) (int64, error)

// UpdateOffset updateOffset update offset of queue in mem
UpdateOffset(queue *primitive.MessageQueue, offset int64) error

Expand Down
8 changes: 8 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error {
return nil
}

func (dc *defaultConsumer) isRunning() bool {
return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
}

func (dc *defaultConsumer) isStopped() bool {
return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
}

func (dc *defaultConsumer) persistConsumerOffset() error {
err := dc.makeSureStateOK()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consumer/mock_offset_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 122 additions & 3 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,26 @@ func (cr *ConsumeRequest) GetPQ() *processQueue {
return cr.processQueue
}

type SubscriptionType int

const (
None SubscriptionType = iota
Subscribe
Assign
)

type defaultPullConsumer struct {
*defaultConsumer

topic string
selector MessageSelector
GroupName string
Model MessageModel
SubType SubscriptionType
UnitMode bool
nextQueueSequence int64
allocateQueues []*primitive.MessageQueue
mq2seekOffset sync.Map // key:primitive.MessageQueue,value:seekOffset

done chan struct{}
closeOnce sync.Once
Expand Down Expand Up @@ -116,18 +126,40 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
defaultConsumer: dc,
done: make(chan struct{}, 1),
consumeRequestCache: make(chan *ConsumeRequest, 4),
GroupName: dc.option.GroupName,
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageConcurrently
c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
return c, nil
}

func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) {
topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic)
value, exist := pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
if exist {
return value.([]*primitive.MessageQueue), nil
}
pc.client.UpdateTopicRouteInfo()
value, exist = pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
if !exist {
return nil, errors2.ErrRouteNotFound
}
return value.([]*primitive.MessageQueue), nil
}

func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
if pc.SubType == Assign {
return errors2.ErrSubscriptionType
}

if pc.SubType == None {
pc.SubType = Subscribe
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)

data := buildSubscriptionData(topic, selector)
Expand All @@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector)
}

func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
if pc.SubType == Assign {
return errors2.ErrSubscriptionType
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)
pc.subscriptionDataTable.Delete(topic)
return nil
}

func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error {
if pc.SubType == Subscribe {
return errors2.ErrSubscriptionType
}
if pc.SubType == None {
pc.SubType = Assign
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)
data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll})
pc.topic = topic
pc.subscriptionDataTable.Store(topic, data)
oldQueues := pc.allocateQueues
pc.allocateQueues = mqs
rlog.Info("pull consumer assign new mqs", map[string]interface{}{
"topic": topic,
"group": pc.GroupName,
"oldMqs": oldQueues,
"newMqs": mqs,
})
if pc.isRunning() {
pc.Rebalance()
}
return nil
}

func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 {
if pc.SubType != Assign {
return originOffset
}
value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
if !exist {
return originOffset
} else {
nextOffset := value.(int64)
_ = pc.updateOffset(mq, nextOffset)
return nextOffset
}
}

func (pc *defaultPullConsumer) Start() error {
var err error
pc.once.Do(func() {
Expand Down Expand Up @@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string {
}

func (pc *defaultPullConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
switch pc.SubType {
case Assign:
pc.RebalanceViaTopic()
break
case Subscribe:
pc.defaultConsumer.doBalance()
break
}
}

func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
pc.defaultConsumer.doBalanceIfNotPaused()
switch pc.SubType {
case Assign:
pc.RebalanceViaTopic()
break
case Subscribe:
pc.defaultConsumer.doBalanceIfNotPaused()
break
}
}

func (pc *defaultPullConsumer) RebalanceViaTopic() {
changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues)
if changed {
rlog.Info("PullConsumer rebalance result changed ", map[string]interface{}{
rlog.LogKeyAllocateMessageQueue: pc.allocateQueues,
})
}
}

func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
Expand Down Expand Up @@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes

}

func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
pc.mq2seekOffset.Store(mq, offset)
rlog.Info("pull consumer seek offset", map[string]interface{}{
"mq": mq,
"offset": offset,
})
}

func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
return pc.searchOffsetByTimestamp(mq, timestamp)
}

func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
if pc.SubType == Assign {
return
}

var allocateQueues []*primitive.MessageQueue
pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
Expand Down Expand Up @@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
sleepTime = _PullDelayTimeWhenError
goto NEXT
}

nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
beginTime := time.Now()
sd := v.(*internal.SubscriptionData)

Expand All @@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
QueueOffset: nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
Expand Down Expand Up @@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error {
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}

if pc.SubType == None {
return errors2.ErrBlankSubType
}

return nil
}
4 changes: 2 additions & 2 deletions consumer/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) {
stats := NewStatsManager()

st := time.Now()
for {
for {
stats.increasePullTPS("rocketmq", "default", 1)
time.Sleep(500*time.Millisecond)
time.Sleep(500 * time.Millisecond)
if time.Now().Sub(st) > 5*time.Minute {
break
}
Expand Down
3 changes: 3 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ var (
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
ErrSubscriptionType = errors.New("subscribe type is not matched")
ErrBlankSubType = errors.New("subscribe type should not be blank")
ErrResponse = errors.New("response error")
ErrCompressLevel = errors.New("unsupported compress level")
ErrUnknownIP = errors.New("unknown IP address")
ErrService = errors.New("service close is not running, please check")
ErrTopicNotExist = errors.New("topic not exist")
ErrRouteNotFound = errors.New("topic route not found")
ErrNotExisted = errors.New("not existed")
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
ErrMultiIP = errors.New("multiple IP addr does not support")
Expand Down
Loading

0 comments on commit 4fb3f58

Please sign in to comment.