Skip to content

Commit

Permalink
Check if the vindex has an owner while validating externalized vindex
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Jan 14, 2025
1 parent 18b7de1 commit e866118
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 59 deletions.
36 changes: 23 additions & 13 deletions go/vt/vtctl/workflow/lookup_vindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -542,9 +543,25 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri
return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines)
}

// validateExternalizedVindex checks if a given vindex is externalized.
// A vindex is considered externalized if it has an owner and is not in write-only mode.
func (lv *lookupVindex) validateExternalizedVindex(vindex *vschemapb.Vindex) error {
writeOnly, ok := vindex.Params["write_only"]
if ok && writeOnly == "true" {
return fmt.Errorf("vindex is in write-only mode")
}
if vindex.Owner == "" {
return fmt.Errorf("vindex has no owner")
}
return nil
}

// validateExternalized checks if the vindex has been externalized
// and verifies the state of the VReplication workflow on the target shards.
// It ensures that all streams in the workflow are frozen.
func (lv *lookupVindex) validateExternalized(ctx context.Context, vindex *vschemapb.Vindex, name string, targetShards []*topo.ShardInfo) error {
if _, ok := vindex.Params["write_only"]; ok {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "write_only param found in vindex %s", name)
if err := lv.validateExternalizedVindex(vindex); err != nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s has not been externalized yet: %v", name, err)
}

err := forAllShards(targetShards, func(targetShard *topo.ShardInfo) error {
Expand All @@ -559,19 +576,12 @@ func (lv *lookupVindex) validateExternalized(ctx context.Context, vindex *vschem
return err
}
if res == nil || res.Workflow == "" {
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", name, targetPrimary.Alias)
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", name, topoproto.TabletAliasString(targetPrimary.Alias))
}
for _, stream := range res.Streams {
if vindex.Owner == "" {
// If there's no owner, all streams need to be running.
if stream.State != binlogdatapb.VReplicationWorkflowState_Running {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State)
}
} else {
// If there's an owner, all streams need to be frozen.
if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || stream.Message != Frozen {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not frozen: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message)
}
// All streams need to be frozen.
if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || stream.Message != Frozen {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "stream %d for %v.%v is not frozen: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message)
}
}
return nil
Expand Down
70 changes: 36 additions & 34 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ const (
// Time to wait between LOCK TABLES cycles on the sources during SwitchWrites.
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s"
SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%a and workflow=%a"
)

var (
Expand Down Expand Up @@ -559,7 +559,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
// LookupVindexComplete checks if the lookup vindex has been externalized,
// and if the vindex has an owner, it deletes the workflow.
func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.LookupVindexCompleteRequest) (*vtctldatapb.LookupVindexCompleteResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.LookupVindexInternalize")
span, ctx := trace.NewSpan(ctx, "workflow.Server.LookupVindexComplete")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
Expand All @@ -581,20 +581,18 @@ func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.Look
return nil, err
}

// Assuming that the lookup vindex was externalized, we don't need to
// delete the write_only parameter from the vindex.
// Now that we have checked that the vindex has been externalized,
// we don't need to delete the write_only parameter from the vindex.
resp := &vtctldatapb.LookupVindexCompleteResponse{}
if vindex.Owner != "" {
if _, derr := s.WorkflowDelete(ctx, &vtctldatapb.WorkflowDeleteRequest{
Keyspace: req.TableKeyspace,
Workflow: req.Name,
KeepData: true,
KeepRoutingRules: true,
}); derr != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr)
}
resp.WorkflowDeleted = true
}
if _, derr := s.WorkflowDelete(ctx, &vtctldatapb.WorkflowDeleteRequest{
Keyspace: req.TableKeyspace,
Workflow: req.Name,
KeepData: true,
KeepRoutingRules: true,
}); derr != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr)
}
resp.WorkflowDeleted = true
return resp, nil
}

Expand Down Expand Up @@ -681,7 +679,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
return err
}
if res == nil || res.Workflow == "" {
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, targetPrimary.Alias)
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, topoproto.TabletAliasString(targetPrimary.Alias))
}
for _, stream := range res.Streams {
if stream.Bls.Filter == nil || len(stream.Bls.Filter.Rules) != 1 {
Expand Down Expand Up @@ -720,7 +718,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
KeepData: true, // Not relevant
KeepRoutingRules: true, // Not relevant
}); derr != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr)
return nil, vterrors.Wrapf(derr, "failed to delete workflow %s", req.Name)
}
resp.WorkflowDeleted = true
} else {
Expand Down Expand Up @@ -780,29 +778,33 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L
return nil, err
}

// Make the vindex back to write_only and save the source vschema.
vindex.Params["write_only"] = "true"
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil {
return nil, err
}

resp := &vtctldatapb.LookupVindexInternalizeResponse{}
if vindex.Owner != "" {
err := forAllShards(targetShards, func(si *topo.ShardInfo) error {
tabletInfo, err := s.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return err
}
query := fmt.Sprintf(SqlUnfreezeWorkflow,
encodeString(tabletInfo.DbName()), encodeString(req.Name))
_, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query)
err = forAllShards(targetShards, func(si *topo.ShardInfo) error {
tabletInfo, err := s.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return err
})
}
query, err := sqlparser.ParseAndBind(SqlUnfreezeWorkflow,
sqltypes.StringBindVariable(tabletInfo.DbName()),
sqltypes.StringBindVariable(req.Name),
)
if err != nil {
return nil, err
return err
}
resp.WorkflowStarted = true
}

// Make the vindex back to write_only and save the source vschema.
vindex.Params["write_only"] = "true"
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil {
_, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query)
return err
})
if err != nil {
return nil, err
}
resp.WorkflowStarted = true

return resp, s.ts.RebuildSrvVSchema(ctx, nil)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
if req.Message != nil && *req.Message != sqltypes.Null.String() {
if req.Message != nil {
message = *req.Message
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
Expand Down
21 changes: 10 additions & 11 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2267,12 +2267,6 @@ func TestInternalizeLookupVindex(t *testing.T) {
ms.SourceKeyspace, ms.SourceKeyspace)
ownedRunning := sqltypes.MakeTestResult(fields, "1|Running|msg|"+ownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}")
ownedStopped := sqltypes.MakeTestResult(fields, "1|Stopped|"+workflow.Frozen+"|"+ownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}")
unownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`,
ms.SourceKeyspace, ms.SourceKeyspace)
unownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`,
ms.SourceKeyspace, ms.SourceKeyspace)
unownedRunning := sqltypes.MakeTestResult(fields, "2|Running|msg|"+unownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}")
unownedStopped := sqltypes.MakeTestResult(fields, "2|Stopped|Stopped after copy|"+unownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}")

testcases := []struct {
request *vtctldatapb.LookupVindexInternalizeRequest
Expand Down Expand Up @@ -2308,7 +2302,6 @@ func TestInternalizeLookupVindex(t *testing.T) {
Keyspace: ms.SourceKeyspace,
TableKeyspace: ms.TargetKeyspace,
},
vrResponse: unownedStopped,
expectedVschema: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"unowned_lookup": {
Expand All @@ -2321,7 +2314,7 @@ func TestInternalizeLookupVindex(t *testing.T) {
},
},
},
err: "is not in Running state",
err: "no owner",
},
{
request: &vtctldatapb.LookupVindexInternalizeRequest{
Expand Down Expand Up @@ -2352,7 +2345,6 @@ func TestInternalizeLookupVindex(t *testing.T) {
Keyspace: ms.SourceKeyspace,
TableKeyspace: ms.TargetKeyspace,
},
vrResponse: unownedRunning,
expectedVschema: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"unowned_lookup": {
Expand All @@ -2366,6 +2358,7 @@ func TestInternalizeLookupVindex(t *testing.T) {
},
},
},
err: "no owner",
},
{
request: &vtctldatapb.LookupVindexInternalizeRequest{
Expand Down Expand Up @@ -2399,10 +2392,16 @@ func TestInternalizeLookupVindex(t *testing.T) {
require.NotNil(t, tcase.request, "No request provided")

for _, targetTablet := range targetShards {
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil)
if tcase.vrResponse != nil {
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil)
}
// Update queries are required only if the Vindex is owned.
if len(tcase.expectedVschema.Vindexes) > 0 && tcase.expectedVschema.Vindexes[tcase.request.Name].Owner != "" {
unfreezeQuery := fmt.Sprintf(workflow.SqlUnfreezeWorkflow, sqltypes.EncodeStringSQL("vt_targetks"), sqltypes.EncodeStringSQL(tcase.request.Name))
unfreezeQuery, err := sqlparser.ParseAndBind(workflow.SqlUnfreezeWorkflow,
sqltypes.StringBindVariable("vt_targetks"),
sqltypes.StringBindVariable(tcase.request.Name),
)
require.NoError(t, err)
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, unfreezeQuery, &sqltypes.Result{})
}
}
Expand Down

0 comments on commit e866118

Please sign in to comment.