Skip to content

Commit

Permalink
refactor: lock when pausing and re-starting the kinesumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mingrammer committed Apr 25, 2022
1 parent d9a97a8 commit b336e1f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
10 changes: 8 additions & 2 deletions kinesumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ type Kinesumer struct {
started chan struct{}

// To wait the running consumer loops when stopping.
wait sync.WaitGroup
stop chan struct{}
wait sync.WaitGroup
stop chan struct{}
// Lock for pausing and starting.
mu *sync.Mutex
close chan struct{}
}

Expand Down Expand Up @@ -214,6 +216,7 @@ func NewKinesumer(cfg *Config) (*Kinesumer, error) {
started: make(chan struct{}),
wait: sync.WaitGroup{},
stop: make(chan struct{}),
mu: &sync.Mutex{},
close: make(chan struct{}),
}

Expand Down Expand Up @@ -665,6 +668,9 @@ func (k *Kinesumer) getNextShardIterator(

// Refresh refreshes the consuming streams.
func (k *Kinesumer) Refresh(streams []string) {
k.mu.Lock()
defer k.mu.Unlock()

k.pause()
// TODO(mingrammer): Deregister the EFO consumers.

Expand Down
3 changes: 3 additions & 0 deletions syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (k *Kinesumer) syncShardInfoForStream(
return nil
}

k.mu.Lock()
defer k.mu.Unlock()

// Update client shard ids.
k.pause() // Pause the current consuming jobs before update shards.
k.shards[stream] = newShards
Expand Down

0 comments on commit b336e1f

Please sign in to comment.