From 600f9f1d7edbc040fa53cb377688cda4c0f072c8 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Sat, 19 Oct 2024 10:21:35 +0800 Subject: [PATCH] dispatcher: fix panic (#388) Signed-off-by: dongmen <414110582@qq.com> --- downstreamadapter/dispatcher/dispatcher.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 0bf3f3bdd..895012b80 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -277,6 +277,9 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() }) d.dealWithBlockEvent(event) + case commonEvent.TypeHandshakeEvent: + log.Info("Receive handshake event", zap.Any("event", event), zap.Stringer("dispatcher", d.id)) + d.checkHandshakeEvents(dispatcherEvents) default: log.Panic("Unexpected event type", zap.Any("event Type", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs())) } @@ -285,6 +288,10 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo } func (d *Dispatcher) checkHandshakeEvents(dispatcherEvents []DispatcherEvent) bool { + if d.isReady.Load() { + log.Warn("Dispatcher is already ready, handshake event is unexpected, FIX ME!", zap.Stringer("dispatcher", d.id)) + return false + } for _, dispatcherEvent := range dispatcherEvents { event := dispatcherEvent.Event if event.GetType() == commonEvent.TypeHandshakeEvent {