Skip to content

Commit

Permalink
bulker: disable stale topics logic by default
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Oct 2, 2024
1 parent 8a48c54 commit 7907a34
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
2 changes: 2 additions & 0 deletions bulkerapp/app/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Config struct {
InstanceIndex int `mapstructure:"INSTANCE_INDEX" default:"0"`
ShardsCount int `mapstructure:"SHARDS" default:"1"`
EnableConsumers bool `mapstructure:"ENABLE_CONSUMERS" default:"true"`
// Suspend consumers for topics not receiving events for longer than KAFKA_TOPIC_RETENTION_HOURS
StaleTopics bool `mapstructure:"STALE_TOPICS" default:"false"`

// # GRACEFUL SHUTDOWN
//Timeout that give running batch tasks time to finish during shutdown.
Expand Down
48 changes: 26 additions & 22 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,29 @@ func (tm *TopicManager) LoadMetadata() {
tm.Errorf("Error getting metadata: %v", err)
} else {
topicsLastMessageDates := map[string]*time.Time{}
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
t := topic.Topic
if !strings.HasPrefix(t, "__") {
for _, partition := range topic.Partitions {
topicPartitionOffsets[kafka.TopicPartition{Topic: &t, Partition: partition.ID}] = kafka.MaxTimestampOffsetSpec
if tm.config.StaleTopics {
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
t := topic.Topic
if !strings.HasPrefix(t, "__") {
for _, partition := range topic.Partitions {
topicPartitionOffsets[kafka.TopicPartition{Topic: &t, Partition: partition.ID}] = kafka.MaxTimestampOffsetSpec
}
}
}
}
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 && offset.Timestamp > 0 {
lastMessageDate := time.UnixMilli(offset.Timestamp)
topicsLastMessageDates[*tp.Topic] = &lastMessageDate
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 && offset.Timestamp > 0 {
lastMessageDate := time.UnixMilli(offset.Timestamp)
topicsLastMessageDates[*tp.Topic] = &lastMessageDate
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}

tm.processMetadata(metadata, topicsLastMessageDates)
Expand Down Expand Up @@ -192,11 +194,13 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
abandonedTopicsCount++
continue
}
lastMessageDate, ok := tm.topicLastActiveDate[topic]
if !ok || lastMessageDate.Before(staleTopicsCutOff) {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
continue
if tm.config.StaleTopics {
lastMessageDate, ok := tm.topicLastActiveDate[topic]
if !ok || lastMessageDate.Before(staleTopicsCutOff) {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
continue
}
}
destinationId, mode, tableName, err := ParseTopicId(topic)
if err != nil {
Expand Down

0 comments on commit 7907a34

Please sign in to comment.