Skip to content

Commit

Permalink
[release-21.0] Atomic Copy: Fix panics when the copy phase starts in …
Browse files Browse the repository at this point in the history
…some clusters (#17717) (#17748)

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Feb 12, 2025
1 parent 2e5327f commit 1cdad83
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}
if len(row.Lengths) < len(tp.Fields) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of lengths: got %d lengths for %d fields",
len(row.Lengths), len(tp.Fields))
}

type colInfo struct {
typ querypb.Type
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers)))
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}
copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand Down Expand Up @@ -154,7 +159,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings

lastpk = nil
// pkfields are only used for logging, so that we can monitor progress.
pkfields = make([]*querypb.Field, len(resp.Pkfields))
pkfields = make([]*querypb.Field, 0, len(resp.Pkfields))
for _, f := range resp.Pkfields {
pkfields = append(pkfields, f.CloneVT())
}
Expand Down

0 comments on commit 1cdad83

Please sign in to comment.