From 0e12fde6774b3fff748eb31ca721cc7c0d8a5d29 Mon Sep 17 00:00:00 2001 From: salemmohammed Date: Mon, 31 May 2021 23:19:27 -0400 Subject: [PATCH] Linear --- .DS_Store | Bin 14340 -> 12292 bytes benchmark.go | 12 +- bin/config.json | 18 +-- client.go | 15 +-- client/client.go | 14 +- consensus/consensus.go | 286 +++++++++++++++++++++++++++++++---------- consensus/msg.go | 26 ++-- consensus/replica.go | 5 +- node.go | 4 +- peerset.go | 103 +++++++++++++++ quorum.go | 4 + socket.go | 13 +- transport.go | 3 - 13 files changed, 374 insertions(+), 129 deletions(-) create mode 100644 peerset.go diff --git a/.DS_Store b/.DS_Store index 0f6a7a5d9ffe0145ee48e0bd33c70438a40c3ae1..b28dd3742a36e3cf0a6bc1af67a4acd417ac6d9f 100644 GIT binary patch delta 352 zcmZoEXh~3DU|?W$DortDV9)?EIe-{M3-ADmb_NCo?uiQeqHaJAGfay>a+7luBsX4SV&BZp!NS3)J#m88W*&hH zEWBJm-KjuW#8AwTI=MhxoVA;Qfk|!iMUgGcjHf0~6l~Cvtgbe+Fx62oHZ-Z#QK&XF zG_%lAFf%f#t>xqpS2eWtOvtUQs;;T6n>9&Uo^keOf5}+3&D;tqjAZKnsc6o~nNpmb zla!yI!?;;Nc?-+r3F2ZhKu@s(F%W2Q0|{5;AlZ2FJ@aIK6J17z$&M!SJiq_|1sem9 RHkh1aGI8^Lm9I=7LjWcZQ}h4; delta 270 zcmZokXem%&U|?W$DortDU@!nOIe-{M3-ADmHUT76 zKRGF9vLK`KWCI-*Rz{#K%VZt_Ew2CnflLMlK8B>tf{fqTCMz)dZf56T;b7FSAw-NAU?DJ>6x6N TGjX!Lp#Ekz)fG$=Cl~+#lK49H diff --git a/benchmark.go b/benchmark.go index 5b6ad8a4..b3c37c88 100644 --- a/benchmark.go +++ b/benchmark.go @@ -12,7 +12,6 @@ import ( // DB is general interface implemented by client to call client library type DB interface { Init() error - Read(key int) (int, error) Write(key int, value []byte,Globalcounter int) error Stop() error } @@ -147,10 +146,11 @@ func (b *Benchmark) Run() { keys := make(chan int, b.Concurrency) latencies := make(chan time.Duration, 1000) - globalCouner := make(chan int, 10) + globalCouner := make(chan int, 1000) defer close(latencies) go b.collect(latencies) + // number of threads or concurrency for i := 0; i < b.Concurrency; i++ { // this b is object calls worker function @@ -158,7 +158,6 @@ func (b *Benchmark) Run() { b.worker(keys, latencies,globalCouner) }() } - b.db.Init() b.startTime = time.Now() if b.T > 0 { @@ -189,6 +188,7 @@ func (b *Benchmark) Run() { b.db.Stop() close(keys) close(globalCouner) + log.Debugf("--------------------done -------------2") stat := Statistic(b.latency) log.Infof("Concurrency = %d", b.Concurrency) log.Infof("Write Ratio = %f", b.W) @@ -257,18 +257,12 @@ func (b *Benchmark) worker(keys <-chan int, result chan<- time.Duration, globalC for k := range keys { op := new(operation) if rand.Float64() < b.W { - //v = rand.Int() - //log.Debugf("value %v", data) s = time.Now() - - //counter++ - //slog.Debugf("globalCouner = %v", <- globalCouner) err = b.db.Write(k, data,<- globalCouner) e = time.Now() op.input = data } else { s = time.Now() - //v, err = b.db.Read(k) e = time.Now() op.output = data } diff --git a/bin/config.json b/bin/config.json index d238eebb..03fb3f2f 100644 --- a/bin/config.json +++ b/bin/config.json @@ -1,15 +1,15 @@ { "address": { - "1.1": "tcp://18.221.231.103:1735", - "1.2": "tcp://3.135.241.193:1736", - "1.3": "tcp://3.15.147.83:1737", - "1.4": "tcp://3.15.239.10:1738" + "1.1": "tcp://127.0.0.1:1735", + "1.2": "tcp://127.0.0.1:1736", + "1.3": "tcp://127.0.0.1:1737", + "1.4": "tcp://127.0.0.1:1738" }, "http_address": { - "1.1": "http://18.221.231.103:8080", - "1.2": "http://3.135.241.193:8081", - "1.3": "http://3.15.147.83:8082", - "1.4": "http://3.15.239.10:8083" + "1.1": "http://127.0.0.1:8080", + "1.2": "http://127.0.0.1:8081", + "1.3": "http://127.0.0.1:8082", + "1.4": "http://127.0.0.1:8083" }, "policy": "majority", "threshold": 3, @@ -23,7 +23,7 @@ "K": 1000, "W": 1, "Throttle": 0, - "Concurrency": 1, + "Concurrency": 4, "Distribution": "uniform", "LinearizabilityCheck": false, "Conflicts": 0, diff --git a/client.go b/client.go index a26fe9fe..5f1efd7f 100644 --- a/client.go +++ b/client.go @@ -16,14 +16,10 @@ import ( ) var List []ID var rr int = 0 -//type SafeCounter struct { -// mu sync.Mutex -// v map[string]int -//} -// Client interface provides get and put for key value store + type Client interface { GetMUL(Key) ([]Value, []map[string]string) - PutMUL(Key, Value) []error + PutMUL(Key, Value,int) error Get(Key) (Value, error) Put(Key, Value,int) error Next([]ID) ID @@ -140,15 +136,16 @@ func (c *HTTPClient) GetMUL(key Key) ([]Value, []map[string]string) { // Put puts new key value pair and return previous value (use REST) // Default implementation of Client interface -func (c *HTTPClient) PutMUL(key Key, value Value) []error { +func (c *HTTPClient) PutMUL(key Key, value Value,Globalcounter int) error { log.Debugf("<----------------PutMUL---------------->") i := 0 + log.Debugf("Put Function Globalcounter = %v", Globalcounter) errs := make(chan error,len(c.HTTP)) for id := range c.HTTP { //log.Debugf("id=%v",id) go func(id ID) { c.CID++ - _, _, err := c.rest(id, key, value,c.CID,0) + _, _, err := c.rest(id, key, value,c.CID,Globalcounter) if err != nil { log.Error(err) return @@ -162,7 +159,7 @@ func (c *HTTPClient) PutMUL(key Key, value Value) []error { errors = append(errors, <-errs) } //log.Debugf("errors %v ", errors) - return errors + return errors[0] } func (c *HTTPClient) GetURL(id ID, key Key) string { diff --git a/client/client.go b/client/client.go index f5975bd2..9032c793 100644 --- a/client/client.go +++ b/client/client.go @@ -1,8 +1,8 @@ package main import ( - "encoding/binary" "flag" + "fmt" "github.com/salemmohammed/BigBFT" "github.com/salemmohammed/BigBFT/log" @@ -12,7 +12,6 @@ var id = flag.String("id", "", "node id this client connects to") var algorithm = flag.String("algorithm", "consensus", "Client API type [paxos, chain]") var load = flag.Bool("load", false, "Load K keys into DB") -// db implements BigBFT.DB interface for benchmarking type db struct { BigBFT.Client } @@ -25,22 +24,13 @@ func (d *db) Stop() error { return nil } -func (d *db) Read(k int) (int, error) { - key := BigBFT.Key(k) - v, err := d.Get(key) - if len(v) == 0 { - return 0, nil - } - x, _ := binary.Uvarint(v) - return int(x), err -} - func (d *db) Write(k int, v []byte, Globalcounter int) error { key := BigBFT.Key(k) //value := make([]byte, binary.MaxVarintLen64) //binary.PutUvarint(value, uint64(v)) log.Debugf("write function global counter = %v", Globalcounter) err := d.Put(key, v,Globalcounter) + fmt.Println("k = %v", k) return err } diff --git a/consensus/consensus.go b/consensus/consensus.go index a290e092..bbda888d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1,40 +1,41 @@ package consensus import ( + "github.com/salemmohammed/BigBFT" "github.com/salemmohammed/BigBFT/log" - "strconv" + "sync" "time" - "github.com/salemmohammed/BigBFT" ) - -// entry in log +var mutex = &sync.Mutex{} type entry struct { ballot BigBFT.Ballot - command BigBFT.Command commit bool request *BigBFT.Request + command BigBFT.Command quorum *BigBFT.Quorum timestamp time.Time - active bool + received bool + Voted bool + leader bool } - type Consensus struct { BigBFT.Node - - log map[int]*entry // log ordered by slot - execute int // next execute slot number - active bool // active leader - ballot BigBFT.Ballot // highest ballot number - slot int // highest slot number - - quorum *BigBFT.Quorum // phase 1 quorum - requests []*BigBFT.Request // phase 1 pending requests - + log map[int]*entry + execute int + active bool + ballot BigBFT.Ballot + slot int + quorum *BigBFT.Quorum + requests []*BigBFT.Request Q1 func(*BigBFT.Quorum) bool Q2 func(*BigBFT.Quorum) bool -} + l map[int]*CommandBallot + Flag bool + count int + Counter int -// NewPaxos creates new paxos instance + Member *BigBFT.Memberlist +} func NewConsensus(n BigBFT.Node, options ...func(*Consensus)) *Consensus { p := &Consensus{ Node: n, @@ -44,6 +45,11 @@ func NewConsensus(n BigBFT.Node, options ...func(*Consensus)) *Consensus { requests: make([]*BigBFT.Request, 0), Q1: func(q *BigBFT.Quorum) bool { return q.Majority() }, Q2: func(q *BigBFT.Quorum) bool { return q.Majority() }, + Member: BigBFT.NewMember(), + l: make(map[int]*CommandBallot), + Flag: false, + count: -1, + Counter: -1, } for _, opt := range options { opt(p) @@ -54,14 +60,12 @@ func NewConsensus(n BigBFT.Node, options ...func(*Consensus)) *Consensus { func (p *Consensus) HandleRequest(r BigBFT.Request) { p.active = true p.slot = r.Command.Counter - log.Debugf("Current slot is %v", p.slot) p.ballot.Next(p.ID()) p.requests = append(p.requests, &r) p.Propose(&r) } - func (p *Consensus) Propose(r *BigBFT.Request) { - // create the log entry + p.log[p.slot] = &entry{ ballot: p.ballot, request: r, @@ -69,16 +73,47 @@ func (p *Consensus) Propose(r *BigBFT.Request) { timestamp: time.Now(), quorum: BigBFT.NewQuorum(), commit: false, - active: true, + received: false, + Voted: false, + leader: true, } - p.Broadcast(Propose{Ballot: p.ballot, ID: p.ID(), Request: *r, Slot:p.slot}) -} + mutex.Lock() + p.l[p.slot] = &CommandBallot{r.Command,p.slot,p.ID()} + p.count++ + mutex.Unlock() -func (p *Consensus) HandlePropose(m Propose) { + log.Debugf("p.l[%v] created = %v", p.slot, p.l[p.slot].Command) + log.Debugf("size log %v", len(p.l)) + + p.Broadcast(Propose{Ballot: p.ballot, Request: *r, Slot:p.slot, ID: p.ID()}) - log.Debugf("HandlePropose = %v", m) - log.Debugf("slot = %v", m.Slot) + log.Debugf("-------------------------------------------------------") + t := p.execute + 3 + log.Debugf("t %v", t) + log.Debugf("p.count %v", p.count) + + if p.count >= t { + log.Debugf("p.HandlePropose") + p.HandlePropose(Propose{Ballot: p.ballot, Request: *r,Slot:p.slot, ID: p.ID()}) + } + + if len(p.l) >= p.log[p.slot].quorum.Total() - 1{ + log.Debugf(" Local HandlePropose") + p.HandlePropose(Propose{Ballot: p.ballot, Request: *r,Slot:p.slot, ID: p.ID()}) + } + + log.Debugf("-------------------------------------------------------") + +} +func (p *Consensus) HandlePropose(m Propose) { + log.Debugf("HandlePropose = %v", m.Slot) + log.Debugf("------HandlePropose------") + startTime := time.Now() + log.Debugf("------startTime------ = %v", startTime) + if p.ballot < m.Ballot { + p.ballot = m.Ballot + } _, exist := p.log[m.Slot] if !exist { p.log[m.Slot] = &entry{ @@ -88,67 +123,180 @@ func (p *Consensus) HandlePropose(m Propose) { timestamp: time.Now(), quorum: BigBFT.NewQuorum(), commit: false, + received: false, + Voted: false, + leader: false, } - log.Debugf("Created") + p.count++ + log.Debugf("%v Slot is created", m.Slot) + } + //e = p.log[m.Slot] + + log.Debugf("p.count = %v", p.count) + log.Debugf("p.slot = %v", p.slot) + + mutex.Lock() + p.l[m.Slot] = &CommandBallot{m.Request.Command,m.Slot, p.ID()} + log.Debugf("p.l[%v] created = %v", m.Slot, p.l[m.Slot].Command) + mutex.Unlock() + + + e := p.log[m.Slot] + t := p.execute + e.quorum.Total() - 1 + log.Debugf("t %v", t) + log.Debugf("p.count %v", p.count) + log.Debugf("p.l %v", len(p.l)) + log.Debugf("p.count >= t %v", p.count >= t) + + log.Debugf("m.ID %v", m.ID) + p.Member.Addmember(m.ID) + log.Debugf("Nighbors %v", p.Member.Neibors) + + if p.count >= t || len(p.l) >= e.quorum.Total() - 1{ + for ss , _ := range p.log { + e := p.log[ss] + log.Debugf("ss = %v", ss) + e.Voted= true + } + } + + //e = p.log[m.Slot] + //if (p.count >= t || len(p.l) >= e.quorum.Total() - 1) && p.Member.Size() == p.Member.ClientSize()-2{ + // log.Debugf("conditions") + // p.Broadcast(Vote{ + // Slot: m.Slot, + // Id: p.ID(), + // L: p.l, + // }) + // p.l = make(map[int]*CommandBallot) + // p.Member.Reset() + //} + + c1 := make(chan Vote) + + go func() { + //time.Sleep(1 * time.Second) + c1 <- Vote{ + Slot: m.Slot, + Id: p.ID(), + L: p.l, + } + }() + + timer := time.NewTimer(10*time.Millisecond) + flag := false + loop: + for{ + select { + case m := <- c1: + e := p.log[m.Slot] + if p.count >= t || len(m.L) >= e.quorum.Total() - 1 || p.Member.Size() == 3{ + p.Member.Reset() + log.Debugf("conditions") + p.Broadcast(Vote{ + Slot: m.Slot, + Id: p.ID(), + L: p.l, + }) + p.l = make(map[int]*CommandBallot) + } + case <- timer.C: + flag = true + log.Debugf("time.After") + break loop + } + } + + + + if len(p.l) >= e.quorum.Total()/2 && flag == true && p.Member.Size() >= e.quorum.Total()/2{ + p.Member.Reset() + log.Debugf("p.count >= t || len(p.l) >= e.quorum.Total() - 2 = %v ", e.quorum.Total()-2) + log.Debugf("p.count >= t || len(p.l) >= e.quorum.Total()/2 = %v ", e.quorum.Total()/2) + p.Broadcast(Vote{ + Slot: m.Slot, + Id: p.ID(), + L: p.l, + }) + p.l = make(map[int]*CommandBallot) } - //e := p.log[m.Slot] - //e.commit = false - p.Broadcast(Vote{Ballot: m.Ballot, ID: p.ID(), Request: m.Request, Slot: m.Slot}) } func (p *Consensus) HandleVote(m Vote) { - log.Debugf("HandleVote = %v", m) - e, exist := p.log[m.Slot] - if !exist { - p.log[m.Slot] = &entry{ - ballot: m.Ballot, - request: &m.Request, - command: m.Request.Command, - quorum: BigBFT.NewQuorum(), - commit: false, + log.Debugf("------HandleVote------") + + + for s, sc := range m.L{ + log.Debugf("s =%v",s ) + e , ok := p.log[s] + if !ok{ + if p.execute > s { + log.Debugf("continue") + continue + }else{ + p.log[s] = &entry{ + command: sc.Command, + timestamp: time.Now(), + quorum: BigBFT.NewQuorum(), + commit: false, + received: false, + Voted: false, + leader: false, + } + } + log.Debugf("%v s is created", s) + } + e = p.log[s] + e.received = true + log.Debugf("e =%v",e.command ) + e.commit = true + e.quorum.ACK(sc.Id) + log.Debugf("e =%v",e.quorum.Size() ) + if e.quorum.Size() == e.quorum.Total() - 1 { + e.Voted = true + //e.quorum.Reset() } - e = p.log[m.Slot] - } - e = p.log[m.Slot] - e.quorum.ACK(m.ID) - log.Debugf("size %v", e.quorum.Size()) - if e.quorum.Majority() { - //e.quorum.Reset() - log.Debugf("inside majority") - e.command = m.Request.Command - //if e.commit == false{ - e.commit = true - p.exec() - //} } + //mutex.Unlock() + p.exec() } func (p *Consensus) exec() { for { + log.Debugf("------exec------") e, ok := p.log[p.execute] log.Debugf("p.execute = %v ", p.execute) - if !ok || !e.commit { - log.Debugf("break") + if !ok{ + log.Debugf("!ok break") + break + } + if !e.commit{ + log.Debugf("!e.commit break") + break + } + if e.Voted == false { + log.Debugf("e.Voted break") break } log.Debugf("Replica %s execute [s=%d, cmd=%v]", p.ID(), p.execute, e.command) value := p.Execute(e.command) + log.Debugf("p.Execute(e.command)") if e.request != nil { - log.Debugf("inside if reply ") - reply := BigBFT.Reply{ - Command: e.command, - Value: value, - Properties: make(map[string]string), - } - reply.Properties[HTTPHeaderSlot] = strconv.Itoa(p.execute) - reply.Properties[HTTPHeaderBallot] = e.ballot.String() - reply.Properties[HTTPHeaderExecute] = strconv.Itoa(p.execute) - if e.active{ + if e.leader{ + log.Debugf("inside if reply ") + reply := BigBFT.Reply{ + Command: e.command, + Value: value, + Properties: make(map[string]string), + } + log.Debugf("e.request.Reply(reply) B/F") e.request.Reply(reply) + log.Debugf("e.request.Reply(reply) A/F") } - //e.request = nil } - // TODO clean up the log periodically - //delete(p.log, p.execute) + delete(p.log,p.execute) + mutex.Lock() + delete(p.l,p.execute) + mutex.Unlock() p.execute++ log.Debugf("Done") } diff --git a/consensus/msg.go b/consensus/msg.go index a608cc3c..c3c0d14d 100644 --- a/consensus/msg.go +++ b/consensus/msg.go @@ -3,7 +3,6 @@ package consensus import ( "encoding/gob" "fmt" - "github.com/salemmohammed/BigBFT" ) @@ -14,22 +13,33 @@ func init() { type Propose struct { Ballot BigBFT.Ballot - ID BigBFT.ID Request BigBFT.Request Slot int + ID BigBFT.ID } func (m Propose) String() string { - return fmt.Sprintf("Propose {b=%v id=%s request=%v slot=%v}", m.Ballot, m.ID, m.Request, m.Slot) + return fmt.Sprintf("Propose {b=%v request=%v slot=%v}", m.Ballot, m.Request, m.Slot) } type Vote struct { - Ballot BigBFT.Ballot - Slot int - Request BigBFT.Request - ID BigBFT.ID + Slot int + Id BigBFT.ID + L map[int]*CommandBallot } func (m Vote) String() string { - return fmt.Sprintf("Voted {b=%v s=%d R=%v id=%s}", m.Ballot, m.Slot, m.Request, m.ID) + return fmt.Sprintf("Vote {L=%v}", m.L) +} + +// CommandBallot conbines each command with its ballot number +type CommandBallot struct { + //Request BigBFT.Request + Command BigBFT.Command + Slot int + Id BigBFT.ID +} + +func (cb CommandBallot) String() string { + return fmt.Sprintf("cmd=%v s=%v", cb.Command, cb.Slot) } \ No newline at end of file diff --git a/consensus/replica.go b/consensus/replica.go index 25007a34..f3c477b3 100644 --- a/consensus/replica.go +++ b/consensus/replica.go @@ -19,11 +19,10 @@ func NewReplica(id BigBFT.ID) *Replica { r.Register(BigBFT.Request{}, r.handleRequest) r.Register(Propose{}, r.HandlePropose) r.Register(Vote{}, r.HandleVote) - //r.Register(Block{}, r.handleBlock) return r } func (r *Replica) handleRequest(m BigBFT.Request) { - log.Debugf("the count is = %v ", m.Command.Counter) - log.Debugf("Replica %s received %v\n", r.ID(), m) + log.Debugf("The Current slot is %v", m.Command.Counter) + log.Debugf("Replica %s received %v", r.ID(), m) r.Consensus.HandleRequest(m) } \ No newline at end of file diff --git a/node.go b/node.go index 2f443c81..3c9cfbad 100644 --- a/node.go +++ b/node.go @@ -50,7 +50,6 @@ func (n *node) ID() ID { } func (n *node) Retry(r Request) { - log.Debugf("node %v retry reqeust %v", n.id, r) n.MessageChan <- r } @@ -77,6 +76,7 @@ func (n *node) Run() { // recv receives messages from socket and pass to message channel func (n *node) recv() { for { + //log.Debugf("recv receives messages from socket and pass to message channel") m := n.Recv() switch m := m.(type) { @@ -102,7 +102,7 @@ func (n *node) recv() { // handle receives messages from message channel and calls handle function using refection func (n *node) handle() { - log.Debugf("salem in handle") + //log.Debugf("handle") for { msg := <-n.MessageChan v := reflect.ValueOf(msg) diff --git a/peerset.go b/peerset.go new file mode 100644 index 00000000..96d7f697 --- /dev/null +++ b/peerset.go @@ -0,0 +1,103 @@ +package BigBFT + +import "C" + +import ( + "github.com/salemmohammed/BigBFT/log" + "net" + "net/url" + "sync" +) + +var ( + mu sync.Mutex + cm *Memberlist +) + +// IPeerSet has a (immutable) subset of the methods of PeerSet. +type IPeerSet interface { + Has(key ID) bool + HasIP(ip net.IP) bool + Get(key ID) Memberlist + List() []Memberlist + Size() int + getProposal() int +} + +type Event struct { + IPAddress string +} + +type Memberlist struct { + Addrs string + Neibors []ID + available map[ID]bool + size int +} + +// For status +const ( + RUNNING = 1 + FAILING = 2 + FAILED = 3 + REJOINED = 4 +) + +func NewMember() *Memberlist { + newm := &Memberlist{ + Addrs: "", + Neibors: make([]ID, 0), + available: make(map[ID]bool), + size: 0, + } + return newm +} + +func (m *Memberlist) Addmember(id ID) { + //for i, _ := range config.Addrs { + ur, _ := url.Parse(config.Addrs[id]) + url := ur.String() + if !m.available[id] { + m.Addrs = url + m.available[id] = true + m.size++ + m.Neibors = append(m.Neibors, id) + } + } +//} + +func (m *Memberlist) Delete(id ID) { + + for i, v := range m.Neibors { + if v == id { + ret := make([]ID, 0) + ret = append(ret, m.Neibors[:i]...) + ret = append(ret, m.Neibors[i+1:]...) + m.Neibors = ret + m.size-- + m.available[v] = false + m.Addrs = "" + } + } +} + +func (m *Memberlist) Reset() { + m.Neibors = make([]ID, 0) + m.available = make(map[ID]bool) + m.size = 0 + m.Addrs = "" +} + +// Size returns the number of unique items in the peerSet. +func (ps *Memberlist) Size() int { + mu.Lock() + defer mu.Unlock() + return len(ps.Neibors) +} + +func (ps *Memberlist) ClientSize() int { + value := config.Benchmark.Concurrency + log.Debugf("The value of concurrency is:%v", value) + return value +} + diff --git a/quorum.go b/quorum.go index fffd261f..b72bb54d 100644 --- a/quorum.go +++ b/quorum.go @@ -49,6 +49,10 @@ func (q *Quorum) All() bool { return q.size == config.n } +func (q *Quorum) Total() int { + return config.Benchmark.Concurrency +} + // Majority quorum satisfied func (q *Quorum) Majority() bool { return q.size >= config.n/2 diff --git a/socket.go b/socket.go index 74eb655e..2a92a747 100644 --- a/socket.go +++ b/socket.go @@ -65,7 +65,7 @@ func NewSocket(id ID, addrs map[ID]string) Socket { } func (s *socket) Send(to ID, m interface{}) { - log.Debugf("node %s send message %+v to %v", s.id, m, to) + if s.crash { return @@ -110,16 +110,19 @@ func (s *socket) Send(to ID, m interface{}) { }() return } - + log.Debugf("node %s send message %+v to %v", s.id, m, to) t.Send(m) } func (s *socket) Recv() interface{} { + //log.Debugf("socket receive") s.lock.RLock() + //log.Debugf("s.id = %v", s.id) t := s.nodes[s.id] s.lock.RUnlock() for { m := t.Recv() + //log.Debugf("Receive %v", m) if !s.crash { return m } @@ -142,9 +145,9 @@ func (s *socket) MulticastQuorum(quorum int, m interface{}) { //log.Debugf("node %s multicasting message %+v for %d nodes", s.id, m, quorum) i := 0 for id := range s.addresses { - if id == s.id { - continue - } + //if id == s.id { + // continue + //} s.Send(id, m) i++ if i == quorum { diff --git a/transport.go b/transport.go index ce3dbb96..0342bac5 100644 --- a/transport.go +++ b/transport.go @@ -12,9 +12,7 @@ import ( "github.com/salemmohammed/BigBFT/log" ) - var scheme = flag.String("transport", "tcp", "transport scheme (tcp, udp, chan), default tcp") - // Transport = transport + pipe + client + server type Transport interface { // Scheme returns tranport scheme @@ -35,7 +33,6 @@ type Transport interface { // Close closes send channel and stops listener Close() } - // NewTransport creates new transport object with url func NewTransport(addr string) Transport { if !strings.Contains(addr, "://") {