-
Notifications
You must be signed in to change notification settings - Fork 279
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial commit of LogService and HAKeeper (#2970)
* logservice: initial commit * logservice: added LogShardManager * logservice: renamed service.go -> shards.go * logservice: added hakeeper and tests * logservice: some cleanups * logservice: added stateMachine.LeaseHistory * logservice: added the ability to filter internal entries * logservice: fixed various issues in LogStore * logservice: updated store APIs to expose LogRecord and LSN * logservice: pass meta info to NodeHost * logservice: allow NodeHostID to be specified * logservice: added the ability to query serviceAddress * logservice: added the ability to query shard info * logservice: minor update * logservice: truncate log asynchronously in a worker goroutine * logservice: fixed copyright notice * logservice: added heartbeat message ready to be sent to HAKeeper * logservice: use gogofaster to generate .pb.go * logservice: minor update to reflect dragonboat v4 changes * logservice: added pb/rpc * logservice: added server side rpc support * logservice: bumped cube version to avoid pebble version conflicts * logservice: added error handling * logservice: more error handling * logservice: added some tests * logservice: minor fixess for errors.go * logservice: added DN heartbeat messages * logservice: minor cleanups * logservice: minor fix for error handling * logservice: moved hakeeper code to its own directory * logservice: made logservice rsm a IStateMachine * hakeeper: use Lookup() for querying shard ID * logservice: move pb file to matrixone/proto, some other cleanups * hakeeper: added hakeeper/state.go * logservice: added methods for updating hakeeper * hakeeper: minor fix, added some tests * logservice: added ticker for hakeeper replica * logservice: minor fix for service.go * logservice: fixed some issues in transport.go * logservice: added client.go * logservice: minor refactoring * logservice: added client_test.go, fixed a few bugs * logservice: readonly clients now reject write requests * logservice: refactored query shard info API * logservice: added tests for Servide.GetShardInfo * logservice: fixed a few static-check reported issues
- Loading branch information
Showing
27 changed files
with
7,609 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
// Copyright 2021 - 2022 Matrix Origin | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package hakeeper | ||
|
||
import ( | ||
"encoding/binary" | ||
"encoding/gob" | ||
"io" | ||
"time" | ||
|
||
"github.com/lni/dragonboat/v4/logger" | ||
sm "github.com/lni/dragonboat/v4/statemachine" | ||
|
||
"github.com/matrixorigin/matrixone/pkg/common/moerr" | ||
"github.com/matrixorigin/matrixone/pkg/pb/logservice" | ||
) | ||
|
||
var ( | ||
plog = logger.GetLogger("hakeeper") | ||
) | ||
|
||
var ( | ||
binaryEnc = binary.BigEndian | ||
) | ||
|
||
const ( | ||
// TickDuration defines the frequency of ticks. | ||
TickDuration = time.Second | ||
// DefaultHAKeeperShardID is the shard ID assigned to the special HAKeeper | ||
// shard. | ||
DefaultHAKeeperShardID uint64 = 0 | ||
|
||
headerSize = 2 | ||
) | ||
|
||
const ( | ||
createLogShardTag uint16 = iota + 0xAE01 | ||
tickTag | ||
dnHeartbeatTag | ||
logHeartbeatTag | ||
) | ||
|
||
type logShardIDQuery struct { | ||
name string | ||
} | ||
|
||
type logShardIDQueryResult struct { | ||
id uint64 | ||
found bool | ||
} | ||
|
||
type stateMachine struct { | ||
replicaID uint64 | ||
|
||
Tick uint64 | ||
NextID uint64 | ||
|
||
LogShards map[string]uint64 | ||
DNState DNState | ||
LogState LogState | ||
} | ||
|
||
func parseCmdTag(cmd []byte) uint16 { | ||
return binaryEnc.Uint16(cmd) | ||
} | ||
|
||
func getCreateLogShardCmd(name string) []byte { | ||
return getLogShardCmd(name, createLogShardTag) | ||
} | ||
|
||
func getLogShardCmd(name string, tag uint16) []byte { | ||
cmd := make([]byte, headerSize+len(name)) | ||
binaryEnc.PutUint16(cmd, tag) | ||
copy(cmd[headerSize:], []byte(name)) | ||
return cmd | ||
} | ||
|
||
func isCreateLogShardCmd(cmd []byte) (string, bool) { | ||
return isLogShardCmd(cmd, createLogShardTag) | ||
} | ||
|
||
func isDNHeartbeatCmd(cmd []byte) bool { | ||
return isHeartbeatCmd(cmd, dnHeartbeatTag) | ||
} | ||
|
||
func isLogHeartbeatCmd(cmd []byte) bool { | ||
return isHeartbeatCmd(cmd, logHeartbeatTag) | ||
} | ||
|
||
func isHeartbeatCmd(cmd []byte, tag uint16) bool { | ||
if len(cmd) <= headerSize { | ||
return false | ||
} | ||
return parseCmdTag(cmd) == tag | ||
} | ||
|
||
func parseHeartbeatCmd(cmd []byte) []byte { | ||
return cmd[headerSize:] | ||
} | ||
|
||
func isLogShardCmd(cmd []byte, tag uint16) (string, bool) { | ||
if len(cmd) <= headerSize { | ||
return "", false | ||
} | ||
if parseCmdTag(cmd) == tag { | ||
return string(cmd[headerSize:]), true | ||
} | ||
return "", false | ||
} | ||
|
||
func isTickCmd(cmd []byte) bool { | ||
return len(cmd) == headerSize && binaryEnc.Uint16(cmd) == tickTag | ||
} | ||
|
||
func GetTickCmd() []byte { | ||
cmd := make([]byte, headerSize) | ||
binaryEnc.PutUint16(cmd, tickTag) | ||
return cmd | ||
} | ||
|
||
func GetLogStoreHeartbeatCmd(data []byte) []byte { | ||
return getHeartbeatCmd(data, logHeartbeatTag) | ||
} | ||
|
||
func GetDNStoreHeartbeatCmd(data []byte) []byte { | ||
return getHeartbeatCmd(data, dnHeartbeatTag) | ||
} | ||
|
||
func getHeartbeatCmd(data []byte, tag uint16) []byte { | ||
cmd := make([]byte, headerSize+len(data)) | ||
binaryEnc.PutUint16(cmd, tag) | ||
copy(cmd[headerSize:], data) | ||
return cmd | ||
} | ||
|
||
func NewStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine { | ||
if shardID != DefaultHAKeeperShardID { | ||
panic(moerr.NewError(moerr.INVALID_INPUT, "invalid HAKeeper shard ID")) | ||
} | ||
return &stateMachine{ | ||
replicaID: replicaID, | ||
LogShards: make(map[string]uint64), | ||
DNState: NewDNState(), | ||
LogState: NewLogState(), | ||
} | ||
} | ||
|
||
func (s *stateMachine) Close() error { | ||
return nil | ||
} | ||
|
||
func (s *stateMachine) assignID() uint64 { | ||
s.NextID++ | ||
return s.NextID | ||
} | ||
|
||
func (s *stateMachine) handleCreateLogShardCmd(cmd []byte) (sm.Result, error) { | ||
name, ok := isCreateLogShardCmd(cmd) | ||
if !ok { | ||
panic(moerr.NewError(moerr.INVALID_INPUT, "not create log shard cmd")) | ||
} | ||
if shardID, ok := s.LogShards[name]; ok { | ||
data := make([]byte, 8) | ||
binaryEnc.PutUint64(data, shardID) | ||
return sm.Result{Value: 0, Data: data}, nil | ||
} | ||
s.LogShards[name] = s.assignID() | ||
return sm.Result{Value: s.NextID}, nil | ||
} | ||
|
||
func (s *stateMachine) handleDNHeartbeat(cmd []byte) (sm.Result, error) { | ||
data := parseHeartbeatCmd(cmd) | ||
var hb logservice.DNStoreHeartbeat | ||
if err := hb.Unmarshal(data); err != nil { | ||
panic(err) | ||
} | ||
s.DNState.Update(hb, s.Tick) | ||
return sm.Result{}, nil | ||
} | ||
|
||
func (s *stateMachine) handleLogHeartbeat(cmd []byte) (sm.Result, error) { | ||
data := parseHeartbeatCmd(cmd) | ||
var hb logservice.LogStoreHeartbeat | ||
if err := hb.Unmarshal(data); err != nil { | ||
panic(err) | ||
} | ||
s.LogState.Update(hb, s.Tick) | ||
return sm.Result{}, nil | ||
} | ||
|
||
func (s *stateMachine) handleTick(cmd []byte) (sm.Result, error) { | ||
s.Tick++ | ||
return sm.Result{}, nil | ||
} | ||
|
||
func (s *stateMachine) Update(e sm.Entry) (sm.Result, error) { | ||
cmd := e.Cmd | ||
if _, ok := isCreateLogShardCmd(cmd); ok { | ||
return s.handleCreateLogShardCmd(cmd) | ||
} else if isDNHeartbeatCmd(cmd) { | ||
return s.handleDNHeartbeat(cmd) | ||
} else if isLogHeartbeatCmd(cmd) { | ||
return s.handleLogHeartbeat(cmd) | ||
} else if isTickCmd(cmd) { | ||
return s.handleTick(cmd) | ||
} | ||
panic(moerr.NewError(moerr.INVALID_INPUT, "unexpected haKeeper cmd")) | ||
} | ||
|
||
func (s *stateMachine) Lookup(query interface{}) (interface{}, error) { | ||
if q, ok := query.(*logShardIDQuery); ok { | ||
id, ok := s.LogShards[q.name] | ||
if ok { | ||
return &logShardIDQueryResult{found: true, id: id}, nil | ||
} | ||
return &logShardIDQueryResult{found: false}, nil | ||
} | ||
panic("unknown query type") | ||
} | ||
|
||
func (s *stateMachine) SaveSnapshot(w io.Writer, | ||
_ sm.ISnapshotFileCollection, _ <-chan struct{}) error { | ||
enc := gob.NewEncoder(w) | ||
return enc.Encode(s) | ||
} | ||
|
||
func (s *stateMachine) RecoverFromSnapshot(r io.Reader, | ||
_ []sm.SnapshotFile, _ <-chan struct{}) error { | ||
dec := gob.NewDecoder(r) | ||
return dec.Decode(s) | ||
} |
Oops, something went wrong.