Skip to content

Commit

Permalink
Merge #1210
Browse files Browse the repository at this point in the history
1210: Subscription filter r=smnzhu a=smnzhu

Implements subscription filter. Will not work 100% until libp2p/go-libp2p-pubsub#449 is fixed.

https://github.com/dapperlabs/flow-go/issues/5801

Co-authored-by: Simon Zhu <[email protected]>
Co-authored-by: vishal <[email protected]>
  • Loading branch information
3 people authored Sep 25, 2021
2 parents 28a6922 + 3dab281 commit 03cba83
Show file tree
Hide file tree
Showing 22 changed files with 406 additions and 139 deletions.
18 changes: 13 additions & 5 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"

"github.com/libp2p/go-libp2p-core/host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/crypto"
Expand Down Expand Up @@ -102,11 +104,11 @@ func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {

anb.
Component("unstaked sync request proxy", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
proxyEngine = splitter.New(node.Logger, engine.UnstakedSyncCommittee)
proxyEngine = splitter.New(node.Logger, engine.PublicSyncCommittee)

// register the proxy engine with the unstaked network
var err error
unstakedNetworkConduit, err = node.Network.Register(engine.UnstakedSyncCommittee, proxyEngine)
unstakedNetworkConduit, err = node.Network.Register(engine.PublicSyncCommittee, proxyEngine)
if err != nil {
return nil, fmt.Errorf("could not register unstaked sync request proxy: %w", err)
}
Expand Down Expand Up @@ -213,13 +215,19 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.IdentityProvider,
)), nil
})
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, myAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
// no connection gater
SetConnectionManager(connManager).
// act as a DHT server
SetDHTOptions(dhtOptions...).
SetPubsubOptions(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)...).
SetPubsubOptions(psOpts...).
SetLogger(builder.Logger).
SetResolver(resolver).
Build(ctx)
Expand All @@ -246,7 +254,7 @@ func (builder *StakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
builder.RootBlock.ID().String(),
builder.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating to allow unstaked nodes to connect
builder.IDTranslator,
Expand Down
6 changes: 3 additions & 3 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

return func() (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
// unlike the staked side of the network where currently all the node addresses are known upfront,
// for the unstaked side of the network, the nodes need to discover each other using DHT Discovery.
Expand Down Expand Up @@ -269,7 +269,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Con
return nil, err
}

anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.UnstakedSyncCommittee)
anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.PublicSyncCommittee)

anb.Logger.Info().Msgf("network will run on address: %s", anb.BindAddr)

Expand Down Expand Up @@ -304,7 +304,7 @@ func (anb *UnstakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
anb.RootBlock.ID().String(),
anb.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating for the unstaked nodes
anb.IDTranslator,
Expand Down
6 changes: 4 additions & 2 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
fnb.Me.NodeID(),
myAddr,
fnb.NetworkKey,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.RootChainID,
fnb.IdentityProvider,
p2p.DefaultMaxPubSubMsgSize,
fnb.Metrics.Network,
pingProvider,
Expand All @@ -204,7 +206,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
libP2PNodeFactory,
fnb.Me.NodeID(),
fnb.Metrics.Network,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.BaseConfig.UnicastMessageTimeout,
true,
fnb.IDTranslator,
Expand Down
107 changes: 70 additions & 37 deletions engine/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,41 @@ import (
)

// init is called first time this package is imported.
// It creates and initializes the channelRoleMap map.
// It creates and initializes channelRoleMap and clusterChannelPrefixRoleMap.
func init() {
initializeChannelRoleMap()
}

// channelRoleMap keeps a map between channels and the list of flow roles involved in them.
var channelRoleMap map[network.Channel]flow.RoleList

// clusterChannelPrefixRoleMap keeps a map between cluster channel prefixes and the list of flow roles involved in them.
var clusterChannelPrefixRoleMap map[string]flow.RoleList

// RolesByChannel returns list of flow roles involved in the channel.
// If the given channel is a public channel, the returned list will
// contain all roles.
func RolesByChannel(channel network.Channel) (flow.RoleList, bool) {
if clusterChannel, isCluster := ClusterChannel(channel); isCluster {
// replaces channel with the stripped-off prefix
channel = clusterChannel
if IsClusterChannel(channel) {
return ClusterChannelRoles(channel), true
}
if PublicChannels().Contains(channel) {
return flow.Roles(), true
}
roles, ok := channelRoleMap[channel]
return roles, ok
}

// Exists returns true if channel exists in channelRoleMap.
// At the current state, any developer-defined channel should be added
// to channelRoleMap as a constant channel type manually.
// Exists returns true if the channel exists.
func Exists(channel network.Channel) bool {
_, exists := RolesByChannel(channel)
return exists || UnstakedChannels().Contains(channel)
if _, ok := RolesByChannel(channel); ok {
return true
}

return false
}

// ChannelsByRole returns a list of all channels the role subscribes to.
// ChannelsByRole returns a list of all channels the role subscribes to (except cluster-based channels and public channels).
func ChannelsByRole(role flow.Role) network.ChannelList {
channels := make(network.ChannelList, 0)
for channel, roles := range channelRoleMap {
Expand All @@ -63,10 +71,9 @@ func UniqueChannels(channels network.ChannelList) network.ChannelList {
// has already been added to uniques.
// We use identifier of RoleList to determine its uniqueness.
for _, channel := range channels {
id := channelRoleMap[channel].ID()

// non-cluster channel deduplicated based identifier of role list
if _, cluster := ClusterChannel(channel); !cluster {
if !IsClusterChannel(channel) {
id := channelRoleMap[channel].ID()
if _, ok := added[id]; ok {
// a channel with same RoleList already added, hence skips
continue
Expand All @@ -80,20 +87,21 @@ func UniqueChannels(channels network.ChannelList) network.ChannelList {
return uniques
}

// Channels returns all channels that nodes of any role have subscribed to.
// Channels returns all channels that nodes of any role have subscribed to (except cluster-based channels).
func Channels() network.ChannelList {
channels := make(network.ChannelList, 0)
for channel := range channelRoleMap {
channels = append(channels, channel)
}
channels = append(channels, PublicChannels()...)

return channels
}

// UnstakedChannels returns all channels that unstaked nodes can send messages on.
func UnstakedChannels() network.ChannelList {
// PublicChannels returns all channels that are used on the public network.
func PublicChannels() network.ChannelList {
return network.ChannelList{
UnstakedSyncCommittee,
PublicSyncCommittee,
}
}

Expand All @@ -106,11 +114,11 @@ const (

// Channels for consensus protocols
ConsensusCommittee = network.Channel("consensus-committee")
consensusClusterPrefix = network.Channel("consensus-cluster") // dynamic channel, use ChannelConsensusCluster function
consensusClusterPrefix = "consensus-cluster" // dynamic channel, use ChannelConsensusCluster function

// Channels for protocols actively synchronizing state across nodes
SyncCommittee = network.Channel("sync-committee")
syncClusterPrefix = network.Channel("sync-cluster") // dynamic channel, use ChannelSyncCluster function
syncClusterPrefix = "sync-cluster" // dynamic channel, use ChannelSyncCluster function
SyncExecution = network.Channel("sync-execution")

// Channels for dkg communication
Expand Down Expand Up @@ -141,8 +149,8 @@ const (
ProvideReceiptsByBlockID = RequestReceiptsByBlockID
ProvideApprovalsByChunk = RequestApprovalsByChunk

// Unstaked network channels
UnstakedSyncCommittee = network.Channel("unstaked-sync-committee")
// Public network channels
PublicSyncCommittee = network.Channel("public-sync-committee")
)

// initializeChannelRoleMap initializes an instance of channelRoleMap and populates it with the channels and their
Expand All @@ -161,7 +169,7 @@ func initializeChannelRoleMap() {
channelRoleMap[ConsensusCommittee] = flow.RoleList{flow.RoleConsensus}

// Channels for protocols actively synchronizing state across nodes
channelRoleMap[SyncCommittee] = flow.RoleList{flow.RoleConsensus}
channelRoleMap[SyncCommittee] = flow.Roles()
channelRoleMap[SyncExecution] = flow.RoleList{flow.RoleExecution}

// Channels for DKG communication
Expand All @@ -177,7 +185,7 @@ func initializeChannelRoleMap() {
channelRoleMap[PushApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}

// Channels for actively requesting missing entities
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution}
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess}
channelRoleMap[RequestChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification}
channelRoleMap[RequestReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution}
channelRoleMap[RequestApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}
Expand All @@ -190,40 +198,65 @@ func initializeChannelRoleMap() {
flow.RoleAccess}
channelRoleMap[ReceiveApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}

channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution}
channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess}
channelRoleMap[ProvideChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification}
channelRoleMap[ProvideReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution}
channelRoleMap[ProvideApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}

channelRoleMap[syncClusterPrefix] = flow.RoleList{flow.RoleCollection}
channelRoleMap[consensusClusterPrefix] = flow.RoleList{flow.RoleCollection}
clusterChannelPrefixRoleMap = make(map[string]flow.RoleList)

clusterChannelPrefixRoleMap[syncClusterPrefix] = flow.RoleList{flow.RoleCollection}
clusterChannelPrefixRoleMap[consensusClusterPrefix] = flow.RoleList{flow.RoleCollection}
}

// ClusterChannel returns true if channel is cluster-based.
// At the current implementation, only collection nodes are involved in a cluster-based channels.
// If the channel is a cluster-based one, this method also strips off the channel prefix and returns it.
func ClusterChannel(channel network.Channel) (network.Channel, bool) {
if strings.HasPrefix(channel.String(), syncClusterPrefix.String()) {
return syncClusterPrefix, true
// ClusterChannelRoles returns the list of roles that are involved in the given cluster-based channel.
func ClusterChannelRoles(clusterChannel network.Channel) flow.RoleList {
if prefix, ok := clusterChannelPrefix(clusterChannel); ok {
return clusterChannelPrefixRoleMap[prefix]
}

if strings.HasPrefix(channel.String(), consensusClusterPrefix.String()) {
return consensusClusterPrefix, true
return flow.RoleList{}
}

func clusterChannelPrefix(clusterChannel network.Channel) (string, bool) {
for prefix := range clusterChannelPrefixRoleMap {
if strings.HasPrefix(clusterChannel.String(), prefix) {
return prefix, true
}
}

return "", false
}

// IsClusterChannel returns true if channel is cluster-based.
// Currently, only collection nodes are involved in a cluster-based channels.
func IsClusterChannel(channel network.Channel) bool {
_, ok := clusterChannelPrefix(channel)
return ok
}

// TopicFromChannel returns the unique LibP2P topic form the channel.
// The channel is made up of name string suffixed with root block id.
// The root block id is used to prevent cross talks between nodes on different sporks.
func TopicFromChannel(channel network.Channel, rootBlockID string) network.Topic {
func TopicFromChannel(channel network.Channel, rootBlockID flow.Identifier) network.Topic {
// skip root block suffix, if this is a cluster specific channel. A cluster specific channel is inherently
// unique for each epoch
if strings.HasPrefix(channel.String(), syncClusterPrefix.String()) || strings.HasPrefix(string(channel), consensusClusterPrefix.String()) {
if IsClusterChannel(channel) {
return network.Topic(channel)
}
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID))
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID.String()))
}

func ChannelFromTopic(topic network.Topic) (network.Channel, bool) {
if IsClusterChannel(network.Channel(topic)) {
return network.Channel(topic), true
}

if index := strings.LastIndex(topic.String(), "/"); index != -1 {
return network.Channel(topic[:index]), true
}

return "", false
}

// ChannelConsensusCluster returns a dynamic cluster consensus channel based on
Expand Down
23 changes: 12 additions & 11 deletions engine/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,33 @@ func TestGetChannelByRole(t *testing.T) {
// - TestMetric
// the roles list should contain collection and consensus roles
topics := ChannelsByRole(flow.RoleVerification)
assert.Len(t, topics, 7)
assert.Len(t, topics, 8)
assert.Contains(t, topics, PushBlocks)
assert.Contains(t, topics, PushReceipts)
assert.Contains(t, topics, PushApprovals)
assert.Contains(t, topics, ProvideApprovalsByChunk)
assert.Contains(t, topics, RequestChunks)
assert.Contains(t, topics, TestMetrics)
assert.Contains(t, topics, TestNetwork)
assert.Contains(t, topics, SyncCommittee)
}

// TestIsClusterChannel verifies the correctness of ClusterChannel method
// against cluster and non-cluster channel.
func TestIsClusterChannel(t *testing.T) {
// creates a consensus cluster channel and verifies it
conClusterChannel := ChannelConsensusCluster("some-consensus-cluster-id")
clusterChannel, ok := ClusterChannel(conClusterChannel)
ok := IsClusterChannel(conClusterChannel)
require.True(t, ok)
require.Equal(t, clusterChannel, consensusClusterPrefix)

// creates a sync cluster channel and verifies it
syncClusterChannel := ChannelSyncCluster("some-sync-cluster-id")
clusterChannel, ok = ClusterChannel(syncClusterChannel)
ok = IsClusterChannel(syncClusterChannel)
require.True(t, ok)
require.Equal(t, clusterChannel, syncClusterPrefix)

// non-cluster channel should not be verified
clusterChannel, ok = ClusterChannel("non-cluster-channel-id")
ok = IsClusterChannel("non-cluster-channel-id")
require.False(t, ok)
require.Empty(t, clusterChannel)
}

// TestUniqueChannels_Uniqueness verifies that non-cluster channels returned by
Expand All @@ -100,7 +98,7 @@ func TestUniqueChannels_Uniqueness(t *testing.T) {
visited := make(map[flow.Identifier]struct{})
for _, channel := range uniques {

if _, ok := ClusterChannel(channel); ok {
if IsClusterChannel(channel) {
continue //only considering non-cluster channel in this test case
}

Expand All @@ -121,10 +119,13 @@ func TestUniqueChannels_Uniqueness(t *testing.T) {
// We use the identifier of RoleList to determine their uniqueness.
func TestUniqueChannels_ClusterChannels(t *testing.T) {
channels := ChannelsByRole(flow.RoleCollection)
consensusCluster := ChannelConsensusCluster(flow.Emulator)
syncCluster := ChannelSyncCluster(flow.Emulator)
channels = append(channels, consensusCluster, syncCluster)
uniques := UniqueChannels(channels)
// collection role has two cluster and one non-cluster channels all with the same RoleList.
// Hence all of them should be returned as unique channels.
require.Contains(t, uniques, syncClusterPrefix) // cluster channel
require.Contains(t, uniques, consensusClusterPrefix) // cluster channel
require.Contains(t, uniques, PushTransactions) // non-cluster channel
require.Contains(t, uniques, syncCluster) // cluster channel
require.Contains(t, uniques, consensusCluster) // cluster channel
require.Contains(t, uniques, PushTransactions) // non-cluster channel
}
2 changes: 1 addition & 1 deletion model/flow/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *Role) UnmarshalText(text []byte) error {
return err
}

func Roles() []Role {
func Roles() RoleList {
return []Role{RoleCollection, RoleConsensus, RoleExecution, RoleVerification, RoleAccess}
}

Expand Down
Loading

0 comments on commit 03cba83

Please sign in to comment.