Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SchemaMigrationsHistory to determine unapplied out of order migrations
Browse files Browse the repository at this point in the history
This commit adds a `SchemaMigrationsHistory` table which:

- Provides users with more information about the time and order in
which migration scripts were applied to their database. This is exposed
via the `wrench migrate history` command

- Allows out of order migration scripts to be applied to a database by
determining what subset of migrations are yet to be applied in contrast
to comparing a single version number. This flexibility allows production
hotfixes to be applied on a diverging branch and having those changes
merged back to non production environments which may already be ahead of
production.

In addition to the new functionality around `SchemaMigrationsHistory`,
existing users of wrench will have their databases upgraded to use the
new tracking table by backfilling the missing history. New users of
wrench will have both `SchemaMigrations` and `SchemaMigrationsHistory`
from the start and no backfilling is necessary.
RoryQ committed Jun 16, 2020
1 parent 3519db2 commit 6a440ad
Showing 10 changed files with 570 additions and 32 deletions.
35 changes: 29 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -30,18 +30,34 @@ $ wrench migrate up --directory ./_examples
# load ddl from database to file ./_examples/schema.sql
$ wrench load --directory ./_examples

# show time and date of migrations
$ wrench migrate history
Version Dirty Created Modified
1 false 2020-06-16 08:07:11.763755 +0000 UTC 2020-06-16 08:07:11.76998 +0000 UTC

# finally, we have successfully migrated database!
$ cat ./_examples/schema.sql
CREATE TABLE SchemaMigrations (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
) PRIMARY KEY(Version);

CREATE TABLE Singers (
SingerID STRING(36) NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
) PRIMARY KEY(SingerID);

CREATE TABLE SchemaMigrations (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
) PRIMARY KEY(Version);

CREATE TABLE SchemaMigrationsHistory (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
Created TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
Modified TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
) PRIMARY KEY(Version);
```

## Installation
@@ -107,7 +123,14 @@ This creates a next migration file like `_examples/migrations/000001.sql`. You w
$ wrench migrate up --directory ./_examples
```

This executes migrations. This also creates `SchemaMigrations` table into your database to manage schema version if it does not exist.
This executes migrations. This also creates `SchemaMigrations` & `SchemaMigrationsHistory` tables in your database to manage schema version if it does not exist.

### Migrations history
```sh
$ wrench migrate history
```
This displays the history of migrations applied to your database, ordered by when they were first attempted.
Migrations left in a dirty state and subsequently retried are reflected in the Modified timestamp.

### Apply single DDL/DML

17 changes: 17 additions & 0 deletions _examples/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
CREATE TABLE Singers (
SingerID STRING(36) NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
) PRIMARY KEY(SingerID);

CREATE TABLE SchemaMigrations (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
) PRIMARY KEY(Version);

CREATE TABLE SchemaMigrationsHistory (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
Created TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
Modified TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
) PRIMARY KEY(Version);
61 changes: 57 additions & 4 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
@@ -25,8 +25,10 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/tabwriter"

"github.com/cloudspannerecosystem/wrench/pkg/spanner"
"github.com/spf13/cobra"
@@ -64,12 +66,18 @@ func init() {
Short: "Set version V but don't run migration (ignores dirty state)",
RunE: migrateSet,
}
migrateHistoryCmd := &cobra.Command{
Use: "history",
Short: "Print migration version history",
RunE: migrateHistory,
}

migrateCmd.AddCommand(
migrateCreateCmd,
migrateUpCmd,
migrateVersionCmd,
migrateSetCmd,
migrateHistoryCmd,
)

migrateCmd.PersistentFlags().String(flagNameDirectory, "", "Directory that migration files placed (required)")
@@ -127,23 +135,40 @@ func migrateUp(c *cobra.Command, args []string) error {
}
defer client.Close()

dir := filepath.Join(c.Flag(flagNameDirectory).Value.String(), migrationsDirName)
migrations, err := spanner.LoadMigrations(dir)
if err != nil {
return &Error{
cmd: c,
err: err,
}
}

if err = client.EnsureMigrationTable(ctx, migrationTableName); err != nil {
return &Error{
cmd: c,
err: err,
}
}

dir := filepath.Join(c.Flag(flagNameDirectory).Value.String(), migrationsDirName)
migrations, err := spanner.LoadMigrations(dir)
status, err := client.DetermineUpgradeStatus(ctx, migrationTableName)
if err != nil {
return &Error{
cmd: c,
err: err,
}
}

return client.ExecuteMigrations(ctx, migrations, limit, migrationTableName)
switch status {
case spanner.ExistingMigrationsUpgradeStarted:
return client.UpgradeExecuteMigrations(ctx, migrations, limit, migrationTableName)
case spanner.ExistingMigrationsUpgradeCompleted:
return client.ExecuteMigrations(ctx, migrations, limit, migrationTableName)
default:
return &Error{
cmd: c,
err: errors.New("migration in undetermined state"),
}
}
}

func migrateVersion(c *cobra.Command, args []string) error {
@@ -180,6 +205,34 @@ func migrateVersion(c *cobra.Command, args []string) error {
return nil
}

func migrateHistory(c *cobra.Command, args []string) error {
ctx := context.Background()

client, err := newSpannerClient(ctx, c)
if err != nil {
return err
}
defer client.Close()

history, err := client.GetMigrationHistory(ctx, migrationTableName)
if err != nil {
return err
}
sort.SliceStable(history, func(i, j int) bool {
return history[i].Created.Before(history[j].Created) // order by Created
})

writer := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', tabwriter.AlignRight)
fmt.Fprintln(writer, "Version\tDirty\tCreated\tModified")
for i := range history {
h := history[i]
fmt.Fprintf(writer, "%d\t%v\t%v\t%v\n", h.Version, h.Dirty, h.Created, h.Modified)
}
writer.Flush()

return nil
}

func migrateSet(c *cobra.Command, args []string) error {
ctx := context.Background()

258 changes: 243 additions & 15 deletions pkg/spanner/client.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,9 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"sort"
"time"

"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
@@ -33,8 +35,20 @@ import (
)

const (
ddlStatementsSeparator = ";"
ddlStatementsSeparator = ";"
upgradeIndicator = "wrench_upgrade_indicator"
historyStr = "History"
FirstRun = UpgradeStatus("FirstRun")
ExistingMigrationsNoUpgrade = UpgradeStatus("NoUpgrade")
ExistingMigrationsUpgradeStarted = UpgradeStatus("Started")
ExistingMigrationsUpgradeCompleted = UpgradeStatus("Completed")
createUpgradeIndicatorFormatString = `CREATE TABLE %s (Dummy INT64 NOT NULL) PRIMARY KEY(Dummy)`
)
var (
createUpgradeIndicatorSql = fmt.Sprintf(createUpgradeIndicatorFormatString, upgradeIndicator)
)

type UpgradeStatus string

type table struct {
TableName string `spanner:"table_name"`
@@ -46,6 +60,13 @@ type Client struct {
spannerAdminClient *admin.DatabaseAdminClient
}

type MigrationHistoryRecord struct {
Version int64 `spanner:"Version"`
Dirty bool `spanner:"Dirty"`
Created time.Time `spanner:"Created"`
Modified time.Time `spanner:"Modified"`
}

func NewClient(ctx context.Context, config *Config) (*Client, error) {
opts := make([]option.ClientOption, 0)
if config.CredentialsFile != "" {
@@ -263,6 +284,99 @@ func (c *Client) ApplyPartitionedDML(ctx context.Context, statements []string) (

return numAffectedRows, nil
}
func (c *Client) UpgradeExecuteMigrations(ctx context.Context, migrations Migrations, limit int, tableName string) error {
err := c.backfillMigrations(ctx, migrations, tableName)
if err != nil {
return err
}

err = c.ExecuteMigrations(ctx, migrations, limit, tableName)
if err != nil {
return err
}

return c.markUpgradeComplete(ctx)
}

func (c *Client) backfillMigrations(ctx context.Context, migrations Migrations, tableName string) error {
v, d, err := c.GetSchemaMigrationVersion(ctx, tableName)
if err != nil {
return err
}

historyTableName := tableName + historyStr
_, err = c.spannerClient.ReadWriteTransaction(ctx, func(ctx context.Context, rw *spanner.ReadWriteTransaction) error {
for i := range migrations {
if v > migrations[i].Version {
if err := c.upsertVersionHistory(ctx, rw, int64(migrations[i].Version), false, historyTableName); err != nil {
return err
}
} else if v == migrations[i].Version {
if err := c.upsertVersionHistory(ctx, rw, int64(migrations[i].Version), d, historyTableName); err != nil {
return err
}
}
}

return nil
})
if err != nil {
return err
}

return nil
}

func (c *Client) upsertVersionHistory(ctx context.Context, rw *spanner.ReadWriteTransaction, version int64, dirty bool, historyTableName string) error {
_, err := rw.ReadRow(ctx, historyTableName, spanner.Key{version}, []string{"Version", "Dirty", "Created", "Modified"})
if err != nil {
// insert
if spanner.ErrCode(err) == codes.NotFound {
return rw.BufferWrite([]*spanner.Mutation{
spanner.Insert(historyTableName,
[]string{"Version", "Dirty", "Created", "Modified"},
[]interface{}{version, dirty, spanner.CommitTimestamp, spanner.CommitTimestamp})})
}
return err
}

// update
return rw.BufferWrite([]*spanner.Mutation{
spanner.Update(historyTableName,
[]string{"Version", "Dirty", "Modified"},
[]interface{}{version, dirty, spanner.CommitTimestamp})})
}

func (c *Client) markUpgradeComplete(ctx context.Context) error {
err := c.ApplyDDL(ctx, []string{"DROP TABLE " + upgradeIndicator})
if err != nil {
return &Error{
Code: ErrorCodeCompleteUpgrade,
err: err,
}
}

return nil
}

func (c *Client) GetMigrationHistory(ctx context.Context, versionTableName string) ([]MigrationHistoryRecord, error) {
history := make([]MigrationHistoryRecord, 0)
stmt := spanner.NewStatement("SELECT Version, Dirty, Created, Modified FROM " + versionTableName + historyStr)
err := c.spannerClient.Single().Query(ctx, stmt).Do(func(r *spanner.Row) error {
version := MigrationHistoryRecord{}
if err := r.ToStruct(&version); err != nil {
return err
}
history = append(history, version)

return nil
})
if err != nil {
return nil, err
}

return history, nil
}

func (c *Client) ExecuteMigrations(ctx context.Context, migrations Migrations, limit int, tableName string) error {
sort.Sort(migrations)
@@ -285,13 +399,25 @@ func (c *Client) ExecuteMigrations(ctx context.Context, migrations Migrations, l
}
}

history, err := c.GetMigrationHistory(ctx, tableName)
if err != nil {
return &Error{
Code: ErrorCodeExecuteMigrations,
err: err,
}
}
applied := make(map[int64]bool)
for i := range history {
applied[history[i].Version] = true
}

var count int
for _, m := range migrations {
if limit == 0 {
break
}

if m.Version <= version {
if applied[int64(m.Version)] {
continue
}

@@ -395,7 +521,11 @@ func (c *Client) SetSchemaMigrationVersion(ctx context.Context, version uint, di
[]interface{}{int64(version), dirty},
),
}
return tx.BufferWrite(m)
if err := tx.BufferWrite(m); err != nil {
return err
}

return c.upsertVersionHistory(ctx, tx, int64(version), dirty, tableName+historyStr)
})
if err != nil {
return &Error{
@@ -407,31 +537,129 @@ func (c *Client) SetSchemaMigrationVersion(ctx context.Context, version uint, di
return nil
}

func (c *Client) Close() error {
c.spannerClient.Close()
if err := c.spannerAdminClient.Close(); err != nil {
return &Error{
err: err,
Code: ErrorCodeCloseClient,
}
}

return nil
}

func (c *Client) EnsureMigrationTable(ctx context.Context, tableName string) error {
iter := c.spannerClient.Single().Read(ctx, tableName, spanner.AllKeys(), []string{"Version"})
fmtErr := func(err error) *Error {
return &Error{
Code: ErrorCodeEnsureMigrationTables,
err: err,
}
}
status, err := c.DetermineUpgradeStatus(ctx, tableName)
if err != nil {
return fmtErr(err)
}

switch status {
case FirstRun:
if err := c.createVersionTable(ctx, tableName); err != nil {
return fmtErr(err)
}
if err := c.createHistoryTable(ctx, tableName+historyStr); err != nil {
return fmtErr(err)
}
case ExistingMigrationsNoUpgrade:
if err := c.createUpgradeIndicatorTable(ctx); err != nil {
return fmtErr(err)
}
if err := c.createHistoryTable(ctx, tableName+historyStr); err != nil {
return fmtErr(err)
}
}

return nil
}

func (c *Client) DetermineUpgradeStatus(ctx context.Context, tableName string) (UpgradeStatus, error) {
stmt := spanner.NewStatement(`SELECT table_name FROM information_schema.tables WHERE table_catalog = '' AND table_schema = ''
AND table_name in (@version, @history, @indicator)`)
stmt.Params["version"] = tableName
stmt.Params["history"] = tableName + historyStr
stmt.Params["indicator"] = upgradeIndicator
iter := c.spannerClient.Single().Query(ctx, stmt)

tables := make(map[string]bool)
err := iter.Do(func(r *spanner.Row) error {
t := &table{}
if err := r.ToStruct(t); err != nil {
return err
}
tables[t.TableName] = true
return nil
})
if err == nil {
if err != nil {
return "", err
}

switch {
case len(tables) == 0:
return FirstRun, nil
case len(tables) == 1 && tables[tableName]:
return ExistingMigrationsNoUpgrade, nil
case len(tables) == 2 && tables[tableName] && tables[tableName+historyStr]:
return ExistingMigrationsUpgradeCompleted, nil
case len(tables) > 1 && tables[tableName] && tables[upgradeIndicator]:
return ExistingMigrationsUpgradeStarted, nil
default:
return "", fmt.Errorf("undetermined state of schema version tables %+v", tables)
}
}

func (c *Client) tableExists(ctx context.Context, tableName string) bool {
ri := c.spannerClient.Single().Query(ctx, spanner.Statement{
SQL: "SELECT table_name FROM information_schema.tables WHERE table_catalog = '' AND table_name = @table",
Params: map[string]interface{}{"table": tableName},
})
defer ri.Stop()
_, err := ri.Next()
return err != iterator.Done
}

func (c *Client) createHistoryTable(ctx context.Context, historyTableName string) error {
if c.tableExists(ctx, historyTableName) {
return nil
}

stmt := fmt.Sprintf(`CREATE TABLE %s (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL
) PRIMARY KEY(Version)`, tableName)
Dirty BOOL NOT NULL,
Created TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
Modified TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true)
) PRIMARY KEY(Version)`, historyTableName)

return c.ApplyDDL(ctx, []string{stmt})
}

func (c *Client) Close() error {
c.spannerClient.Close()
if err := c.spannerAdminClient.Close(); err != nil {
return &Error{
err: err,
Code: ErrorCodeCloseClient,
}
func (c *Client) createUpgradeIndicatorTable(ctx context.Context) error {
if c.tableExists(ctx, upgradeIndicator) {
return nil
}

return nil
stmt := fmt.Sprintf(createUpgradeIndicatorFormatString, upgradeIndicator)

return c.ApplyDDL(ctx, []string{stmt})
}

func (c *Client) createVersionTable(ctx context.Context, tableName string) error {
if c.tableExists(ctx, tableName) {
return nil
}

stmt := fmt.Sprintf(`CREATE TABLE %s (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL
) PRIMARY KEY(Version)`, tableName)

return c.ApplyDDL(ctx, []string{stmt})
}
215 changes: 208 additions & 7 deletions pkg/spanner/client_test.go
Original file line number Diff line number Diff line change
@@ -20,15 +20,14 @@
package spanner

import (
"cloud.google.com/go/spanner"
"context"
"fmt"
"github.com/google/uuid"
"google.golang.org/api/iterator"
"io/ioutil"
"os"
"testing"

"cloud.google.com/go/spanner"
"github.com/google/uuid"
"google.golang.org/api/iterator"
)

const (
@@ -212,6 +211,7 @@ func TestExecuteMigrations(t *testing.T) {
// ensure that only 000002.sql has been applied.
ensureMigrationColumn(t, ctx, client, "LastName", "STRING(MAX)", "YES")
ensureMigrationVersionRecord(t, ctx, client, 2, false)
ensureMigrationHistoryRecord(t, ctx, client, 2, false)

if err := client.ExecuteMigrations(ctx, migrations, len(migrations), migrationTable); err != nil {
t.Fatalf("failed to execute migration: %v", err)
@@ -220,6 +220,7 @@ func TestExecuteMigrations(t *testing.T) {
// ensure that 000003.sql and 000004.sql have been applied.
ensureMigrationColumn(t, ctx, client, "LastName", "STRING(MAX)", "NO")
ensureMigrationVersionRecord(t, ctx, client, 4, false)
ensureMigrationHistoryRecord(t, ctx, client, 4, false)

// ensure that schema is not changed and ExecuteMigrate is safely finished even though no migrations should be applied.
ensureMigrationColumn(t, ctx, client, "LastName", "STRING(MAX)", "NO")
@@ -257,6 +258,20 @@ func ensureMigrationColumn(t *testing.T, ctx context.Context, client *Client, co
}
}

func ensureMigrationHistoryRecord(t *testing.T, ctx context.Context, client *Client, version int64, dirty bool) {
history, err := client.GetMigrationHistory(ctx, migrationTable)
for i := range history {
if history[i].Version == version && history[i].Dirty == dirty{
return
}
}
if err != nil {
t.Fatalf("failed to get history: %v", err)
}

t.Errorf("(version %d, dirty %v) not found in history", version, dirty)
}

func ensureMigrationVersionRecord(t *testing.T, ctx context.Context, client *Client, version int64, dirty bool) {
t.Helper()

@@ -344,9 +359,6 @@ func TestSetSchemaMigrationVersion(t *testing.T) {
func TestEnsureMigrationTable(t *testing.T) {
ctx := context.Background()

client, done := testClientWithDatabase(t, ctx)
defer done()

tests := map[string]struct {
table string
}{
@@ -356,6 +368,9 @@ func TestEnsureMigrationTable(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
client, done := testClientWithDatabase(t, ctx)
defer done()

if err := client.EnsureMigrationTable(ctx, test.table); err != nil {
t.Fatalf("failed to ensure migration table: %v", err)
}
@@ -383,6 +398,190 @@ func TestEnsureMigrationTable(t *testing.T) {
}
})
}

t.Run("also creates history table", func(t *testing.T) {
client, done := testClientWithDatabase(t, ctx)
defer done()

if err := client.EnsureMigrationTable(ctx, migrationTable); err != nil {
t.Fatalf("failed to ensure migration table: %v", err)
}

if client.tableExists(ctx, migrationTable+historyStr) == false {
t.Fatal("failed to create history table")
}
})
}

func TestClient_DetermineUpgradeStatus(t *testing.T) {
type args struct {
tableName string
ddlStatement string
}
tests := []struct {
name string
args args
want UpgradeStatus
wantErr bool
}{
{
string(FirstRun),
args{"NonExistentTable", ""},
FirstRun,
false,
},
{
string(ExistingMigrationsNoUpgrade),
args{migrationTable, "DROP TABLE " + migrationTable + historyStr},
ExistingMigrationsNoUpgrade,
false,
},
{
string(ExistingMigrationsUpgradeStarted),
args{migrationTable, createUpgradeIndicatorSql},
ExistingMigrationsUpgradeStarted,
false,
},
{
string(ExistingMigrationsUpgradeCompleted),
args{migrationTable, ""},
ExistingMigrationsUpgradeCompleted,
false,
},
{
"UndeterminedState",
args{"NonExistentTable", createUpgradeIndicatorSql},
"",
true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
client, done := testClientWithDatabase(t, ctx)
defer done()

if tt.args.ddlStatement != "" {
err := client.ApplyDDL(ctx, []string{tt.args.ddlStatement})
if err != nil {
t.Error(err)
}
}

got, err := client.DetermineUpgradeStatus(ctx, tt.args.tableName)

if (err != nil) != tt.wantErr {
t.Errorf("DetermineUpgradeStatus() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("DetermineUpgradeStatus() got = %v, want %v", got, tt.want)
}
})
}
}

func TestHotfixMigration(t *testing.T) {
ctx := context.Background()
client, done := testClientWithDatabase(t, ctx)
defer done()

// apply changes from "trunk": [100, 200]
migrations, err := LoadMigrations("testdata/hotfix/a")
if err != nil {
t.Fatalf("failed to load migrations: %v", err)
}
if err := client.ExecuteMigrations(ctx, migrations, len(migrations), migrationTable); err != nil {
t.Fatalf("failed to execute migration: %v", err)
}
history, err := client.GetMigrationHistory(ctx, migrationTable)
if err != nil {
t.Fatalf("failed to get migration history: %v", err)
}
if len(history) != 2 {
t.Errorf("incorrect history versions: %+v", history)
}
ensureMigrationHistoryRecord(t, ctx, client, 100, false)
ensureMigrationHistoryRecord(t, ctx, client, 200, false)

// apply changes from "hotfix" branch: [101]
migrations, err = LoadMigrations("testdata/hotfix/b")
if err != nil {
t.Fatalf("failed to load migrations: %v", err)
}
if err := client.ExecuteMigrations(ctx, migrations, len(migrations), migrationTable); err != nil {
t.Fatalf("failed to execute migration: %v", err)
}
history, err = client.GetMigrationHistory(ctx, migrationTable)
if err != nil {
t.Fatalf("failed to get migration history: %v", err)
}
if len(history) != 3 {
t.Errorf("incorrect history versions: %+v", history)
}
ensureMigrationHistoryRecord(t, ctx, client, 101, false)
}

func TestUpgrade(t *testing.T) {
t.Run("PriorMigrationsBackfilledInHistoryTable", func(t *testing.T) {
ctx := context.Background()
client, done := testClientWithDatabase(t, ctx)
defer done()

// run migrations
migrations, err := LoadMigrations("testdata/migrations")
if err != nil {
t.Fatalf("failed to load migrations: %v", err)
}
if err := client.ExecuteMigrations(ctx, migrations, len(migrations), migrationTable); err != nil {
t.Fatalf("failed to execute migration: %v", err)
}
expected, err := client.GetMigrationHistory(ctx, migrationTable)
if err != nil {
t.Fatalf("failed to get migration history: %v", err)
}

// clear history table
if err := client.ApplyDDL(ctx, []string{"DROP TABLE " + migrationTable + historyStr}); err != nil {
t.Fatalf("failed to drop migration history: %v", err)
}
if err := client.EnsureMigrationTable(ctx, migrationTable); err != nil {
t.Fatalf("failed to recreate migration table: %v", err)
}
if client.tableExists(ctx, upgradeIndicator) == false {
t.Error("upgrade indicator should exist")
}
if err := client.UpgradeExecuteMigrations(ctx, migrations, len(migrations), migrationTable); err != nil {
t.Fatalf("failed to execute migration: %v", err)
}

// history is backfilled
actual, err := client.GetMigrationHistory(ctx, migrationTable)
if err != nil {
t.Fatalf("failed to get migration history: %v", err)
}
if len(expected) != len(actual) {
t.Error("missing versions in backfilled history")
}
if client.tableExists(ctx, upgradeIndicator) == true {
t.Error("upgrade indicator should be dropped")
}

contains := func(m []MigrationHistoryRecord, v int64) bool {
for i := range m {
if m[i].Version == v {
return true
}
}
return false
}

if (contains(actual, 2) && contains(actual, 3) && contains(actual, 4)) == false {
t.Errorf("missing version in history table %+v", actual)
}
})

}

func testClientWithDatabase(t *testing.T, ctx context.Context) (*Client, func()) {
@@ -403,6 +602,7 @@ func testClientWithDatabase(t *testing.T, ctx context.Context) (*Client, func())
if database == "" {
id := uuid.New()
database = fmt.Sprintf("wrench-test-%s", id.String()[:8])
t.Log("creating " + database)
}

config := &Config{
@@ -431,5 +631,6 @@ func testClientWithDatabase(t *testing.T, ctx context.Context) (*Client, func())
if err := client.DropDatabase(ctx); err != nil {
t.Fatalf("failed to delete database: %v", err)
}
t.Log("dropped database " + database)
}
}
2 changes: 2 additions & 0 deletions pkg/spanner/errors.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,8 @@ const (
ErrorCodeNoMigration
ErrorCodeMigrationVersionDirty
ErrorCodeWaitOperation
ErrorCodeEnsureMigrationTables
ErrorCodeCompleteUpgrade
)

type Error struct {
1 change: 1 addition & 0 deletions pkg/spanner/testdata/hotfix/a/000100.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Singers ADD COLUMN LastName STRING(MAX);
1 change: 1 addition & 0 deletions pkg/spanner/testdata/hotfix/a/000200.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Singers ALTER COLUMN LastName STRING(MAX) NOT NULL;
1 change: 1 addition & 0 deletions pkg/spanner/testdata/hotfix/b/000101_hotfix.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UPDATE Singers SET LastName = "" WHERE LastName IS NULL;
11 changes: 11 additions & 0 deletions pkg/spanner/testdata/schema.sql
Original file line number Diff line number Diff line change
@@ -7,3 +7,14 @@ CREATE TABLE Singers (
SingerID STRING(36) NOT NULL,
FirstName STRING(1024),
) PRIMARY KEY(SingerID);

CREATE TABLE SchemaMigrationsHistory (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL,
Created TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
Modified TIMESTAMP NOT NULL OPTIONS (
allow_commit_timestamp = true
),
) PRIMARY KEY(Version);

0 comments on commit 6a440ad

Please sign in to comment.