Skip to content

Commit

Permalink
bugfix: data may lost when using batch update meets duplicate error
Browse files Browse the repository at this point in the history
  • Loading branch information
vinllen committed Mar 27, 2020
1 parent 7e649a8 commit aa42830
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 15 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2020-03-27 Alibaba Cloud.
* version: 2.0.10
* BUGFIX: data may lost when using batch update meets duplicate error.

2020-03-15 Alibaba Cloud.
* version: 2.0.9
* BUGFIX: transform namespace bug. see #302. thanks monkeyWie.
Expand Down
47 changes: 47 additions & 0 deletions src/mongoshake/common/db_opertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,50 @@ func SortDBRef(input bson.M) bson.D {
}
return output
}

// used to handle bulk return error
func FindFirstErrorIndexAndMessage(error string) (int, string, bool) {
subIndex := "index["
subMsg := "msg["
subDup := "dup["
index := strings.Index(error, subIndex)
if index == -1 {
return index, "", false
}

indexVal := 0
for i := index + len(subIndex); i < len(error) && error[i] != ']'; i++ {
// fmt.Printf("%c %d\n", rune(error[i]), int(error[i] - '0'))
indexVal = indexVal * 10 + int(error[i] - '0')
}

index = strings.Index(error, subMsg)
if index == -1 {
return indexVal, "", false
}

i := index + len(subMsg)
stack := 0
for ; i < len(error); i++ {
if error[i] == ']' {
if stack == 0 {
break
} else {
stack -= 1
}
} else if error[i] == '[' {
stack += 1
}
}
msg := error[index + len(subMsg): i]

index = strings.Index(error, subDup)
if index == -1 {
return indexVal, msg, false
}
i = index + len(subMsg)
for ; i < len(error) && error[i] != ']'; i++ {}
dupVal := error[index + len(subMsg):i]

return indexVal, msg, dupVal == "true"
}
13 changes: 9 additions & 4 deletions src/mongoshake/executor/db_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,14 @@ func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.M,
}

if _, err := bulk.Run(); err != nil {
// parse error
index, errMsg, dup := utils.FindFirstErrorIndexAndMessage(err.Error())
LOG.Error("detail error info with index[%v] msg[%v] dup[%v]", index, errMsg, dup)
if mgo.IsDup(err) {
HandleDuplicated(bw.session.DB(database).C(collection), oplogs, OpUpdate)
return nil
// create single writer to write one by one
sw := NewDbWriter(bw.session, bson.M{}, false)
return sw.doUpdate(database, collection, metadata, oplogs[index:], upsert)
}
return fmt.Errorf("doUpdate run upsert/update[%v] failed[%v]", upsert, err)
}
Expand Down Expand Up @@ -506,7 +511,7 @@ func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.M,
errMsgs = append(errMsgs, errMsg)
}

LOG.Debug("writer: upsert %v", log.original.partialLog.Dump(nil, true))
LOG.Debug("writer: upsert %v", log.original.partialLog)
}
} else {
for _, log := range oplogs {
Expand All @@ -519,7 +524,7 @@ func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.M,
err := collectionHandle.Update(log.original.partialLog.Query, newObject)
if err != nil {
if utils.IsNotFound(err) {
LOG.Warn("doUpdate[update] data[%v] not found", log.original.partialLog.Query)
return fmt.Errorf("doUpdate[update] data[%v] not found", log.original.partialLog.Query)
} else if mgo.IsDup(err) {
HandleDuplicated(collectionHandle, oplogs, OpUpdate)
} else {
Expand All @@ -529,7 +534,7 @@ func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.M,
}
}

LOG.Debug("writer: update %v", log.original.partialLog.Dump(nil, true))
LOG.Debug("writer: update %v", log.original.partialLog)
}
}

Expand Down
285 changes: 285 additions & 0 deletions src/mongoshake/executor/db_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package executor

import (
"testing"
"fmt"

"mongoshake/common"
"mongoshake/oplog"

"github.com/vinllen/mgo/bson"
"github.com/vinllen/mgo"
"github.com/stretchr/testify/assert"
)

const (
testMongoAddress = "100.81.164.186:31771,100.81.164.186:31772,100.81.164.186:31773"
testDb = "test"
testCollection = "test"
)

func mockOplogRecord(oId, oX int, o2Id int) *OplogRecord {
or := &OplogRecord{
original: &PartialLogWithCallbak{
partialLog: &oplog.PartialLog{
Object: bson.D{
bson.DocElem{
Name: "_id",
Value: oId,
},
bson.DocElem{
Name: "x",
Value: oX,
},
},
},
},
}

if o2Id != -1 {
or.original.partialLog.Query = bson.M{
"_id": o2Id,
}
}

return or
}

func TestBulkWriter(t *testing.T) {
// test bulk writer

var nr int

// basic test
{
fmt.Printf("TestBulkWriter case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, "primary", true)
assert.Equal(t, nil, err, "should be equal")

// drop database
err = conn.Session.DB(testDb).DropDatabase()
assert.Equal(t, nil, err, "should be equal")

writer := NewDbWriter(conn.Session, bson.M{}, true)

// 1-5
inserts := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
mockOplogRecord(3, 3, -1),
mockOplogRecord(4, 4, -1),
mockOplogRecord(5, 5, -1),
}

// write 1
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts, false)
assert.Equal(t, nil, err, "should be equal")

// 4-8
inserts = []*OplogRecord{
mockOplogRecord(4, 4, -1),
mockOplogRecord(5, 5, -1),
mockOplogRecord(6, 6, -1),
mockOplogRecord(7, 7, -1),
mockOplogRecord(8, 8, -1),
}

// write 1
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts, false)
assert.Equal(t, nil, err, "should be equal")

// query
result := make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 8, len(result), "should be equal")
assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal")
assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal")
assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal")
assert.Equal(t, 4, result[3].(bson.M)["x"], "should be equal")
assert.Equal(t, 5, result[4].(bson.M)["x"], "should be equal")
assert.Equal(t, 6, result[5].(bson.M)["x"], "should be equal")
assert.Equal(t, 7, result[6].(bson.M)["x"], "should be equal")
assert.Equal(t, 8, result[7].(bson.M)["x"], "should be equal")

// 8-10
inserts = []*OplogRecord{
mockOplogRecord(8, 80, -1),
mockOplogRecord(9, 90, -1),
mockOplogRecord(10, 100, -1),
}

// write 1
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts, true)
assert.Equal(t, nil, err, "should be equal")

// query
result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 10, len(result), "should be equal")
assert.Equal(t, 80, result[7].(bson.M)["x"], "should be equal")
assert.Equal(t, 90, result[8].(bson.M)["x"], "should be equal")
assert.Equal(t, 100, result[9].(bson.M)["x"], "should be equal")

// delete 8-11
deletes := []*OplogRecord{
mockOplogRecord(8, 80, -1),
mockOplogRecord(9, 90, -1),
mockOplogRecord(10, 100, -1),
mockOplogRecord(11, 110, -1), // not found
}
err = writer.doDelete(testDb, testCollection, bson.M{}, deletes)
assert.Equal(t, nil, err, "should be equal") // won't throw error if not found

result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 7, len(result), "should be equal")
}

// bulk update, delete
{
fmt.Printf("TestBulkWriter case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, "primary", true)
assert.Equal(t, nil, err, "should be equal")

// drop database
err = conn.Session.DB(testDb).DropDatabase()
assert.Equal(t, nil, err, "should be equal")

writer := NewDbWriter(conn.Session, bson.M{}, true)

// 1-5
inserts := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
mockOplogRecord(3, 3, -1),
mockOplogRecord(4, 4, -1),
mockOplogRecord(5, 5, -1),
}

// write 1
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts, false)
assert.Equal(t, nil, err, "should be equal")

// update not exist
updates := []*OplogRecord{
mockOplogRecord(5, 50, 5),
mockOplogRecord(10, 100, 10),
mockOplogRecord(11, 110, 11),
}

// not work
err = writer.doUpdate(testDb, testCollection, bson.M{}, updates, false)
assert.Equal(t, nil, err, "should be equal")

result := make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 5, len(result), "should be equal")
assert.Equal(t, 50, result[4].(bson.M)["x"], "should be equal")

// updates
updates = []*OplogRecord{
mockOplogRecord(4, 40, 4),
mockOplogRecord(10, 100, 10),
mockOplogRecord(11, 110, 11),
}

// upsert
err = writer.doUpdate(testDb, testCollection, bson.M{}, updates, true)
assert.Equal(t, nil, err, "should be equal")

result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 7, len(result), "should be equal")
assert.Equal(t, 40, result[3].(bson.M)["x"], "should be equal")
assert.Equal(t, 50, result[4].(bson.M)["x"], "should be equal")
assert.Equal(t, 100, result[5].(bson.M)["x"], "should be equal")
assert.Equal(t, 110, result[6].(bson.M)["x"], "should be equal")

// deletes
deletes := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
mockOplogRecord(999, 999, -1), // not exist
}

err = writer.doDelete(testDb, testCollection, bson.M{}, deletes)
result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 5, len(result), "should be equal")
}

// bulk update, delete
{
fmt.Printf("TestBulkWriter case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, "primary", true)
assert.Equal(t, nil, err, "should be equal")

// drop database
err = conn.Session.DB(testDb).DropDatabase()
assert.Equal(t, nil, err, "should be equal")

writer := NewDbWriter(conn.Session, bson.M{}, true)

// 1-5
inserts := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
mockOplogRecord(3, 3, -1),
mockOplogRecord(4, 4, -1),
mockOplogRecord(5, 5, -1),
}

// write 1
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts, false)
assert.Equal(t, nil, err, "should be equal")

// build index
err = conn.Session.DB(testDb).C(testCollection).EnsureIndex(mgo.Index{
Key: []string{"x"},
Unique: true,
})
assert.Equal(t, nil, err, "should be equal")

// updates
updates := []*OplogRecord{
mockOplogRecord(3, 5, 3), // dup
mockOplogRecord(10, 100, 10),
mockOplogRecord(11, 110, 11),
}

// upsert = false
err = writer.doUpdate(testDb, testCollection, bson.M{}, updates, false)
assert.NotEqual(t, nil, err, "should be equal")
fmt.Println(err)

result := make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 5, len(result), "should be equal")
assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal")

// upsert
err = writer.doUpdate(testDb, testCollection, bson.M{}, updates, true)
assert.Equal(t, nil, err, "should be equal")

result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 7, len(result), "should be equal")
assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal")
assert.Equal(t, 100, result[5].(bson.M)["x"], "should be equal")
assert.Equal(t, 110, result[6].(bson.M)["x"], "should be equal")
}
}
Loading

0 comments on commit aa42830

Please sign in to comment.