From 4aafc01a690f83f02696686dedc6ad953ef09624 Mon Sep 17 00:00:00 2001 From: Linh Tran Tuan Date: Fri, 6 Aug 2021 21:10:59 +0900 Subject: [PATCH] NO-ISSUE Code refactoring (#13) --- balancer.go | 38 +++++--------- dbList.go | 68 ++++++++++++++++++++++++ db_wrapper.go | 35 +++++++++++++ errors.go | 21 ++++---- go.mod | 4 +- go.sum | 13 ++--- mssqlx.go | 91 +++++++++++++++++++------------- mssqlx_test.go | 58 ++------------------- retry.go | 19 +++---- types.go | 137 ------------------------------------------------- 10 files changed, 199 insertions(+), 285 deletions(-) create mode 100644 dbList.go create mode 100644 db_wrapper.go delete mode 100644 types.go diff --git a/balancer.go b/balancer.go index 49db813..3d3d06b 100644 --- a/balancer.go +++ b/balancer.go @@ -2,6 +2,7 @@ package mssqlx import ( "context" + "fmt" "sync/atomic" "time" ) @@ -22,7 +23,7 @@ type balancer struct { // new balancer and start health checkers func newBalancer(ctx context.Context, numHealthChecker int, numDbInstance int, isWsrep bool) *balancer { - if numHealthChecker <= 0 { + if numHealthChecker < 2 { numHealthChecker = 2 // at least two checkers } @@ -71,55 +72,42 @@ func (c *balancer) get(shouldBalancing bool) *wrapper { } // failure make a db node become failure and auto health tracking -func (c *balancer) failure(w *wrapper) { +func (c *balancer) failure(w *wrapper, err error) { if c.dbs.remove(w) { // remove this node - c.sendFailure(w) - } -} - -func (c *balancer) sendFailure(w *wrapper) { - select { - case <-c.ctx.Done(): - return + reportError(fmt.Sprintf("deactive connection:[%s] for health checking due to error", w.dsn), err) - case c.fail <- w: // give to health checker + select { + case <-c.ctx.Done(): + case c.fail <- w: + } } } // healthChecker daemon to check health of db connection func (c *balancer) healthChecker() { - doneCh := c.ctx.Done() - - var db *wrapper for { select { - case <-doneCh: + case <-c.ctx.Done(): return - case db = <-c.fail: + case db := <-c.fail: if ping(db) == nil && (!c.isWsrep || db.checkWsrepReady()) { c.dbs.add(db) continue + } else { + c.fail <- db } select { - case <-doneCh: + case <-c.ctx.Done(): return case <-time.After(time.Duration(c.getHealthCheckPeriod()) * time.Millisecond): } - - select { - case <-doneCh: - return - - case c.fail <- db: - } } } } func (c *balancer) destroy() { - c.dbs.clear() c.cancel() } diff --git a/dbList.go b/dbList.go new file mode 100644 index 0000000..c601293 --- /dev/null +++ b/dbList.go @@ -0,0 +1,68 @@ +package mssqlx + +import ( + "sync" + "sync/atomic" +) + +type dbList struct { + lk sync.RWMutex + list []*wrapper + currentIndex uint32 +} + +func (b *dbList) current() (w *wrapper) { + b.lk.RLock() + + if n := uint32(len(b.list)); n > 0 { + w = b.list[atomic.LoadUint32(&b.currentIndex)%n] + } + + b.lk.RUnlock() + + return +} + +func (b *dbList) next() (w *wrapper) { + b.lk.RLock() + + if n := uint32(len(b.list)); n > 0 { + w = b.list[atomic.AddUint32(&b.currentIndex, 1)%n] + } + + b.lk.RUnlock() + + return +} + +func (b *dbList) add(w *wrapper) { + if w != nil { + b.lk.Lock() + b.list = append(b.list, w) + b.lk.Unlock() + } +} + +func (b *dbList) remove(w *wrapper) (removed bool) { + if w != nil { + b.lk.Lock() + + n := len(b.list) + for i := 0; i < n; i++ { + if b.list[i] == w { // found + removed = true + + if i != n-1 { + b.list[i] = b.list[n-1] + } + b.list = b.list[:n-1] + + break + } + } + + b.lk.Unlock() + } + + return +} diff --git a/db_wrapper.go b/db_wrapper.go new file mode 100644 index 0000000..6f390fe --- /dev/null +++ b/db_wrapper.go @@ -0,0 +1,35 @@ +package mssqlx + +import ( + "github.com/jmoiron/sqlx" +) + +type wrapper struct { + db *sqlx.DB + dsn string +} + +func (w *wrapper) checkWsrepReady() bool { + type wsrepVariable struct { + VariableName string `db:"Variable_name"` + Value string `db:"Value"` + } + + var v wsrepVariable + + if err := w.db.Get(&v, "SHOW VARIABLES LIKE 'wsrep_on'"); err != nil { + reportError("SHOW VARIABLES LIKE 'wsrep_on'", err) + return false + } + + if v.Value != "ON" { + return true + } + + if err := w.db.Get(&v, "SHOW STATUS LIKE 'wsrep_ready'"); err != nil || v.Value != "ON" { + reportError("SHOW STATUS LIKE 'wsrep_ready'", err) + return false + } + + return true +} diff --git a/errors.go b/errors.go index 84dbb07..b43d660 100644 --- a/errors.go +++ b/errors.go @@ -3,18 +3,22 @@ package mssqlx import ( "database/sql" "database/sql/driver" + "errors" "fmt" "os" "strings" "time" "github.com/go-sql-driver/mysql" + "github.com/lib/pq" ) // check bad connection func isErrBadConn(err error) bool { if err != nil { - if err == driver.ErrBadConn || err == sql.ErrConnDone || err == mysql.ErrInvalidConn { + if errors.Is(err, driver.ErrBadConn) || + errors.Is(err, mysql.ErrInvalidConn) || + errors.Is(err, pq.ErrChannelNotOpen) { return true } @@ -42,7 +46,6 @@ func isErrCode(err error, code int) bool { } switch mErr := err.(type) { - case *mysql.MySQLError: return mErr.Number == uint16(code) @@ -52,16 +55,10 @@ func isErrCode(err error, code int) bool { } } -func parseError(w *wrapper, err error) error { - if err == nil { - return nil - } - - if w != nil && ping(w) != nil { - return ErrNetwork - } - - return err +func isStdErr(err error) bool { + return errors.Is(err, sql.ErrNoRows) || + errors.Is(err, sql.ErrTxDone) || + errors.Is(err, sql.ErrConnDone) } func reportError(v string, err error) { diff --git a/go.mod b/go.mod index c70f86e..8436e18 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module github.com/linxGnu/mssqlx require ( - github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-sql-driver/mysql v1.6.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/jmoiron/sqlx v1.3.4 github.com/lib/pq v1.10.2 - github.com/mattn/go-sqlite3 v1.14.7 github.com/stretchr/testify v1.7.0 - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index dd7f094..52a9a78 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,19 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w= github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= -github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -19,6 +21,5 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mssqlx.go b/mssqlx.go index 5ceb267..2e78480 100644 --- a/mssqlx.go +++ b/mssqlx.go @@ -8,13 +8,11 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" "github.com/jmoiron/sqlx" ) var ( - // ErrNetwork networking error - ErrNetwork = errors.New("network error/connection refused") - // ErrNoConnection there is no connection to db ErrNoConnection = errors.New("no connection available") @@ -524,16 +522,21 @@ func getDBFromBalancer(target *balancer) (db *wrapper, err error) { return } -func shouldFailure(w *wrapper, isWsrep bool, err error) bool { - if err = parseError(w, err); err == nil { - return false +func shouldFailure(w *wrapper, isWsrep bool, err error) (bool, error) { + if err == nil || isStdErr(err) { + return false, err + } + + if isWsrep && IsWsrepNotReady(err) { + return true, err } - if err == ErrNetwork || (isWsrep && IsWsrepNotReady(err)) { - return true + if e := ping(w); e != nil { + err = multierror.Append(err, e).ErrorOrNil() + return true, err } - return false + return false, err } func _namedQuery(ctx context.Context, target *balancer, query string, arg interface{}) (res *sqlx.Rows, err error) { @@ -556,8 +559,9 @@ func _namedQuery(ctx context.Context, target *balancer, query string, arg interf } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -610,8 +614,9 @@ func _namedExec(ctx context.Context, target *balancer, query string, arg interfa } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -664,8 +669,9 @@ func _query(ctx context.Context, target *balancer, query string, args ...interfa } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -723,8 +729,9 @@ func _queryx(ctx context.Context, target *balancer, query string, args ...interf } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -851,8 +858,9 @@ func _select(ctx context.Context, target *balancer, dest interface{}, query stri }) // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -904,8 +912,9 @@ func _get(ctx context.Context, target *balancer, dest interface{}, query string, }) // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -967,8 +976,9 @@ func _exec(ctx context.Context, target *balancer, query string, args ...interfac } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -1017,8 +1027,9 @@ func _prepareContext(ctx context.Context, target *balancer, query string) (dbx * } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -1084,8 +1095,9 @@ func _preparexContext(ctx context.Context, target *balancer, query string) (dbx } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -1155,8 +1167,9 @@ func _prepareNamedContext(ctx context.Context, target *balancer, query string) ( } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -1205,8 +1218,9 @@ func _mustExec(ctx context.Context, target *balancer, query string, args ...inte } // check networking/wsrep error - if shouldFailure(w, target.isWsrep, err) { - target.failure(w) + var should bool + if should, err = shouldFailure(w, target.isWsrep, err); should { + target.failure(w, err) continue } @@ -1317,8 +1331,9 @@ func (dbs *DBs) BeginTx(ctx context.Context, opts *sql.TxOptions) (res *Tx, err } // check networking/wsrep error - if shouldFailure(w, dbs.masters.isWsrep, err) { - dbs.masters.failure(w) + var should bool + if should, err = shouldFailure(w, dbs.masters.isWsrep, err); should { + dbs.masters.failure(w, err) continue } @@ -1352,8 +1367,9 @@ func (dbs *DBs) Beginx() (res *Txx, err error) { } // check networking/wsrep error - if shouldFailure(w, dbs.masters.isWsrep, err) { - dbs.masters.failure(w) + var should bool + if should, err = shouldFailure(w, dbs.masters.isWsrep, err); should { + dbs.masters.failure(w, err) continue } @@ -1392,8 +1408,9 @@ func (dbs *DBs) BeginTxx(ctx context.Context, opts *sql.TxOptions) (res *Txx, er } // check networking/wsrep error - if shouldFailure(w, dbs.masters.isWsrep, err) { - dbs.masters.failure(w) + var should bool + if should, err = shouldFailure(w, dbs.masters.isWsrep, err); should { + dbs.masters.failure(w, err) continue } diff --git a/mssqlx_test.go b/mssqlx_test.go index 4d6b153..6e119e1 100644 --- a/mssqlx_test.go +++ b/mssqlx_test.go @@ -24,17 +24,14 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" - _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" ) var TestWPostgres = false // test with postgres? -var TestWSqlite = false // test with sqlite? var TestWMysql = false // test with mysql? var myDBs *DBs var pgDBs *DBs -var sqDBs *DBs func init() { ConnectMasterSlave() @@ -43,11 +40,9 @@ func init() { func ConnectMasterSlave() { pgdsn := os.Getenv("MSSQLX_POSTGRES_DSN") mydsn := os.Getenv("MSSQLX_MYSQL_DSN") - sqdsn := os.Getenv("MSSQLX_SQLITE_DSN") TestWPostgres = pgdsn != "skip" && pgdsn != "" TestWMysql = mydsn != "skip" && mydsn != "" - TestWSqlite = sqdsn != "skip" && sqdsn != "" if !strings.Contains(mydsn, "parseTime=true") { mydsn += "?parseTime=true" @@ -61,10 +56,6 @@ func ConnectMasterSlave() { fmt.Println("Disabling MySQL tests.") } - if !TestWSqlite { - fmt.Println("Disabling SQLite tests.") - } - if TestWPostgres { masterDSNs := []string{pgdsn, pgdsn} slaveDSNs := []string{pgdsn, pgdsn} @@ -82,15 +73,6 @@ func ConnectMasterSlave() { myDBs.SetMaxOpenConns(10) myDBs.SetConnMaxLifetime(3 * time.Millisecond) } - - if TestWSqlite { - masterDSNs := []string{sqdsn, sqdsn} - slaveDSNs := []string{sqdsn, sqdsn} - sqDBs, _ = ConnectMasterSlaves("sqlite3", masterDSNs, slaveDSNs) - sqDBs.SetMaxIdleConns(2) - sqDBs.SetMaxOpenConns(10) - pgDBs.SetConnMaxLifetime(3 * time.Millisecond) - } } type Schema struct { @@ -224,10 +206,6 @@ func _RunWithSchema(schema Schema, t *testing.T, test func(db *DBs, t *testing.T create, drop := schema.Postgres() runner(pgDBs, t, create, drop) } - if TestWSqlite { - create, drop := schema.Sqlite3() - runner(sqDBs, t, create, drop) - } if TestWMysql { create, drop := schema.MySQL() runner(myDBs, t, create, drop) @@ -268,34 +246,14 @@ func _loadDefaultFixture(db *DBs, t *testing.T) { } } -func TestParseError(t *testing.T) { - err := parseError(nil, nil) - if err != nil { - t.Fatal(err) - } - - errT := fmt.Errorf("abc") - if err = parseError(nil, errT); err != errT { - t.Fatal(err) - } - - db, _ := sqlx.Open("postgres", "user=test1 dbname=test2 sslmode=disable") - - errT = fmt.Errorf("abc") - if err = parseError(&wrapper{db: db, dsn: "user=test1 dbname=test2 sslmode=disable"}, errT); err != ErrNetwork { - t.Fatal(err) - } -} - func (c *balancer) size() int { return c.dbs.size() } func (b *dbList) size() (v int) { - list, stored := b.list.Load().([]*wrapper) - if stored { - v = len(list) - } + b.lk.RLock() + v = len(b.list) + b.lk.RUnlock() return } @@ -345,21 +303,18 @@ func TestDbBalancer(t *testing.T) { if x := dbB.get(false); x.db != db3 { t.Fatal("DbBalancer: get fail") } else { - dbB.failure(x) + dbB.failure(x, nil) if dbB.size() != 3 || (&wrapper{db: db3, dsn: dsn}).checkWsrepReady() { t.Fatal("DbBalancer: failure fail") } - dbB.failure(nil) + dbB.failure(nil, nil) if dbB.size() != 3 || (&wrapper{db: db3, dsn: dsn}).checkWsrepReady() { t.Fatal("DbBalancer: failure fail") } } dbB.destroy() - if dbB.size() != 0 { - t.Fatal("DbBalancer: destroy fail") - } } func TestConnectMasterSlave(t *testing.T) { @@ -1920,9 +1875,6 @@ func TestStressQueries(t *testing.T) { } limit := 4 - if db == sqDBs { - limit = 2 - } wg.Add(limit) for i := 0; i < limit; i++ { diff --git a/retry.go b/retry.go index be98e65..70b143c 100644 --- a/retry.go +++ b/retry.go @@ -1,26 +1,21 @@ package mssqlx import ( - "database/sql" "time" ) func retryFunc(query string, f func() (interface{}, error)) (result interface{}, err error) { for retry := 0; retry < 50; retry++ { - if result, err = f(); err == nil { + result, err = f() + if err == nil || isStdErr(err) { return } - switch err { - case sql.ErrTxDone, sql.ErrNoRows: - return - - default: - if isErrBadConn(err) || IsDeadlock(err) { - time.Sleep(5 * time.Millisecond) - } else { - return - } + if isErrBadConn(err) || IsDeadlock(err) { + time.Sleep(5 * time.Millisecond) + continue + } else { + break } } diff --git a/types.go b/types.go deleted file mode 100644 index 7235e31..0000000 --- a/types.go +++ /dev/null @@ -1,137 +0,0 @@ -package mssqlx - -import ( - "runtime" - "sync/atomic" - - "github.com/jmoiron/sqlx" -) - -type wrapper struct { - db *sqlx.DB - dsn string -} - -func (w *wrapper) checkWsrepReady() bool { - type wsrepVariable struct { - VariableName string `db:"Variable_name"` - Value string `db:"Value"` - } - - var v wsrepVariable - - if err := w.db.Get(&v, "SHOW VARIABLES LIKE 'wsrep_on'"); err != nil { - reportError("SHOW VARIABLES LIKE 'wsrep_on'", err) - return false - } - - if v.Value != "ON" { - return true - } - - if err := w.db.Get(&v, "SHOW STATUS LIKE 'wsrep_ready'"); err != nil || v.Value != "ON" { - reportError("SHOW STATUS LIKE 'wsrep_ready'", err) - return false - } - - return true -} - -var empty = []*wrapper{} - -type dbList struct { - list atomic.Value // []*wrapper - _ [9]uint64 // prevent false sharing - state int32 - _ [9]uint64 - currentIndex uint32 - _ [9]uint64 -} - -func (b *dbList) current() (w *wrapper) { - list, stored := b.list.Load().([]*wrapper) - if stored { - if n := uint32(len(list)); n > 0 { - w = list[atomic.LoadUint32(&b.currentIndex)%n] - } - } - return -} - -func (b *dbList) next() (w *wrapper) { - list, stored := b.list.Load().([]*wrapper) - if stored { - if n := uint32(len(list)); n > 0 { - w = list[atomic.AddUint32(&b.currentIndex, 1)%n] - } - } - return -} - -func (b *dbList) add(w *wrapper) { - if w != nil { - for { - if atomic.CompareAndSwapInt32(&b.state, 0, 1) { // lock first - list, stored := b.list.Load().([]*wrapper) - if !stored { - list = make([]*wrapper, 0, 8) - } else { - n := len(list) - newList := make([]*wrapper, n, n+1) - copy(newList, list) // copy-on-write - list = newList - } - - // append to list - list = append(list, w) - - // store back - b.list.Store(list) - - atomic.CompareAndSwapInt32(&b.state, 1, 0) - return - } - runtime.Gosched() - } - } -} - -func (b *dbList) remove(w *wrapper) (removed bool) { - if w != nil { - for { - if atomic.CompareAndSwapInt32(&b.state, 0, 1) { // lock first - list, stored := b.list.Load().([]*wrapper) - if stored { - if n := len(list); n > 0 { - for i := range list { - if list[i] == w { // found - removed = true - - newList := make([]*wrapper, n-1, n) - if i > 0 { - copy(newList, list[:i]) - } - if i < n-1 { - copy(newList[i:], list[i+1:]) - } - - b.list.Store(newList) - break - } - } - } - } - - atomic.CompareAndSwapInt32(&b.state, 1, 0) - return - } - runtime.Gosched() - } - } - return -} - -func (b *dbList) clear() { - atomic.StoreUint32(&b.currentIndex, 0) - b.list.Store(empty) -}