Skip to content

Commit

Permalink
Added test for announcements
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Nov 13, 2023
1 parent 49288f3 commit e9ced6d
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 111 deletions.
101 changes: 64 additions & 37 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -60,6 +59,10 @@ func NewFxAnnouncements(h host.Host, o ...Option) (*FxAnnouncements, error) {
}

func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator) error {
if an.topicName == "" {
log.Warn("Announcement do not have any topic to subscribe to")
return errors.New("Announcement do not have any topic to subscribe to")
}
typeSystem, err := ipld.LoadSchemaBytes(schemaBytes)
if err != nil {
panic(fmt.Errorf("cannot load schema: %w", err))
Expand Down Expand Up @@ -87,65 +90,83 @@ func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator
return nil
}

func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType, addrs []multiaddr.Multiaddr) error {
func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType, addrs []multiaddr.Multiaddr, topicString string) error {
log.Info("processing announcement")
switch atype {
case IExistAnnouncementType:
log.Debug("IExist request")
log.Info("IExist request")
an.h.Peerstore().AddAddrs(from, addrs, peerstore.ConnectedAddrTTL)
case PoolJoinRequestAnnouncementType:
log.Debug("PoolJoin request")
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, strconv.Itoa(int(atype)), true); err != nil {
log.Info("PoolJoin request")
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, topicString, true); err != nil {
log.Errorw("An error occurred in handling pool join request announcement", err)
return err
}
default:
log.Debug("Unknown request")
log.Info("Unknown request")
}
return nil
}

func (an *FxAnnouncements) HandleAnnouncements(ctx context.Context) {
log.Debug("called wg.Done in HandleAnnouncements")
defer an.wg.Done()
for {
msg, err := an.sub.Next(ctx)
switch {
case ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed:
log.Info("stopped handling announcements")
return
case err != nil:
log.Errorw("failed to get the next announcement", "err", err)
continue
}
from, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Errorw("failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to decode announcement data", "err", err)
continue
}
if msg != nil {
log.Debugw("HandleAnnouncements", "msg.Topic", msg.Topic)
if err != nil {
log.Debugw("HandleAnnouncements Error", err)
}
switch {
case ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed:
log.Info("stopped handling announcements")
return
case err != nil:
log.Errorw("failed to get the next announcement", "err", err)
continue
}
from, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Errorw("failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
log.Debug("ignoring announcement from self")
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to decode announcement data", "err", err)
continue
}

addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}
addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}

log.Infow("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
err = an.processAnnouncement(ctx, from, a.Type, addrs)
if err != nil {
log.Errorw("failed to process announcement", "err", err)
log.Debugw("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
log.Debug("processAnnouncement call")
if msg.Topic != nil {
err = an.processAnnouncement(ctx, from, a.Type, addrs, *msg.Topic)
if err != nil {
log.Errorw("failed to process announcement", "err", err)
continue
}
} else {
log.Debug("Topic is nil")
continue
}
} else {
continue
}
}
}

func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
log.Debug("called wg.Done in AnnounceIExistPeriodically")
defer an.wg.Done()
ticker := time.NewTicker(an.announceInterval)
for {
Expand Down Expand Up @@ -182,6 +203,7 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
}

func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
log.Debug("called wg.Done in AnnounceJoinPoolRequestPeriodically")
log.Debug("Starting AnnounceJoinPoolRequestPeriodically")
defer an.wg.Done()
an.announcingJoinPoolMutex.Lock()
Expand Down Expand Up @@ -229,17 +251,19 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
log.Errorw("failed to publish pool join request announcement", "err", err)
continue
}
log.Infow("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
log.Debugw("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
}
}
}

func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status common.MemberStatus, exists bool) bool {
log.Debug("ValidateAnnouncement")
a := &Announcement{}
if err := a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to unmarshal announcement data", "err", err)
return false
}
log.Debugw("ValidateAnnouncement", "peerID", id, "type", a.Type)

switch a.Type {
case NewManifestAnnouncementType:
Expand All @@ -256,10 +280,13 @@ func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID,
if status != common.Unknown {
log.Errorw("peer is no longer permitted to send this message type", "peer", id)
return false
} else {
log.Debugw("PoolJoinRequestAnnouncementType status is Unknown and ok")
}
case PoolJoinApproveAnnouncementType, IExistAnnouncementType:
// Any member status is valid for a pool join announcement
default:
log.Errorw("The Type is not set ", a.Type)
return false
}

Expand Down
2 changes: 2 additions & 0 deletions announcements/announcements_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
package announcements_test

//Test are included in the blox
3 changes: 3 additions & 0 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
}
bl.stopFetchUsersAfterJoinChan = make(chan struct{})
ticker := time.NewTicker(bl.fetchInterval * time.Minute)
log.Debug("called wg.Add in PoolJoin ticker")
bl.wg.Add(1) // Increment the wait group counter
go func() {
log.Debug("called wg.Done in PoolJoin ticker")
defer bl.wg.Done() // Decrement the wait group counter when the goroutine completes
defer ticker.Stop() // Ensure the ticker is stopped when the goroutine exits

Expand All @@ -132,6 +134,7 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
}
}()
if bl.a != nil {
log.Debug("called wg.Add in PoolJoin ticker2")
bl.wg.Add(1)
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
Expand Down
15 changes: 15 additions & 0 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ func (bl *FxBlockchain) startFetchCheck() {
bl.fetchCheckTicker = time.NewTicker(1 * time.Hour) // check every hour, adjust as needed

// Increment the WaitGroup counter before starting the goroutine
log.Debug("called wg.Add in blockchain startFetchCheck")
bl.wg.Add(1)

go func() {
log.Debug("called wg.Done in startFetchCheck ticker")
defer bl.wg.Done() // Decrement the counter when the goroutine completes

for {
Expand All @@ -167,8 +169,10 @@ func (bl *FxBlockchain) Start(ctx context.Context) error {
return err
}
bl.s.Handler = http.HandlerFunc(bl.serve)
log.Debug("called wg.Add in blockchain start")
bl.wg.Add(1)
go func() {
log.Debug("called wg.Done in Start blockchain")
defer bl.wg.Done()
bl.s.Serve(listen)
}()
Expand Down Expand Up @@ -800,3 +804,14 @@ func (bl *FxBlockchain) GetMemberStatus(id peer.ID) (common.MemberStatus, bool)
bl.membersLock.RUnlock()
return status, true
}

func (bl *FxBlockchain) GetMembers() map[peer.ID]common.MemberStatus {
bl.membersLock.RLock()
defer bl.membersLock.RUnlock()

copy := make(map[peer.ID]common.MemberStatus)
for k, v := range bl.members {
copy[k] = v
}
return copy
}
47 changes: 43 additions & 4 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/functionland/go-fula/announcements"
"github.com/functionland/go-fula/blockchain"
"github.com/functionland/go-fula/common"
"github.com/functionland/go-fula/exchange"
"github.com/functionland/go-fula/ping"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -96,12 +97,14 @@ func New(o ...Option) (*Blox, error) {
return &p, nil
}

func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Message) bool {
status, exists := p.bl.GetMemberStatus(id)
return p.an.ValidateAnnouncement(ctx, id, msg, status, exists)
}

func (p *Blox) Start(ctx context.Context) error {
// implemented topic validators with chain integration.
validator := func(ctx context.Context, id peer.ID, msg *pubsub.Message) bool {
status, exists := p.bl.GetMemberStatus(id)
return p.an.ValidateAnnouncement(ctx, id, msg, status, exists)
}
validator := p.PubsubValidator

anErr := p.an.Start(ctx, validator)

Expand All @@ -114,7 +117,11 @@ func (p *Blox) Start(ctx context.Context) error {
if err := p.bl.FetchUsersAndPopulateSets(ctx, p.topicName, true); err != nil {
log.Errorw("FetchUsersAndPopulateSets failed", "err", err)
}
log.Debug("called wg.Add in blox start")
p.wg.Add(1)
go func() {
log.Debug("called wg.Done in Start blox")
defer p.wg.Done()
log.Infow("IPFS RPC server started on address http://localhost:5001")
switch err := http.ListenAndServe("localhost:5001", p.ServeIpfsRpc()); {
case errors.Is(err, http.ErrServerClosed):
Expand All @@ -125,8 +132,11 @@ func (p *Blox) Start(ctx context.Context) error {
}()

if anErr == nil {
log.Debug("called wg.Add in blox start anErr1")
p.wg.Add(1)
go p.an.AnnounceIExistPeriodically(ctx)
log.Debug("called wg.Add in blox start anErr2")

p.wg.Add(1)
go p.an.HandleAnnouncements(ctx)
} else {
Expand All @@ -146,13 +156,42 @@ func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow b
}

func (p *Blox) StartPingServer(ctx context.Context) error {
//This is for unit testing and no need to call directly
return p.pn.Start(ctx)
}

func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error) {
//This is for unit testing and no need to call directly
return p.pn.Ping(ctx, to)
}

func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus {
//This is for unit testing and no need to call directly
return p.bl.GetMembers()
}

func (p *Blox) StartAnnouncementServer(ctx context.Context) error {
//This is for unit testing and no need to call directly
p.wg.Add(1)
err := p.an.Start(ctx, p.PubsubValidator)
if err == nil {
log.Debug("called wg.Add in StartAnnouncementServer1")
p.wg.Add(1)
go p.an.AnnounceIExistPeriodically(ctx)
log.Debug("called wg.Add in StartAnnouncementServer2")
p.wg.Add(1)
go p.an.HandleAnnouncements(ctx)
}
return err
}

func (p *Blox) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
p.wg.Add(1)
//This is for unit testing and no need to call directly
log.Debugf("AnnounceJoinPoolRequest ping count %d", p.pingCount)
go p.an.AnnounceJoinPoolRequestPeriodically(ctx)
}

func (p *Blox) Shutdown(ctx context.Context) error {
log.Info("Shutdown in progress")
bErr := p.bl.Shutdown(ctx)
Expand Down
Loading

0 comments on commit e9ced6d

Please sign in to comment.