diff --git a/cmd/cmd.go b/cmd/cmd.go index 8c82fc0..1d10149 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -41,6 +41,7 @@ const ( flagPriority = "priority" flagNode = "node" flagTimeout = "timeout" + flagSkipChangeStream = "skip_change_stream" defaultSchemaFileName = "schema.sql" ) diff --git a/cmd/load.go b/cmd/load.go index 4eedfec..fceccbb 100644 --- a/cmd/load.go +++ b/cmd/load.go @@ -26,6 +26,9 @@ import ( "github.com/spf13/cobra" ) +var ( + skipChangeStream bool +) var loadCmd = &cobra.Command{ Use: "load", Short: "Load schema from server to file", @@ -42,7 +45,7 @@ func load(c *cobra.Command, _ []string) error { } defer client.Close() - ddl, err := client.LoadDDL(ctx) + ddl, err := client.LoadDDL(ctx, skipChangeStream) if err != nil { return &Error{ err: err, @@ -60,3 +63,7 @@ func load(c *cobra.Command, _ []string) error { return nil } + +func init() { + loadCmd.PersistentFlags().BoolVar(&skipChangeStream, flagSkipChangeStream, false, "Skip Change Stream to load") +} diff --git a/pkg/spanner/client.go b/pkg/spanner/client.go index 3a8bad1..17babf3 100644 --- a/pkg/spanner/client.go +++ b/pkg/spanner/client.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sort" + "strings" "cloud.google.com/go/spanner" databasev1 "cloud.google.com/go/spanner/admin/database/apiv1" @@ -174,7 +175,7 @@ func (c *Client) TruncateAllTables(ctx context.Context) error { return nil } -func (c *Client) LoadDDL(ctx context.Context) ([]byte, error) { +func (c *Client) LoadDDL(ctx context.Context, skipChangeStream bool) ([]byte, error) { req := &databasepb.GetDatabaseDdlRequest{Database: c.config.URL()} res, err := c.spannerAdminClient.GetDatabaseDdl(ctx, req) @@ -188,6 +189,9 @@ func (c *Client) LoadDDL(ctx context.Context) ([]byte, error) { var schema []byte last := len(res.Statements) - 1 for index, statement := range res.Statements { + if skipChangeStream && strings.HasPrefix(statement, "CREATE CHANGE STREAM") { + continue + } if index != last { statement += ddlStatementsSeparator + "\n\n" } else { diff --git a/pkg/spanner/client_test.go b/pkg/spanner/client_test.go index 0b2772a..a7b6f11 100644 --- a/pkg/spanner/client_test.go +++ b/pkg/spanner/client_test.go @@ -59,6 +59,7 @@ const ( envSpannerInstanceID = "SPANNER_INSTANCE_ID" envSpannerDatabaseID = "SPANNER_DATABASE_ID" envSpannerEmulatorHost = "SPANNER_EMULATOR_HOST" + skipChangeStream = false ) func TestLoadDDL(t *testing.T) { @@ -68,7 +69,8 @@ func TestLoadDDL(t *testing.T) { client, done := testClientWithDatabase(t, ctx) defer done() - gotDDL, err := client.LoadDDL(ctx) + // we can't include Change Stream schema to testdata because cloud-spanner-emulator hasn't supported it yet. + gotDDL, err := client.LoadDDL(ctx, skipChangeStream) if err != nil { t.Fatalf("failed to load ddl: %v", err) }