Skip to content

Commit

Permalink
Merge branch 'main' into hnsw_dev
Browse files Browse the repository at this point in the history
  • Loading branch information
cpegeric committed Jan 28, 2025
2 parents 0e30165 + aace63a commit 3bf59a6
Show file tree
Hide file tree
Showing 241 changed files with 10,569 additions and 3,117 deletions.
10 changes: 5 additions & 5 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func TestBackupData(t *testing.T) {
txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

obj := testutil.GetOneObject(rel)
id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
obj := testutil.GetOneBlockMeta(rel)
id := obj.AsCommonID()
err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
require.NoError(t, err)
deletedRows = 1
Expand All @@ -118,7 +118,7 @@ func TestBackupData(t *testing.T) {
v := testutil.GetSingleSortKeyValue(data, schema, 2)
filter := handle.NewEQFilter(v)
err := rel.DeleteByFilter(context.Background(), filter)
assert.NoError(t, err)
assert.NoError(t, err, v)
assert.NoError(t, txn.Commit(context.Background()))
}
backupTime := time.Now().UTC()
Expand Down Expand Up @@ -472,8 +472,8 @@ func TestBackupData5(t *testing.T) {
txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

obj := testutil.GetOneObject(rel)
id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
obj := testutil.GetOneBlockMeta(rel)
id := obj.AsCommonID()
err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
require.NoError(t, err)
deletedRows = 1
Expand Down
9 changes: 8 additions & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ func execBackup(
}()
now := time.Now()
baseTS := ts
// When rewriting the checkpoint and trimming the aobject,
// you need to collect the atombstone in the last checkpoint
// Before this, only the last special checkpoint needs to be collected
var lastData *logtail.CheckpointData
for i, name := range names {
if len(name) == 0 {
continue
Expand Down Expand Up @@ -322,6 +326,9 @@ func execBackup(
}
defer data.Close()
oNames = append(oNames, oneNames...)
if i == len(names)-1 {
lastData = data
}
}
loadDuration += time.Since(now)

Expand Down Expand Up @@ -402,7 +409,7 @@ func execBackup(
tnLocation objectio.Location
)
cnLocation, tnLocation, checkpointFiles, err = logtail.ReWriteCheckpointAndBlockFromKey(ctx, sid, srcFs, dstFs,
cnLocation, uint32(version), start)
cnLocation, lastData, uint32(version), start)
for _, name := range checkpointFiles {
dentry, err := dstFs.StatFile(ctx, name)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cdc/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,11 @@ func (s *mysqlSinker) appendSqlBuf(rowType RowType) (err error) {
// if s.sqlBuf has no enough space
if len(s.sqlBuf)+len(s.rowBuf)+suffixLen > cap(s.sqlBuf) {
// complete sql statement
if rowType == InsertRow {
if s.isNonEmptyInsertStmt() {
s.sqlBuf = appendString(s.sqlBuf, ";")
s.preSqlBufLen = len(s.sqlBuf)
} else {
}
if s.isNonEmptyDeleteStmt() {
s.sqlBuf = appendString(s.sqlBuf, ");")
s.preSqlBufLen = len(s.sqlBuf)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ func (s *service) initProcessCodecService() {
s._txnClient,
s.fileService,
s.lockService,
s.partitionService,
s.queryClient,
s._hakeeperClient,
s.udfService,
Expand Down
54 changes: 47 additions & 7 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,14 @@ const (
ErrTxnCannotRetry uint16 = 20630
ErrTxnNeedRetryWithDefChanged uint16 = 20631
ErrTxnStale uint16 = 20632
ErrRetryForCNRollingRestart uint16 = 20633
ErrNewTxnInCNRollingRestart uint16 = 20634
ErrPrevCheckpointNotFinished uint16 = 20635
ErrCantDelGCChecker uint16 = 20636
ErrTxnUnknown uint16 = 20637
ErrTxnControl uint16 = 20638
// ErrRetryForCNRollingRestart rolling upgrade related, do not modify
ErrRetryForCNRollingRestart uint16 = 20634
// ErrNewTxnInCNRollingRestart rolling upgrade related, do not modify
ErrNewTxnInCNRollingRestart uint16 = 20635
ErrPrevCheckpointNotFinished uint16 = 20636
ErrCantDelGCChecker uint16 = 20637
ErrTxnUnknown uint16 = 20638
ErrTxnControl uint16 = 20639

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
Expand Down Expand Up @@ -569,7 +571,7 @@ type Error struct {
}

func (e *Error) Error() string {
return e.message
return e.Display()
}

func (e *Error) Detail() string {
Expand All @@ -595,6 +597,10 @@ func (e *Error) SqlState() string {
return e.sqlState
}

func (e *Error) SetDetail(detail string) {
e.detail = detail
}

var _ encoding.BinaryMarshaler = new(Error)

func (e *Error) MarshalBinary() ([]byte, error) {
Expand Down Expand Up @@ -638,6 +644,40 @@ func IsMoErrCode(e error, rc uint16) bool {
return me.code == rc
}

func GetMoErrCode(e error) (uint16, bool) {
if e == nil {
return 0, false
}

me, ok := e.(*Error)
if !ok {
return 0, false
}

return me.code, true
}

func IsSameMoErr(a error, b error) bool {
if a == nil || b == nil {
return false
}

var (
ok bool
aa, bb *Error
)

if aa, ok = a.(*Error); !ok {
return false
}

if bb, ok = b.(*Error); !ok {
return false
}

return aa.code == bb.code
}

func DowncastError(e error) *Error {
if err, ok := e.(*Error); ok {
return err
Expand Down
50 changes: 50 additions & 0 deletions pkg/common/moerr/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,53 @@ func TestEncoding(t *testing.T) {
require.Nil(t, err)
require.Equal(t, e, e2)
}

type fakeErr struct {
}

func (f *fakeErr) Error() string {
return "fake error"
}

func TestIsSameMoErr(t *testing.T) {
var a, b error
require.False(t, IsSameMoErr(a, b))

_, ok := GetMoErrCode(a)
require.False(t, ok)

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(a)
require.False(t, ok)

b = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = GetOkExpectedEOB()
require.False(t, IsSameMoErr(a, b))

code, ok := GetMoErrCode(a)
require.True(t, ok)
require.Equal(t, OkExpectedEOB, code)

b = GetOkExpectedDup()
require.False(t, IsSameMoErr(a, b))

code, ok = GetMoErrCode(b)
require.True(t, ok)
require.Equal(t, OkExpectedDup, code)

b = nil
require.False(t, IsSameMoErr(a, b))

b = GetOkExpectedEOB()
require.True(t, IsSameMoErr(a, b))
}
19 changes: 10 additions & 9 deletions pkg/common/morpc/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,21 +808,22 @@ func TestCannotBusyLoopIfWriteCIsFull(t *testing.T) {
return conn.Write(msg, goetty.WriteOptions{Flush: true})
},
func(b *remoteBackend) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
req := newTestMessage(1)
f, err := b.Send(ctx, req)
if err == nil { //ignore timeout
_, err = f.Get()
assert.NoError(t, err)
}
func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
req := newTestMessage(0)
f, err := b.Send(ctx, req)
if err == nil { //ignore timeout
_, _ = f.Get()
f.Close()
}
}()
}
}()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/container/nulls/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
)

type Bitmap = Nulls
type Grouping = Nulls

type Nulls struct {
np bitmap.Bitmap
Expand Down
4 changes: 2 additions & 2 deletions pkg/container/vector/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func CollectOffsetsByBetweenFactory[T types.BuiltinNumber | types.Times | types.
//return cols[i] > rval
return cmpRight(cols[i], rval)
})
if start == end {
if start >= end {
return nil
}
sels := make([]int64, end-start)
Expand Down Expand Up @@ -695,7 +695,7 @@ func CollectOffsetsByBetweenString(lval, rval string, hint int) func(*Vector) []
//return cols[i] > rval
return cmpRight(col[i].UnsafeGetString(area), rval)
})
if start == end {
if start >= end {
return nil
}
sels := make([]int64, end-start)
Expand Down
4 changes: 2 additions & 2 deletions pkg/container/vector/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Vector struct {
capacity int
length int

nsp nulls.Nulls // nulls list
gsp nulls.Grouping // grouping list
nsp nulls.Nulls // nulls list
gsp nulls.Nulls // grouping list

cantFreeData bool
cantFreeArea bool
Expand Down
Loading

0 comments on commit 3bf59a6

Please sign in to comment.