Skip to content

Commit

Permalink
use vt15001 in more places and rollback as soon as we detect this error
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Feb 3, 2025
1 parent 2af7e91 commit 6513c15
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
14 changes: 14 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vterrors

import (
"fmt"
"strings"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -279,3 +280,16 @@ func errorWithNoCode(id string, short, long string) func(code vtrpcpb.Code, args
}
}
}

func ErrorsHaveInvalidSession(errs []error) bool {
for _, err := range errs {
if IsInvalidSessionError(err) {
return true
}
}
return false
}

func IsInvalidSessionError(err error) bool {
return strings.Contains(err.Error(), VT15001(0).ID)
}
14 changes: 7 additions & 7 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,12 @@ func (vc *VCursorImpl) Execute(ctx context.Context, method string, query string,

err := vc.markSavepoint(ctx, rollbackOnError, map[string]*querypb.BindVariable{})
if err != nil {
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError)
return nil, err
}

qr, err := vc.executor.Execute(ctx, nil, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars)
vc.setRollbackOnPartialExecIfRequired(err != nil, rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), err != nil, rollbackOnError)

return qr, err
}
Expand Down Expand Up @@ -794,13 +795,12 @@ func (vc *VCursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
if err != nil {
// this may be a bit too aggressive, maybe we just want to rollback if we receive a VT15001 error.
vc.setRollbackOnPartialExecIfRequired(true, rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError)
return nil, []error{err}
}

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.SafeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.observer, fetchLastInsertID)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError)
vc.logShardsQueried(primitive, len(rss))
if qr != nil && qr.InsertIDUpdated() {
vc.SafeSession.LastInsertId = qr.InsertID
Expand All @@ -820,7 +820,7 @@ func (vc *VCursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.
}

errs := vc.executor.StreamExecuteMulti(ctx, primitive, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.SafeSession, autocommit, callback, vc.observer, fetchLastInsertID)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError)

return errs
}
Expand Down Expand Up @@ -905,8 +905,8 @@ func (vc *VCursorImpl) AutocommitApproval() bool {
// when the query gets successfully executed on at least one shard,
// there does not exist any old savepoint for which rollback is already set
// and rollback on error is allowed.
func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(atleastOneSuccess bool, rollbackOnError bool) {
if atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() {
func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(required bool, atleastOneSuccess bool, rollbackOnError bool) {
if required || atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() {
vc.SafeSession.SetRollbackCommand()
}
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,15 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target

if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled {
// This specific error string needs to be returned for vtgate buffering to work.
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing)
return vterrors.VT15001(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing)
}

shuttingDown := sm.wantState != StateServing
// If wait counter for the requests is not zero, then there are go-routines blocked on waiting for requests to be empty.
// We cannot allow adding to the requests to prevent any panics from happening.
if (shuttingDown && !allowOnShutdown) || sm.rw.GetWaiterCount() > 0 {
// This specific error string needs to be returned for vtgate buffering to work.
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown)
return vterrors.VT15001(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown)
}

err = sm.verifyTargetLocked(ctx, target)
Expand Down

0 comments on commit 6513c15

Please sign in to comment.