Skip to content

Commit

Permalink
[release-19.0] Atomic Copy: Fix panics when the copy phase starts in …
Browse files Browse the repository at this point in the history
…some clusters (#17717) (#17746)

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Feb 12, 2025
1 parent 91b7ef7 commit aa96a3c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}
if len(row.Lengths) < len(tp.Fields) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of lengths: got %d lengths for %d fields",
len(row.Lengths), len(tp.Fields))
}

type colInfo struct {
typ querypb.Type
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := getInsertParallelism()
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}
copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand Down Expand Up @@ -153,7 +158,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings

lastpk = nil
// pkfields are only used for logging, so that we can monitor progress.
pkfields = make([]*querypb.Field, len(resp.Pkfields))
pkfields = make([]*querypb.Field, 0, len(resp.Pkfields))
for _, f := range resp.Pkfields {
pkfields = append(pkfields, f.CloneVT())
}
Expand Down

0 comments on commit aa96a3c

Please sign in to comment.