diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 0bf3f3bdd..895012b80 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -277,6 +277,9 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() }) d.dealWithBlockEvent(event) + case commonEvent.TypeHandshakeEvent: + log.Info("Receive handshake event", zap.Any("event", event), zap.Stringer("dispatcher", d.id)) + d.checkHandshakeEvents(dispatcherEvents) default: log.Panic("Unexpected event type", zap.Any("event Type", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs())) } @@ -285,6 +288,10 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo } func (d *Dispatcher) checkHandshakeEvents(dispatcherEvents []DispatcherEvent) bool { + if d.isReady.Load() { + log.Warn("Dispatcher is already ready, handshake event is unexpected, FIX ME!", zap.Stringer("dispatcher", d.id)) + return false + } for _, dispatcherEvent := range dispatcherEvents { event := dispatcherEvent.Event if event.GetType() == commonEvent.TypeHandshakeEvent {