Skip to content

Commit

Permalink
pump client: support change select pump's strategy (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Mar 21, 2019
1 parent b971f53 commit 1e8b48f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 70 deletions.
65 changes: 40 additions & 25 deletions tidb-binlog/pump_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ var (

// PumpInfos saves pumps' infomations in pumps client.
type PumpInfos struct {
sync.RWMutex
// Pumps saves the map of pump's nodeID and pump status.
Pumps map[string]*PumpStatus

Expand All @@ -86,6 +85,8 @@ func NewPumpInfos() *PumpInfos {

// PumpsClient is the client of pumps.
type PumpsClient struct {
sync.RWMutex

ctx context.Context

cancel context.CancelFunc
Expand Down Expand Up @@ -121,7 +122,7 @@ type PumpsClient struct {

// NewPumpsClient returns a PumpsClient.
// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
func NewPumpsClient(etcdURLs, strategy string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -152,7 +153,7 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
ClusterID: clusterID,
EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout),
Pumps: NewPumpInfos(),
Selector: NewSelector(Range),
Selector: NewSelector(strategy),
BinlogWriteTimeout: timeout,
Security: security,
nodePath: path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode]),
Expand Down Expand Up @@ -239,14 +240,19 @@ func (c *PumpsClient) getPumpStatus(pctx context.Context) (revision int64, err e

// WriteBinlog writes binlog to a situable pump. Tips: will never return error for commit/rollback binlog.
func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
c.RLock()
pumpNum := len(c.Pumps.AvaliablePumps)
selector := c.Selector
c.RUnlock()

var choosePump *PumpStatus
meetError := false
defer func() {
if meetError {
c.checkPumpAvaliable()
}

c.Selector.Feedback(binlog.StartTs, binlog.Tp, choosePump)
selector.Feedback(binlog.StartTs, binlog.Tp, choosePump)
}()

commitData, err := binlog.Marshal()
Expand All @@ -260,13 +266,9 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
var resp *pb.WriteBinlogResp
startTime := time.Now()

c.Pumps.RLock()
pumpNum := len(c.Pumps.AvaliablePumps)
c.Pumps.RUnlock()

for {
if pump == nil || binlog.Tp == pb.BinlogType_Prewrite {
pump = c.Selector.Select(binlog, retryTime)
pump = selector.Select(binlog, retryTime)
}
if pump == nil {
err = ErrNoAvaliablePump
Expand Down Expand Up @@ -333,11 +335,11 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
}

unAvaliablePumps := make([]*PumpStatus, 0, 3)
c.Pumps.RLock()
c.RLock()
for _, pump := range c.Pumps.UnAvaliablePumps {
unAvaliablePumps = append(unAvaliablePumps, pump)
}
c.Pumps.RUnlock()
c.RUnlock()

var resp *pb.WriteBinlogResp
// send binlog to unavaliable pumps to retry again.
Expand Down Expand Up @@ -365,9 +367,9 @@ func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.B
}

func (c *PumpsClient) checkPumpAvaliable() {
c.Pumps.RLock()
c.RLock()
allPumps := copyPumps(c.Pumps.Pumps)
c.Pumps.RUnlock()
c.RUnlock()

for _, pump := range allPumps {
if !pump.IsUsable() {
Expand All @@ -378,8 +380,8 @@ func (c *PumpsClient) checkPumpAvaliable() {

// setPumpAvaliable set pump's isAvaliable, and modify UnAvaliablePumps or AvaliablePumps.
func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {
c.Pumps.Lock()
defer c.Pumps.Unlock()
c.Lock()
defer c.Unlock()

pump.Reset()

Expand All @@ -401,7 +403,7 @@ func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {

// addPump add a new pump.
func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
c.Pumps.Lock()
c.Lock()

if pump.IsUsable() {
c.Pumps.AvaliablePumps[pump.NodeID] = pump
Expand All @@ -414,13 +416,26 @@ func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
}

c.Pumps.Unlock()
c.Unlock()
}

// SetSelectStrategy sets the selector's strategy, strategy should be 'range' or 'hash' now.
func (c *PumpsClient) SetSelectStrategy(strategy string) error {
if strategy != Range && strategy != Hash {
return errors.Errorf("strategy %s is not support", strategy)
}

c.Lock()
c.Selector = NewSelector(strategy)
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
c.Unlock()
return nil
}

// updatePump update pump's status, and return whether pump's IsAvaliable should be changed.
func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliableChanged, avaliable bool) {
var ok bool
c.Pumps.Lock()
c.Lock()
if pump, ok = c.Pumps.Pumps[status.NodeID]; ok {
if pump.Status.State != status.State {
if status.State == node.Online {
Expand All @@ -433,29 +448,29 @@ func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliab
}
pump.Status = *status
}
c.Pumps.Unlock()
c.Unlock()

return
}

// removePump removes a pump, used when pump is offline.
func (c *PumpsClient) removePump(nodeID string) {
c.Pumps.Lock()
c.Lock()
if pump, ok := c.Pumps.Pumps[nodeID]; ok {
pump.Reset()
}
delete(c.Pumps.Pumps, nodeID)
delete(c.Pumps.UnAvaliablePumps, nodeID)
delete(c.Pumps.AvaliablePumps, nodeID)
c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
c.Pumps.Unlock()
c.Unlock()
}

// exist returns true if pumps client has pump matched this nodeID.
func (c *PumpsClient) exist(nodeID string) bool {
c.Pumps.RLock()
c.RLock()
_, ok := c.Pumps.Pumps[nodeID]
c.Pumps.RUnlock()
c.RUnlock()
return ok
}

Expand Down Expand Up @@ -531,13 +546,13 @@ func (c *PumpsClient) detect() {
needCheckPumps := make([]*PumpStatus, 0, len(c.Pumps.UnAvaliablePumps))
checkPassPumps := make([]*PumpStatus, 0, 1)
req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: nil}
c.Pumps.RLock()
c.RLock()
for _, pump := range c.Pumps.UnAvaliablePumps {
if pump.IsUsable() {
needCheckPumps = append(needCheckPumps, pump)
}
}
c.Pumps.RUnlock()
c.RUnlock()

for _, pump := range needCheckPumps {
_, err := pump.WriteBinlog(req, c.BinlogWriteTimeout)
Expand Down
27 changes: 22 additions & 5 deletions tidb-binlog/pump_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ var _ = Suite(&testClientSuite{})
type testClientSuite struct{}

func (t *testClientSuite) TestSelector(c *C) {
algorithms := []string{Hash, Range}
for _, algorithm := range algorithms {
t.testSelector(c, algorithm)
strategys := []string{Hash, Range}
for _, strategy := range strategys {
t.testSelector(c, strategy)
}
}

func (*testClientSuite) testSelector(c *C, algorithm string) {
func (*testClientSuite) testSelector(c *C, strategy string) {
pumpsClient := &PumpsClient{
Pumps: NewPumpInfos(),
Selector: NewSelector(algorithm),
Selector: NewSelector(strategy),
BinlogWriteTimeout: DefaultBinlogWriteTimeout,
}

Expand Down Expand Up @@ -139,6 +139,23 @@ func (*testClientSuite) testSelector(c *C, algorithm string) {
// prewrite binlog and commit binlog with same start ts should choose same pump
c.Assert(pump1.NodeID, Equals, pump2.NodeID)
pumpsClient.setPumpAvaliable(pump1, true)

// after change strategy, prewrite binlog and commit binlog will choose same pump
pump1 = pumpsClient.Selector.Select(prewriteBinlog, 0)
pumpsClient.Selector.Feedback(prewriteBinlog.StartTs, prewriteBinlog.Tp, pump1)
if strategy == Range {
err := pumpsClient.SetSelectStrategy(Hash)
c.Assert(err, IsNil)
} else {
err := pumpsClient.SetSelectStrategy(Range)
c.Assert(err, IsNil)
}
pump2 = pumpsClient.Selector.Select(commitBinlog, 0)
c.Assert(pump1.NodeID, Equals, pump2.NodeID)

// set back
err := pumpsClient.SetSelectStrategy(strategy)
c.Assert(err, IsNil)
}
}

Expand Down
Loading

0 comments on commit 1e8b48f

Please sign in to comment.