From 6513c1566777e1fdfe956053e4273b7a4fcde69d Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 3 Feb 2025 11:49:54 -0600 Subject: [PATCH] use vt15001 in more places and rollback as soon as we detect this error Signed-off-by: Florent Poinsard --- go/vt/vterrors/code.go | 14 ++++++++++++++ go/vt/vtgate/executorcontext/vcursor_impl.go | 14 +++++++------- go/vt/vttablet/tabletserver/state_manager.go | 4 ++-- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 232188a29f8..4c0d8ef9dfd 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -18,6 +18,7 @@ package vterrors import ( "fmt" + "strings" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -279,3 +280,16 @@ func errorWithNoCode(id string, short, long string) func(code vtrpcpb.Code, args } } } + +func ErrorsHaveInvalidSession(errs []error) bool { + for _, err := range errs { + if IsInvalidSessionError(err) { + return true + } + } + return false +} + +func IsInvalidSessionError(err error) bool { + return strings.Contains(err.Error(), VT15001(0).ID) +} diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 16d2cf196d0..1847ce973a7 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -762,11 +762,12 @@ func (vc *VCursorImpl) Execute(ctx context.Context, method string, query string, err := vc.markSavepoint(ctx, rollbackOnError, map[string]*querypb.BindVariable{}) if err != nil { + vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError) return nil, err } qr, err := vc.executor.Execute(ctx, nil, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars) - vc.setRollbackOnPartialExecIfRequired(err != nil, rollbackOnError) + vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), err != nil, rollbackOnError) return qr, err } @@ -794,13 +795,12 @@ func (vc *VCursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards)) err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{}) if err != nil { - // this may be a bit too aggressive, maybe we just want to rollback if we receive a VT15001 error. - vc.setRollbackOnPartialExecIfRequired(true, rollbackOnError) + vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError) return nil, []error{err} } qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.SafeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.observer, fetchLastInsertID) - vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) + vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError) vc.logShardsQueried(primitive, len(rss)) if qr != nil && qr.InsertIDUpdated() { vc.SafeSession.LastInsertId = qr.InsertID @@ -820,7 +820,7 @@ func (vc *VCursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine. } errs := vc.executor.StreamExecuteMulti(ctx, primitive, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.SafeSession, autocommit, callback, vc.observer, fetchLastInsertID) - vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) + vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError) return errs } @@ -905,8 +905,8 @@ func (vc *VCursorImpl) AutocommitApproval() bool { // when the query gets successfully executed on at least one shard, // there does not exist any old savepoint for which rollback is already set // and rollback on error is allowed. -func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(atleastOneSuccess bool, rollbackOnError bool) { - if atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() { +func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(required bool, atleastOneSuccess bool, rollbackOnError bool) { + if required || atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() { vc.SafeSession.SetRollbackCommand() } } diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 8a1d7cb7889..832c99d20c8 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -391,7 +391,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing) + return vterrors.VT15001(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing) } shuttingDown := sm.wantState != StateServing @@ -399,7 +399,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target // We cannot allow adding to the requests to prevent any panics from happening. if (shuttingDown && !allowOnShutdown) || sm.rw.GetWaiterCount() > 0 { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) + return vterrors.VT15001(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) } err = sm.verifyTargetLocked(ctx, target)