Skip to content

Commit

Permalink
dispatcher: fix panic (pingcap#388)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Oct 19, 2024
1 parent 4b18b47 commit 600f9f1
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.dealWithBlockEvent(event)
case commonEvent.TypeHandshakeEvent:
log.Info("Receive handshake event", zap.Any("event", event), zap.Stringer("dispatcher", d.id))
d.checkHandshakeEvents(dispatcherEvents)
default:
log.Panic("Unexpected event type", zap.Any("event Type", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs()))
}
Expand All @@ -285,6 +288,10 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo
}

func (d *Dispatcher) checkHandshakeEvents(dispatcherEvents []DispatcherEvent) bool {
if d.isReady.Load() {
log.Warn("Dispatcher is already ready, handshake event is unexpected, FIX ME!", zap.Stringer("dispatcher", d.id))
return false
}
for _, dispatcherEvent := range dispatcherEvents {
event := dispatcherEvent.Event
if event.GetType() == commonEvent.TypeHandshakeEvent {
Expand Down

0 comments on commit 600f9f1

Please sign in to comment.