Skip to content

Commit

Permalink
Atomic Copy: Fix panics when the copy phase starts in some clusters (#…
Browse files Browse the repository at this point in the history
…17717)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Feb 12, 2025
1 parent b5ede71 commit c47f1bd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations",
len(tp.Fields), len(bindLocations))
}
if len(row.Lengths) < len(tp.Fields) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of lengths: got %d lengths for %d fields",
len(row.Lengths), len(tp.Fields))
}

// Bind field values to locations.
var (
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers)))
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}
copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand Down Expand Up @@ -154,7 +159,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings

lastpk = nil
// pkfields are only used for logging, so that we can monitor progress.
pkfields = make([]*querypb.Field, len(resp.Pkfields))
pkfields = make([]*querypb.Field, 0, len(resp.Pkfields))
for _, f := range resp.Pkfields {
pkfields = append(pkfields, f.CloneVT())
}
Expand Down

0 comments on commit c47f1bd

Please sign in to comment.