diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index aee7f5c8909..caee2f9ed04 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -620,6 +620,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", len(tp.Fields), len(bindLocations)) } + if len(row.Lengths) < len(tp.Fields) { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of lengths: got %d lengths for %d fields", + len(row.Lengths), len(tp.Fields)) + } type colInfo struct { typ querypb.Type diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 3a0996e0c39..1e3892c0f05 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -84,6 +84,11 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings defer rowsCopiedTicker.Stop() parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers))) + // For now do not support concurrent inserts for atomic copies. + if parallelism > 1 { + parallelism = 1 + log.Infof("Disabling concurrent inserts for atomic copies") + } copyWorkerFactory := vc.newCopyWorkerFactory(parallelism) var copyWorkQueue *vcopierCopyWorkQueue @@ -154,7 +159,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings lastpk = nil // pkfields are only used for logging, so that we can monitor progress. - pkfields = make([]*querypb.Field, len(resp.Pkfields)) + pkfields = make([]*querypb.Field, 0, len(resp.Pkfields)) for _, f := range resp.Pkfields { pkfields = append(pkfields, f.CloneVT()) }