From 228d40bf0b78ee22f3e00b55468d5a05bebcc542 Mon Sep 17 00:00:00 2001 From: Patryk Osmaczko Date: Tue, 21 Jan 2025 13:34:49 +0100 Subject: [PATCH] chore_: remove wakuv1 and wakuv2 wrappers --- eth-node/bridge/geth/node.go | 5 +- mailserver/mailserver.go | 3 +- mailserver/mailserver_db_postgres_test.go | 4 +- protocol/common/message_sender_test.go | 3 +- ...es_messenger_shared_member_address_test.go | 7 +- ...munities_messenger_test_suite_base_test.go | 3 +- ...nities_messenger_token_permissions_test.go | 10 +- protocol/messenger_base_test.go | 3 +- .../messenger_communities_sharding_test.go | 5 +- .../messenger_delete_message_for_me_test.go | 3 +- protocol/messenger_identity_image_test.go | 5 +- protocol/messenger_messages_tracking_test.go | 5 +- protocol/messenger_offline_test.go | 10 +- protocol/messenger_peersyncing_test.go | 3 +- protocol/messenger_raw_message_resend_test.go | 5 +- protocol/messenger_settings_test.go | 3 +- protocol/messenger_storenode_comunity_test.go | 23 +- protocol/messenger_storenode_request_test.go | 65 ++- protocol/messenger_sync_chat_test.go | 3 +- ...messenger_sync_customization_color_test.go | 3 +- .../messenger_sync_keycard_change_test.go | 3 +- .../messenger_sync_keycards_state_test.go | 3 +- .../messenger_sync_saved_addresses_test.go | 3 +- protocol/messenger_sync_settings_test.go | 3 +- protocol/messenger_testing_utils.go | 12 +- protocol/messenger_waku_wrapper_test.go | 8 +- protocol/push_notification_test.go | 3 +- protocol/transport/filters_manager_test.go | 3 +- protocol/waku_builder_test.go | 15 +- services/wakuext/api_test.go | 3 +- waku/bridge/envelope.go | 60 --- waku/bridge/envelope_error.go | 45 --- waku/bridge/envelope_event.go | 58 --- waku/bridge/mailserver_response.go | 21 - waku/bridge/subscription.go | 30 -- waku/bridge/waku.go | 368 ----------------- waku/bridge/wakuv2.go | 376 ------------------ waku/types/envelopes.go | 1 + waku/types/topic.go | 6 + waku/types/waku.go | 3 + wakuv1/api.go | 16 +- wakuv1/bridge.go | 146 +++++++ wakuv1/common/filter.go | 4 + wakuv1/simulation_test.go | 7 +- wakuv1/waku.go | 78 +++- wakuv1/waku_test.go | 18 +- wakuv1/waku_unsupported.go | 155 ++++++++ wakuv1/waku_version_test.go | 8 +- wakuv2/api.go | 8 +- wakuv2/bridge.go | 81 ++++ wakuv2/common/filter.go | 4 + wakuv2/waku.go | 158 +++++++- wakuv2/waku_test.go | 12 +- wakuv2/waku_unsupported.go | 11 + 54 files changed, 760 insertions(+), 1141 deletions(-) delete mode 100644 waku/bridge/envelope.go delete mode 100644 waku/bridge/envelope_error.go delete mode 100644 waku/bridge/envelope_event.go delete mode 100644 waku/bridge/mailserver_response.go delete mode 100644 waku/bridge/subscription.go delete mode 100644 waku/bridge/waku.go delete mode 100644 waku/bridge/wakuv2.go create mode 100644 wakuv1/bridge.go create mode 100644 wakuv1/waku_unsupported.go create mode 100644 wakuv2/bridge.go create mode 100644 wakuv2/waku_unsupported.go diff --git a/eth-node/bridge/geth/node.go b/eth-node/bridge/geth/node.go index cb97ca663d5..f4324ee52d3 100644 --- a/eth-node/bridge/geth/node.go +++ b/eth-node/bridge/geth/node.go @@ -15,7 +15,6 @@ import ( gethnode "github.com/status-im/status-go/eth-node/node" enstypes "github.com/status-im/status-go/eth-node/types/ens" - wakubridge "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -50,7 +49,7 @@ func (w *gethNodeWrapper) GetWaku(ctx interface{}) (wakutypes.Waku, error) { return nil, errors.New("waku service is not available") } - return wakubridge.NewGethWakuWrapper(w.waku1), nil + return w.waku1, nil } func (w *gethNodeWrapper) GetWakuV2(ctx interface{}) (wakutypes.Waku, error) { @@ -58,7 +57,7 @@ func (w *gethNodeWrapper) GetWakuV2(ctx interface{}) (wakutypes.Waku, error) { return nil, errors.New("waku service is not available") } - return wakubridge.NewGethWakuV2Wrapper(w.waku2), nil + return w.waku2, nil } func (w *gethNodeWrapper) AddPeer(url string) error { diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index e81983085d3..9a306231fb1 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -35,7 +35,6 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" "github.com/status-im/status-go/wakuv1" wakuv1common "github.com/status-im/status-go/wakuv1/common" @@ -126,7 +125,7 @@ func (s *WakuMailServer) Close() { } func (s *WakuMailServer) Archive(env *wakuv1common.Envelope) { - s.ms.Archive(bridge.NewWakuEnvelope(env)) + s.ms.Archive(wakuv1.NewWakuEnvelope(env)) } func (s *WakuMailServer) Deliver(peerID []byte, req wakuv1common.MessagesRequest) { diff --git a/mailserver/mailserver_db_postgres_test.go b/mailserver/mailserver_db_postgres_test.go index 21ba5d8e4d1..c72a5ce766b 100644 --- a/mailserver/mailserver_db_postgres_test.go +++ b/mailserver/mailserver_db_postgres_test.go @@ -17,8 +17,8 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/postgres" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" + "github.com/status-im/status-go/wakuv1" wakuv1common "github.com/status-im/status-go/wakuv1/common" ) @@ -126,5 +126,5 @@ func newTestEnvelope(topic []byte) (wakutypes.Envelope, error) { if err != nil { return nil, err } - return bridge.NewWakuEnvelope(envelope), nil + return wakuv1.NewWakuEnvelope(envelope), nil } diff --git a/protocol/common/message_sender_test.go b/protocol/common/message_sender_test.go index 76c201229ca..8169f2ff332 100644 --- a/protocol/common/message_sender_test.go +++ b/protocol/common/message_sender_test.go @@ -6,7 +6,6 @@ import ( transport2 "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/t/helpers" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" "github.com/status-im/status-go/wakuv1" @@ -74,7 +73,7 @@ func (s *MessageSenderSuite) SetupTest() { s.Require().NoError(shh.Start()) whisperTransport, err := transport2.NewTransport( - bridge.NewGethWakuWrapper(shh), + shh, identity, database, "waku_keys", diff --git a/protocol/communities_messenger_shared_member_address_test.go b/protocol/communities_messenger_shared_member_address_test.go index 527a0d11e7d..93f48aa4114 100644 --- a/protocol/communities_messenger_shared_member_address_test.go +++ b/protocol/communities_messenger_shared_member_address_test.go @@ -18,7 +18,6 @@ import ( "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -89,13 +88,13 @@ func (s *MessengerCommunitiesSharedMemberAddressSuite) TearDownTest() { TearDownMessenger(&s.Suite, s.bob) TearDownMessenger(&s.Suite, s.alice) if s.ownerWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.ownerWaku).Stop()) + s.Require().NoError(s.ownerWaku.Stop()) } if s.bobWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.bobWaku).Stop()) + s.Require().NoError(s.bobWaku.Stop()) } if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } _ = s.logger.Sync() } diff --git a/protocol/communities_messenger_test_suite_base_test.go b/protocol/communities_messenger_test_suite_base_test.go index e4c73931148..efcadf69148 100644 --- a/protocol/communities_messenger_test_suite_base_test.go +++ b/protocol/communities_messenger_test_suite_base_test.go @@ -18,7 +18,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -52,7 +51,7 @@ func (s *CommunitiesMessengerTestSuiteBase) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) } diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 38fed24d867..d25ab33e63b 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -32,7 +32,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/services/wallet/thirdparty" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -200,13 +199,13 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TearDownTest() { TearDownMessenger(&s.Suite, s.bob) TearDownMessenger(&s.Suite, s.alice) if s.ownerWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.ownerWaku).Stop()) + s.Require().NoError(s.ownerWaku.Stop()) } if s.bobWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.bobWaku).Stop()) + s.Require().NoError(s.bobWaku.Stop()) } if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } _ = s.logger.Sync() } @@ -494,7 +493,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions( } wakuStoreNode := NewTestWakuV2(&s.Suite, cfg) - storeNodeListenAddresses := wakuStoreNode.ListenAddresses() + storeNodeListenAddresses, err := wakuStoreNode.ListenAddresses() + s.Require().NoError(err) s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) storeNodeAddress := storeNodeListenAddresses[0] diff --git a/protocol/messenger_base_test.go b/protocol/messenger_base_test.go index 66a864b5db7..80af1b22b47 100644 --- a/protocol/messenger_base_test.go +++ b/protocol/messenger_base_test.go @@ -13,7 +13,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -25,7 +24,7 @@ func (s *MessengerBaseTestSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.m = s.newMessenger() diff --git a/protocol/messenger_communities_sharding_test.go b/protocol/messenger_communities_sharding_test.go index c782b4668ba..032f7f0d5fb 100644 --- a/protocol/messenger_communities_sharding_test.go +++ b/protocol/messenger_communities_sharding_test.go @@ -16,7 +16,6 @@ import ( "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -98,13 +97,13 @@ func (s *MessengerCommunitiesShardingSuite) TearDownTest() { TearDownMessenger(&s.Suite, s.owner) } if s.ownerWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.ownerWaku).Stop()) + s.Require().NoError(s.ownerWaku.Stop()) } if s.alice != nil { TearDownMessenger(&s.Suite, s.alice) } if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } _ = s.logger.Sync() } diff --git a/protocol/messenger_delete_message_for_me_test.go b/protocol/messenger_delete_message_for_me_test.go index 146e9e8c472..bd70192f79e 100644 --- a/protocol/messenger_delete_message_for_me_test.go +++ b/protocol/messenger_delete_message_for_me_test.go @@ -16,7 +16,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -63,7 +62,7 @@ func (s *MessengerDeleteMessageForMeSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.alice1 = s.newMessenger() diff --git a/protocol/messenger_identity_image_test.go b/protocol/messenger_identity_image_test.go index d5c563d1876..5a4cdcbfc7c 100644 --- a/protocol/messenger_identity_image_test.go +++ b/protocol/messenger_identity_image_test.go @@ -23,7 +23,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -50,12 +49,12 @@ func (s *MessengerProfilePictureHandlerSuite) SetupSuite() { config.MinimumAcceptedPoW = 0 wakuLogger := s.logger.Named("Waku") shh := wakuv1.New(&config, wakuLogger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) } func (s *MessengerProfilePictureHandlerSuite) TearDownSuite() { - _ = bridge.GetGethWakuFrom(s.shh).Stop() + _ = s.shh.Stop() _ = s.logger.Sync() } diff --git a/protocol/messenger_messages_tracking_test.go b/protocol/messenger_messages_tracking_test.go index 3c1be29188f..3c5cf55bbe2 100644 --- a/protocol/messenger_messages_tracking_test.go +++ b/protocol/messenger_messages_tracking_test.go @@ -17,7 +17,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/signal" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -108,14 +107,14 @@ func (s *MessengerMessagesTrackingSuite) TearDownTest() { } if s.bobWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.bobWaku).Stop()) + s.Require().NoError(s.bobWaku.Stop()) } if s.alice != nil { TearDownMessenger(&s.Suite, s.alice) } if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } _ = s.logger.Sync() diff --git a/protocol/messenger_offline_test.go b/protocol/messenger_offline_test.go index 829acf96a33..f49878212a0 100644 --- a/protocol/messenger_offline_test.go +++ b/protocol/messenger_offline_test.go @@ -15,8 +15,8 @@ import ( "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/wakuv2" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -84,20 +84,20 @@ func (s *MessengerOfflineSuite) TearDownTest() { s.Require().NoError(s.owner.Shutdown()) } if s.ownerWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.ownerWaku).Stop()) + s.Require().NoError(s.ownerWaku.Stop()) } if s.bob != nil { s.Require().NoError(s.bob.Shutdown()) } if s.bobWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.bobWaku).Stop()) + s.Require().NoError(s.bobWaku.Stop()) } if s.alice != nil { s.Require().NoError(s.alice.Shutdown()) } if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } _ = s.logger.Sync() } @@ -140,7 +140,7 @@ func (s *MessengerOfflineSuite) TestCommunityOfflineEdit() { s.checkMessageDelivery(ctx, inputMessage) // Simulate going offline - wakuv2 := bridge.GetGethWakuV2From(s.aliceWaku) + wakuv2 := s.aliceWaku.(*wakuv2.Waku) wakuv2.SkipPublishToTopic(true) resp, err := s.alice.SendChatMessage(ctx, inputMessage) diff --git a/protocol/messenger_peersyncing_test.go b/protocol/messenger_peersyncing_test.go index 29c3a1387a3..4e0905e9208 100644 --- a/protocol/messenger_peersyncing_test.go +++ b/protocol/messenger_peersyncing_test.go @@ -17,7 +17,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -46,7 +45,7 @@ func (s *MessengerPeersyncingSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.owner = s.newMessenger() diff --git a/protocol/messenger_raw_message_resend_test.go b/protocol/messenger_raw_message_resend_test.go index d81ff7786c9..7cf132c940e 100644 --- a/protocol/messenger_raw_message_resend_test.go +++ b/protocol/messenger_raw_message_resend_test.go @@ -13,7 +13,6 @@ import ( "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -77,10 +76,10 @@ func (s *MessengerRawMessageResendTest) TearDownTest() { TearDownMessenger(&s.Suite, s.aliceMessenger) TearDownMessenger(&s.Suite, s.bobMessenger) if s.aliceWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.aliceWaku).Stop()) + s.Require().NoError(s.aliceWaku.Stop()) } if s.bobWaku != nil { - s.Require().NoError(bridge.GetGethWakuV2From(s.bobWaku).Stop()) + s.Require().NoError(s.bobWaku.Stop()) } _ = s.logger.Sync() } diff --git a/protocol/messenger_settings_test.go b/protocol/messenger_settings_test.go index e5a4e9ab773..86145b88dae 100644 --- a/protocol/messenger_settings_test.go +++ b/protocol/messenger_settings_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/status-im/status-go/waku/bridge" "github.com/status-im/status-go/wakuv1" "github.com/status-im/status-go/eth-node/crypto" @@ -31,7 +30,7 @@ func (s *MessengerSettingsSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) pk, err := crypto.GenerateKey() diff --git a/protocol/messenger_storenode_comunity_test.go b/protocol/messenger_storenode_comunity_test.go index 44093d6a6b8..e383249a001 100644 --- a/protocol/messenger_storenode_comunity_test.go +++ b/protocol/messenger_storenode_comunity_test.go @@ -29,7 +29,6 @@ import ( waku2 "github.com/status-im/status-go/wakuv2" wakuV2common "github.com/status-im/status-go/wakuv2/common" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -97,7 +96,8 @@ func (s *MessengerStoreNodeCommunitySuite) createStore(name string) (*waku2.Waku } storeNode := NewTestWakuV2(&s.Suite, cfg) - addresses := storeNode.ListenAddresses() + addresses, err := storeNode.ListenAddresses() + s.Require().NoError(err) s.Require().GreaterOrEqual(len(addresses), 1, "no storenode listen address") return storeNode, addresses[0] } @@ -113,7 +113,6 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd clusterID: shard.MainStatusShardCluster, } wakuV2 := NewTestWakuV2(&s.Suite, cfg) - wakuV2Wrapper := bridge.NewGethWakuV2Wrapper(wakuV2) privateKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -143,10 +142,10 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd ) } - messenger, err := newMessengerWithKey(wakuV2Wrapper, privateKey, logger, options) + messenger, err := newMessengerWithKey(wakuV2, privateKey, logger, options) s.Require().NoError(err) - return messenger, wakuV2Wrapper + return messenger, wakuV2 } func (s *MessengerStoreNodeCommunitySuite) createCommunityWithChat(m *Messenger) (*communities.Community, *Chat) { @@ -206,8 +205,8 @@ func (s *MessengerStoreNodeCommunitySuite) fetchCommunity(m *Messenger, communit return stats } -func (s *MessengerStoreNodeCommunitySuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { - envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100) +func (s *MessengerStoreNodeCommunitySuite) setupEnvelopesWatcher(wakuNode wakutypes.Waku, topic *wakutypes.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { + envelopesWatcher := make(chan wakutypes.EnvelopeEvent, 100) envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher) go func() { @@ -218,13 +217,13 @@ func (s *MessengerStoreNodeCommunitySuite) setupEnvelopesWatcher(wakuNode *waku2 return case envelopeEvent := <-envelopesWatcher: - if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable { + if envelopeEvent.Event != wakutypes.EventEnvelopeAvailable { continue } if topic != nil && *topic != envelopeEvent.Topic { continue } - envelope := wakuNode.GetEnvelope(envelopeEvent.Hash) + envelope := wakuNode.(*waku2.Waku).GetEnvelope(envelopeEvent.Hash) cb(envelope) s.logger.Debug("envelope available event for fetched content topic", zap.Any("envelopeEvent", envelopeEvent), @@ -236,7 +235,7 @@ func (s *MessengerStoreNodeCommunitySuite) setupEnvelopesWatcher(wakuNode *waku2 }() } -func (s *MessengerStoreNodeCommunitySuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string { +func (s *MessengerStoreNodeCommunitySuite) setupStoreNodeEnvelopesWatcher(topic *wakutypes.TopicType) <-chan string { storeNodeSubscription := make(chan string, 100) s.setupEnvelopesWatcher(s.storeNode, topic, func(envelope *wakuV2common.ReceivedMessage) { storeNodeSubscription <- envelope.Hash().String() @@ -295,8 +294,8 @@ func (s *MessengerStoreNodeCommunitySuite) TestSetStorenodeForCommunity_fetchMes err = s.bob.DialPeer(s.storeNodeAddress) s.Require().NoError(err) - ownerPeerID := bridge.GetGethWakuV2From(s.ownerWaku).PeerID() - bobPeerID := bridge.GetGethWakuV2From(s.bobWaku).PeerID() + ownerPeerID := s.ownerWaku.PeerID() + bobPeerID := s.bobWaku.PeerID() // 1. Owner creates a community community, chat := s.createCommunityWithChat(s.owner) diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 59bfcd92cf3..6560f8b6ff9 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -36,7 +36,6 @@ import ( waku2 "github.com/status-im/status-go/wakuv2" wakuV2common "github.com/status-im/status-go/wakuv2/common" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -171,7 +170,7 @@ func (s *MessengerStoreNodeRequestSuite) createStore() { } func (s *MessengerStoreNodeRequestSuite) tearDownOwner() { - _ = bridge.GetGethWakuV2From(s.ownerWaku).Stop() + _ = s.ownerWaku.Stop() TearDownMessenger(&s.Suite, s.owner) } @@ -183,14 +182,13 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() { clusterID: shard.MainStatusShardCluster, } - wakuV2 := NewTestWakuV2(&s.Suite, cfg) - s.ownerWaku = bridge.NewGethWakuV2Wrapper(wakuV2) + s.ownerWaku = NewTestWakuV2(&s.Suite, cfg) messengerLogger := s.logger.Named("owner-messenger") s.owner = s.newMessenger(s.ownerWaku, messengerLogger, &s.storeNodeAddress) // We force the owner to use the store node as relay peer - WaitForPeersConnected(&s.Suite, bridge.GetGethWakuV2From(s.ownerWaku), func() peer.IDSlice { + WaitForPeersConnected(&s.Suite, s.ownerWaku, func() peer.IDSlice { err := s.owner.DialPeer(s.storeNodeAddress) s.Require().NoError(err) return peer.IDSlice{s.wakuStoreNode.PeerID()} @@ -203,15 +201,14 @@ func (s *MessengerStoreNodeRequestSuite) createBob() { enableStore: false, clusterID: shard.MainStatusShardCluster, } - wakuV2 := NewTestWakuV2(&s.Suite, cfg) - s.bobWaku = bridge.NewGethWakuV2Wrapper(wakuV2) + s.bobWaku = NewTestWakuV2(&s.Suite, cfg) messengerLogger := s.logger.Named("bob-messenger") s.bob = s.newMessenger(s.bobWaku, messengerLogger, &s.storeNodeAddress) } func (s *MessengerStoreNodeRequestSuite) tearDownBob() { - _ = bridge.GetGethWakuV2From(s.bobWaku).Stop() + _ = s.bobWaku.Stop() TearDownMessenger(&s.Suite, s.bob) } @@ -317,8 +314,8 @@ func (s *MessengerStoreNodeRequestSuite) WaitForAvailableStoreNode(messenger *Me WaitForAvailableStoreNode(&s.Suite, messenger, ctx) } -func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { - envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100) +func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode wakutypes.Waku, topic *wakutypes.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { + envelopesWatcher := make(chan wakutypes.EnvelopeEvent, 100) envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher) go func() { @@ -329,13 +326,13 @@ func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.W return case envelopeEvent := <-envelopesWatcher: - if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable { + if envelopeEvent.Event != wakutypes.EventEnvelopeAvailable { continue } if topic != nil && *topic != envelopeEvent.Topic { continue } - envelope := wakuNode.GetEnvelope(envelopeEvent.Hash) + envelope := wakuNode.(*waku2.Waku).GetEnvelope(envelopeEvent.Hash) cb(envelope) s.logger.Debug("envelope available event for fetched content topic", zap.Any("envelopeEvent", envelopeEvent), @@ -347,7 +344,7 @@ func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.W }() } -func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string { +func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakutypes.TopicType) <-chan string { storeNodeSubscription := make(chan string, 100) s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) { storeNodeSubscription <- envelope.Hash().String() @@ -370,12 +367,13 @@ func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan st } func (s *MessengerStoreNodeRequestSuite) wakuListenAddress(waku *waku2.Waku) multiaddr.Multiaddr { - addresses := waku.ListenAddresses() + addresses, err := waku.ListenAddresses() + s.Require().NoError(err) s.Require().LessOrEqual(1, len(addresses)) return addresses[0] } -func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic *wakuV2common.TopicType, minimumCount int) { +func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic *wakutypes.TopicType, minimumCount int) { // Give some time for store node to put envelope into database. Otherwise, the test is flaky. // Although we subscribed to EnvelopeEvents and waited, the actual saving to database happens asynchronously. // It would be nice to implement a subscription for database storing event, but it isn't worth it right now. @@ -482,7 +480,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() { // Create a community community := s.createCommunity(s.owner) - contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + contentTopic := wakutypes.BytesToTopic(transport.ToTopic(community.IDString())) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Push spam to the same ContentTopic & PubsubTopic @@ -621,7 +619,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() { s.Require().NoError(err) contentTopicString := transport.ContactCodeTopic(&s.owner.identity.PublicKey) - contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(contentTopicString)) + contentTopic := wakutypes.BytesToTopic(transport.ToTopic(contentTopicString)) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Set display name, this will also publish contact code @@ -662,7 +660,7 @@ func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() { community := s.createCommunity(s.owner) s.fetchCommunity(s.bob, community.CommunityShard(), community) - contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + contentTopic := wakutypes.BytesToTopic(transport.ToTopic(community.IDString())) communityName := community.Name() storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) @@ -718,7 +716,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() { } shardTopic := transport.CommunityShardInfoTopic(community.IDString()) - contentContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(shardTopic)) + contentContentTopic := wakutypes.BytesToTopic(transport.ToTopic(shardTopic)) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentContentTopic) _, err = s.owner.SetCommunityShard(shardRequest) @@ -780,7 +778,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() { const descriptionsCount = 4 community := s.createCommunity(s.owner) - contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + contentTopic := wakutypes.BytesToTopic(transport.ToTopic(community.IDString())) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Push a few descriptions to the store node @@ -793,7 +791,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() { s.waitForEnvelopes(storeNodeSubscription, descriptionsCount-1) // Subscribe to received envelope - bobWakuV2 := bridge.GetGethWakuV2From(s.bobWaku) + bobWakuV2 := s.bobWaku.(*waku2.Waku) var receivedEnvelopes []*wakuV2common.ReceivedMessage s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) { @@ -1051,8 +1049,8 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { // Prepare things depending on the configuration nodesList := mailserversDB.DefaultMailserversByFleet(fleet) - descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID)) - shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID))) + descriptionContentTopic := wakutypes.BytesToTopic(transport.ToTopic(communityID)) + shardContentTopic := wakutypes.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID))) communityIDBytes, err := types.DecodeHex(communityID) s.Require().NoError(err) @@ -1094,8 +1092,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { clusterID: clusterID, } wakuCreationMutex.Lock() - wakuV2 := NewTestWakuV2(&s.Suite, cfg) - userWaku := bridge.NewGethWakuV2Wrapper(wakuV2) + userWaku := NewTestWakuV2(&s.Suite, cfg) wakuCreationMutex.Unlock() // @@ -1147,11 +1144,11 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { // Setup envelopes watcher to gather fetched envelopes - s.setupEnvelopesWatcher(wakuV2, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) { + s.setupEnvelopesWatcher(userWaku, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) { result.ShardEnvelopes = append(result.ShardEnvelopes, envelope) }) - s.setupEnvelopesWatcher(wakuV2, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) { + s.setupEnvelopesWatcher(userWaku, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) { result.Envelopes = append(result.Envelopes, envelope) }) @@ -1246,12 +1243,12 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { s.createBob() s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String())) - s.logger.Debug("owner node info", zap.String("peerID", bridge.GetGethWakuV2From(s.ownerWaku).PeerID().String())) - s.logger.Debug("bob node info", zap.String("peerID", bridge.GetGethWakuV2From(s.bobWaku).PeerID().String())) + s.logger.Debug("owner node info", zap.String("peerID", s.ownerWaku.PeerID().String())) + s.logger.Debug("bob node info", zap.String("peerID", s.bobWaku.PeerID().String())) // Connect to store node to force "online" status { - WaitForPeersConnected(&s.Suite, bridge.GetGethWakuV2From(s.bobWaku), func() peer.IDSlice { + WaitForPeersConnected(&s.Suite, s.bobWaku, func() peer.IDSlice { err := s.bob.DialPeer(storeAddress) s.Require().NoError(err) return peer.IDSlice{storePeerID} @@ -1264,7 +1261,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { // bob goes offline { - WaitForConnectionStatus(&s.Suite, bridge.GetGethWakuV2From(s.bobWaku), func() bool { + WaitForConnectionStatus(&s.Suite, s.bobWaku, func() bool { err := s.bob.DropPeer(storePeerID) s.Require().NoError(err) return false @@ -1277,7 +1274,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { // Setup store nodes envelopes watcher partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey()) topic := transport.ToTopic(partitionedTopic) - contentTopic := wakuV2common.BytesToTopic(topic) + contentTopic := wakutypes.BytesToTopic(topic) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Send contact request @@ -1295,7 +1292,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { // owner goes offline to prevent message resend and any other side effects // to go offline we disconnect from both relay and store peers - WaitForConnectionStatus(&s.Suite, bridge.GetGethWakuV2From(s.ownerWaku), func() bool { + WaitForConnectionStatus(&s.Suite, s.ownerWaku, func() bool { err := s.owner.DropPeer(storePeerID) s.Require().NoError(err) return false @@ -1308,7 +1305,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { // We don't enable it earlier to control when we connect to the store node. s.bob.config.codeControlFlags.AutoRequestHistoricMessages = true - WaitForPeersConnected(&s.Suite, bridge.GetGethWakuV2From(s.bobWaku), func() peer.IDSlice { + WaitForPeersConnected(&s.Suite, s.bobWaku, func() peer.IDSlice { err := s.bob.DialPeer(storeAddress) s.Require().NoError(err) return peer.IDSlice{storePeerID} diff --git a/protocol/messenger_sync_chat_test.go b/protocol/messenger_sync_chat_test.go index 177953e9762..449ba0f9797 100644 --- a/protocol/messenger_sync_chat_test.go +++ b/protocol/messenger_sync_chat_test.go @@ -15,7 +15,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -64,7 +63,7 @@ func (s *MessengerSyncChatSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.alice1 = s.newMessenger() diff --git a/protocol/messenger_sync_customization_color_test.go b/protocol/messenger_sync_customization_color_test.go index fe089a739d1..ca56e3b7adb 100644 --- a/protocol/messenger_sync_customization_color_test.go +++ b/protocol/messenger_sync_customization_color_test.go @@ -14,7 +14,6 @@ import ( "github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -38,7 +37,7 @@ func (s *MessengerSyncAccountCustomizationColorSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) pk, err := crypto.GenerateKey() diff --git a/protocol/messenger_sync_keycard_change_test.go b/protocol/messenger_sync_keycard_change_test.go index 3c927bd2249..fa73f57bc9e 100644 --- a/protocol/messenger_sync_keycard_change_test.go +++ b/protocol/messenger_sync_keycard_change_test.go @@ -14,7 +14,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -41,7 +40,7 @@ func (s *MessengerSyncKeycardChangeSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.main = s.newMessenger(s.shh) diff --git a/protocol/messenger_sync_keycards_state_test.go b/protocol/messenger_sync_keycards_state_test.go index 4d24186687f..6e484d321f0 100644 --- a/protocol/messenger_sync_keycards_state_test.go +++ b/protocol/messenger_sync_keycards_state_test.go @@ -14,7 +14,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -41,7 +40,7 @@ func (s *MessengerSyncKeycardsStateSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.main = s.newMessenger(s.shh) diff --git a/protocol/messenger_sync_saved_addresses_test.go b/protocol/messenger_sync_saved_addresses_test.go index 865a44f67b7..edf2df8f156 100644 --- a/protocol/messenger_sync_saved_addresses_test.go +++ b/protocol/messenger_sync_saved_addresses_test.go @@ -18,7 +18,6 @@ import ( "github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -45,7 +44,7 @@ func (s *MessengerSyncSavedAddressesSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.main = s.newMessenger(s.logger.Named("main")) diff --git a/protocol/messenger_sync_settings_test.go b/protocol/messenger_sync_settings_test.go index cde42572975..8f42233376e 100644 --- a/protocol/messenger_sync_settings_test.go +++ b/protocol/messenger_sync_settings_test.go @@ -16,7 +16,6 @@ import ( "github.com/status-im/status-go/services/stickers" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -84,7 +83,7 @@ func (s *MessengerSyncSettingsSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.alice = s.newMessenger() diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 96d2f0d3a17..5429295963b 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -17,8 +17,6 @@ import ( "github.com/status-im/status-go/protocol/identity" - waku2 "github.com/status-im/status-go/wakuv2" - "github.com/stretchr/testify/suite" "github.com/status-im/status-go/protocol/common" @@ -207,8 +205,9 @@ func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(co } } -func WaitForConnectionStatus(s *suite.Suite, waku *waku2.Waku, action func() bool) { - subscription := waku.SubscribeToConnStatusChanges() +func WaitForConnectionStatus(s *suite.Suite, waku wakutypes.Waku, action func() bool) { + subscription, err := waku.SubscribeToConnStatusChanges() + s.Require().NoError(err) defer subscription.Unsubscribe() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -239,8 +238,9 @@ func hasAllPeers(m map[peer.ID]wakutypes.WakuV2Peer, checkSlice peer.IDSlice) bo return true } -func WaitForPeersConnected(s *suite.Suite, waku *waku2.Waku, action func() peer.IDSlice) { - subscription := waku.SubscribeToConnStatusChanges() +func WaitForPeersConnected(s *suite.Suite, waku wakutypes.Waku, action func() peer.IDSlice) { + subscription, err := waku.SubscribeToConnStatusChanges() + s.Require().NoError(err) defer subscription.Unsubscribe() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/protocol/messenger_waku_wrapper_test.go b/protocol/messenger_waku_wrapper_test.go index 2c7bbc15be9..8328e5ad79b 100644 --- a/protocol/messenger_waku_wrapper_test.go +++ b/protocol/messenger_waku_wrapper_test.go @@ -5,22 +5,20 @@ import ( "go.uber.org/zap" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" "github.com/status-im/status-go/wakuv1" ) type testWakuWrapper struct { - *bridge.GethWakuWrapper + wakutypes.Waku api *testPublicWakuAPI } func newTestWaku(w *wakuv1.Waku) wakutypes.Waku { - wrapper := bridge.NewGethWakuWrapper(w) return &testWakuWrapper{ - GethWakuWrapper: wrapper.(*bridge.GethWakuWrapper), - api: newTestPublicWakuAPI(wakuv1.NewPublicWakuAPI(w)), + Waku: w, + api: newTestPublicWakuAPI(wakuv1.NewPublicWakuAPI(w)), } } diff --git a/protocol/push_notification_test.go b/protocol/push_notification_test.go index 30f36133366..df58ebea777 100644 --- a/protocol/push_notification_test.go +++ b/protocol/push_notification_test.go @@ -21,7 +21,6 @@ import ( "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/wakuv1" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -51,7 +50,7 @@ func (s *MessengerPushNotificationSuite) SetupTest() { config := wakuv1.DefaultConfig config.MinimumAcceptedPoW = 0 shh := wakuv1.New(&config, s.logger) - s.shh = bridge.NewGethWakuWrapper(shh) + s.shh = shh s.Require().NoError(shh.Start()) s.m = s.newMessenger(s.shh) diff --git a/protocol/transport/filters_manager_test.go b/protocol/transport/filters_manager_test.go index 8ba9ef12a81..b929b96b1cc 100644 --- a/protocol/transport/filters_manager_test.go +++ b/protocol/transport/filters_manager_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/waku/bridge" "github.com/status-im/status-go/wakuv1" _ "github.com/mutecomm/go-sqlcipher/v4" @@ -90,7 +89,7 @@ func (s *FiltersManagerSuite) SetupTest() { keysPersistence := newTestKeysPersistence() - waku := bridge.NewGethWakuWrapper(wakuv1.New(&wakuv1.DefaultConfig, nil)) + waku := wakuv1.New(&wakuv1.DefaultConfig, nil) s.chats, err = NewFiltersManager(keysPersistence, waku, s.manager[0].privateKey, s.logger) s.Require().NoError(err) diff --git a/protocol/waku_builder_test.go b/protocol/waku_builder_test.go index 595512fb604..cb77e3758bd 100644 --- a/protocol/waku_builder_test.go +++ b/protocol/waku_builder_test.go @@ -14,7 +14,6 @@ import ( "github.com/status-im/status-go/t/helpers" waku2 "github.com/status-im/status-go/wakuv2" - "github.com/status-im/status-go/waku/bridge" wakutypes "github.com/status-im/status-go/waku/types" ) @@ -72,8 +71,7 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku { } func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []wakutypes.Waku { - nodes := make([]*waku2.Waku, len(nodeNames)) - wrappers := make([]wakutypes.Waku, len(nodes)) + nodes := make([]wakutypes.Waku, len(nodeNames)) for i, name := range nodeNames { nodes[i] = NewTestWakuV2(s, testWakuV2Config{ @@ -90,16 +88,15 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s continue } - addrs := nodes[j].ListenAddresses() + addrs, err := nodes[j].ListenAddresses() + s.Require().NoError(err) s.Require().Greater(len(addrs), 0) - _, err := nodes[i].AddRelayPeer(addrs[0]) + _, err = nodes[i].AddRelayPeer(addrs[0]) s.Require().NoError(err) err = nodes[i].DialPeer(addrs[0]) s.Require().NoError(err) } } - for i, n := range nodes { - wrappers[i] = bridge.NewGethWakuV2Wrapper(n) - } - return wrappers + + return nodes } diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index 3e59aad6a63..9d777aecc13 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -17,7 +17,6 @@ import ( "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/ext" "github.com/status-im/status-go/t/helpers" - "github.com/status-im/status-go/waku/bridge" "github.com/status-im/status-go/wakuv1" "github.com/status-im/status-go/walletdatabase" ) @@ -35,7 +34,7 @@ func TestInitProtocol(t *testing.T) { db, err := leveldb.Open(storage.NewMemStorage(), nil) require.NoError(t, err) - waku := bridge.NewGethWakuWrapper(wakuv1.New(nil, nil)) + waku := wakuv1.New(nil, nil) privateKey, err := crypto.GenerateKey() require.NoError(t, err) diff --git a/waku/bridge/envelope.go b/waku/bridge/envelope.go deleted file mode 100644 index 1375dd6423d..00000000000 --- a/waku/bridge/envelope.go +++ /dev/null @@ -1,60 +0,0 @@ -package bridge - -import ( - "io" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/status-im/status-go/eth-node/types" - wakuv1common "github.com/status-im/status-go/wakuv1/common" - - wakutypes "github.com/status-im/status-go/waku/types" -) - -type wakuEnvelope struct { - env *wakuv1common.Envelope -} - -// NewWakuEnvelope returns an object that wraps Geth's Waku Envelope in a types interface. -func NewWakuEnvelope(e *wakuv1common.Envelope) wakutypes.Envelope { - return &wakuEnvelope{env: e} -} - -func (w *wakuEnvelope) Unwrap() interface{} { - return w.env -} - -func (w *wakuEnvelope) Hash() types.Hash { - return types.Hash(w.env.Hash()) -} - -func (w *wakuEnvelope) Bloom() []byte { - return w.env.Bloom() -} - -func (w *wakuEnvelope) PoW() float64 { - return w.env.PoW() -} - -func (w *wakuEnvelope) Expiry() uint32 { - return w.env.Expiry -} - -func (w *wakuEnvelope) TTL() uint32 { - return w.env.TTL -} - -func (w *wakuEnvelope) Topic() wakutypes.TopicType { - return wakutypes.TopicType(w.env.Topic) -} - -func (w *wakuEnvelope) Size() int { - return len(w.env.Data) -} - -func (w *wakuEnvelope) DecodeRLP(s *rlp.Stream) error { - return w.env.DecodeRLP(s) -} - -func (w *wakuEnvelope) EncodeRLP(writer io.Writer) error { - return rlp.Encode(writer, w.env) -} diff --git a/waku/bridge/envelope_error.go b/waku/bridge/envelope_error.go deleted file mode 100644 index 7b43af265ee..00000000000 --- a/waku/bridge/envelope_error.go +++ /dev/null @@ -1,45 +0,0 @@ -package bridge - -import ( - "github.com/status-im/status-go/eth-node/types" - wakutypes "github.com/status-im/status-go/waku/types" - - wakuv1common "github.com/status-im/status-go/wakuv1/common" - wakuv2common "github.com/status-im/status-go/wakuv2/common" -) - -// NewWakuEnvelopeErrorWrapper returns a wakutypes.EnvelopeError object that mimics Geth's EnvelopeError -func NewWakuEnvelopeErrorWrapper(envelopeError *wakuv1common.EnvelopeError) *wakutypes.EnvelopeError { - if envelopeError == nil { - panic("envelopeError should not be nil") - } - - return &wakutypes.EnvelopeError{ - Hash: types.Hash(envelopeError.Hash), - Code: mapGethErrorCode(envelopeError.Code), - Description: envelopeError.Description, - } -} - -// NewWakuEnvelopeErrorWrapper returns a wakutypes.EnvelopeError object that mimics Geth's EnvelopeError -func NewWakuV2EnvelopeErrorWrapper(envelopeError *wakuv2common.EnvelopeError) *wakutypes.EnvelopeError { - if envelopeError == nil { - panic("envelopeError should not be nil") - } - - return &wakutypes.EnvelopeError{ - Hash: types.Hash(envelopeError.Hash), - Code: mapGethErrorCode(envelopeError.Code), - Description: envelopeError.Description, - } -} - -func mapGethErrorCode(code uint) uint { - switch code { - case wakuv1common.EnvelopeTimeNotSynced: - return wakutypes.EnvelopeTimeNotSynced - case wakuv1common.EnvelopeOtherError: - return wakutypes.EnvelopeOtherError - } - return wakutypes.EnvelopeOtherError -} diff --git a/waku/bridge/envelope_event.go b/waku/bridge/envelope_event.go deleted file mode 100644 index 2184f1de310..00000000000 --- a/waku/bridge/envelope_event.go +++ /dev/null @@ -1,58 +0,0 @@ -package bridge - -import ( - "github.com/status-im/status-go/eth-node/types" - wakutypes "github.com/status-im/status-go/waku/types" - "github.com/status-im/status-go/wakuv1" - - wakuv1common "github.com/status-im/status-go/wakuv1/common" - wakuv2common "github.com/status-im/status-go/wakuv2/common" -) - -// NewWakuEnvelopeEventWrapper returns a wakutypes.EnvelopeEvent object that mimics Geth's EnvelopeEvent -func NewWakuEnvelopeEventWrapper(envelopeEvent *wakuv1common.EnvelopeEvent) *wakutypes.EnvelopeEvent { - if envelopeEvent == nil { - panic("envelopeEvent should not be nil") - } - - wrappedData := envelopeEvent.Data - switch data := envelopeEvent.Data.(type) { - case []wakuv1common.EnvelopeError: - wrappedData := make([]wakutypes.EnvelopeError, len(data)) - for index := range data { - wrappedData[index] = *NewWakuEnvelopeErrorWrapper(&data[index]) - } - case *wakuv1.MailServerResponse: - wrappedData = NewWakuMailServerResponseWrapper(data) - } - return &wakutypes.EnvelopeEvent{ - Event: wakutypes.EventType(envelopeEvent.Event), - Hash: types.Hash(envelopeEvent.Hash), - Batch: types.Hash(envelopeEvent.Batch), - Peer: types.EnodeID(envelopeEvent.Peer), - Data: wrappedData, - } -} - -// NewWakuV2EnvelopeEventWrapper returns a wakutypes.EnvelopeEvent object that mimics Geth's EnvelopeEvent -func NewWakuV2EnvelopeEventWrapper(envelopeEvent *wakuv2common.EnvelopeEvent) *wakutypes.EnvelopeEvent { - if envelopeEvent == nil { - panic("envelopeEvent should not be nil") - } - - wrappedData := envelopeEvent.Data - switch data := envelopeEvent.Data.(type) { - case []wakuv2common.EnvelopeError: - wrappedData := make([]wakutypes.EnvelopeError, len(data)) - for index := range data { - wrappedData[index] = *NewWakuV2EnvelopeErrorWrapper(&data[index]) - } - } - return &wakutypes.EnvelopeEvent{ - Event: wakutypes.EventType(envelopeEvent.Event), - Hash: types.Hash(envelopeEvent.Hash), - Batch: types.Hash(envelopeEvent.Batch), - Peer: types.EnodeID(envelopeEvent.Peer), - Data: wrappedData, - } -} diff --git a/waku/bridge/mailserver_response.go b/waku/bridge/mailserver_response.go deleted file mode 100644 index cbc0ee54055..00000000000 --- a/waku/bridge/mailserver_response.go +++ /dev/null @@ -1,21 +0,0 @@ -package bridge - -import ( - "github.com/status-im/status-go/eth-node/types" - wakutypes "github.com/status-im/status-go/waku/types" - - "github.com/status-im/status-go/wakuv1" -) - -// NewWakuMailServerResponseWrapper returns a wakutypes.MailServerResponse object that mimics Geth's MailServerResponse -func NewWakuMailServerResponseWrapper(mailServerResponse *wakuv1.MailServerResponse) *wakutypes.MailServerResponse { - if mailServerResponse == nil { - panic("mailServerResponse should not be nil") - } - - return &wakutypes.MailServerResponse{ - LastEnvelopeHash: types.Hash(mailServerResponse.LastEnvelopeHash), - Cursor: mailServerResponse.Cursor, - Error: mailServerResponse.Error, - } -} diff --git a/waku/bridge/subscription.go b/waku/bridge/subscription.go deleted file mode 100644 index 520e1fda9ab..00000000000 --- a/waku/bridge/subscription.go +++ /dev/null @@ -1,30 +0,0 @@ -package bridge - -import ( - "github.com/ethereum/go-ethereum/event" - - "github.com/status-im/status-go/waku/types" -) - -type gethSubscriptionWrapper struct { - subscription event.Subscription -} - -// NewGethSubscriptionWrapper returns an object that wraps Geth's Subscription in a types interface -func NewGethSubscriptionWrapper(subscription event.Subscription) types.Subscription { - if subscription == nil { - panic("subscription cannot be nil") - } - - return &gethSubscriptionWrapper{ - subscription: subscription, - } -} - -func (w *gethSubscriptionWrapper) Err() <-chan error { - return w.subscription.Err() -} - -func (w *gethSubscriptionWrapper) Unsubscribe() { - w.subscription.Unsubscribe() -} diff --git a/waku/bridge/waku.go b/waku/bridge/waku.go deleted file mode 100644 index f32ef3133e2..00000000000 --- a/waku/bridge/waku.go +++ /dev/null @@ -1,368 +0,0 @@ -package bridge - -import ( - "context" - "crypto/ecdsa" - "errors" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - - "github.com/waku-org/go-waku/waku/v2/api/history" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/enode" - gocommon "github.com/status-im/status-go/common" - "github.com/status-im/status-go/connection" - "github.com/status-im/status-go/wakuv1" - wakuv1common "github.com/status-im/status-go/wakuv1/common" - - "github.com/status-im/status-go/waku/types" - wakutypes "github.com/status-im/status-go/waku/types" -) - -type GethWakuWrapper struct { - waku *wakuv1.Waku -} - -// NewGethWakuWrapper returns an object that wraps Geth's Waku in a types interface -func NewGethWakuWrapper(w *wakuv1.Waku) wakutypes.Waku { - if w == nil { - panic("waku cannot be nil") - } - - return &GethWakuWrapper{ - waku: w, - } -} - -// GetGethWhisperFrom retrieves the underlying whisper Whisper struct from a wrapped Whisper interface -func GetGethWakuFrom(m wakutypes.Waku) *wakuv1.Waku { - return m.(*GethWakuWrapper).waku -} - -func (w *GethWakuWrapper) PublicWakuAPI() wakutypes.PublicWakuAPI { - return wakuv1.NewPublicWakuAPI(w.waku) -} - -func (w *GethWakuWrapper) Version() uint { - return 1 -} - -// Added for compatibility with waku V2 -func (w *GethWakuWrapper) PeerCount() int { - return -1 -} - -// Added for compatibility with waku V2 -func (w *GethWakuWrapper) StartDiscV5() error { - return errors.New("not available in WakuV1") -} - -// Added for compatibility with waku V2 -func (w *GethWakuWrapper) StopDiscV5() error { - return errors.New("not available in WakuV1") -} - -// PeerCount function only added for compatibility with waku V2 -func (w *GethWakuWrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { - return "", errors.New("not available in WakuV1") -} - -// SubscribeToPubsubTopic function only added for compatibility with waku V2 -func (w *GethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { - // not available in WakuV1 - return errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error { - // not available in WakuV1 - return errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { - // not available in WakuV1 - return nil, errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { - // not available in WakuV1 - return errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) RemovePubsubTopicKey(topic string) error { - // not available in WakuV1 - return errors.New("not available in WakuV1") -} - -// AddRelayPeer function only added for compatibility with waku V2 -func (w *GethWakuWrapper) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { - return "", errors.New("not available in WakuV1") -} - -// DialPeer function only added for compatibility with waku V2 -func (w *GethWakuWrapper) DialPeer(address multiaddr.Multiaddr) error { - return errors.New("not available in WakuV1") -} - -// DialPeerByID function only added for compatibility with waku V2 -func (w *GethWakuWrapper) DialPeerByID(peerID peer.ID) error { - return errors.New("not available in WakuV1") -} - -// ListenAddresses function only added for compatibility with waku V2 -func (w *GethWakuWrapper) ListenAddresses() ([]multiaddr.Multiaddr, error) { - return nil, errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) RelayPeersByTopic(topic string) (*wakutypes.PeerList, error) { - return nil, errors.New("not available in WakuV1") -} - -// ENR function only added for compatibility with waku V2 -func (w *GethWakuWrapper) ENR() (*enode.Node, error) { - return nil, errors.New("not available in WakuV1") -} - -// PeerCount function only added for compatibility with waku V2 -func (w *GethWakuWrapper) DropPeer(peerID peer.ID) error { - return errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { - return nil, errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []wakutypes.TopicType) error { - return errors.New("not available in WakuV1") -} - -// Peers function only added for compatibility with waku V2 -func (w *GethWakuWrapper) Peers() wakutypes.PeerStats { - p := make(wakutypes.PeerStats) - return p -} - -// MinPow returns the PoW value required by this node. -func (w *GethWakuWrapper) MinPow() float64 { - return w.waku.MinPow() -} - -// MaxMessageSize returns the MaxMessageSize set -func (w *GethWakuWrapper) MaxMessageSize() uint32 { - return w.waku.MaxMessageSize() -} - -// BloomFilter returns the aggregated bloom filter for all the topics of interest. -// The nodes are required to send only messages that match the advertised bloom filter. -// If a message does not match the bloom, it will tantamount to spam, and the peer will -// be disconnected. -func (w *GethWakuWrapper) BloomFilter() []byte { - return w.waku.BloomFilter() -} - -// GetCurrentTime returns current time. -func (w *GethWakuWrapper) GetCurrentTime() time.Time { - return w.waku.CurrentTime() -} - -func (w *GethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- wakutypes.EnvelopeEvent) types.Subscription { - events := make(chan wakuv1common.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper - go func() { - defer gocommon.LogOnPanic() - for e := range events { - eventsProxy <- *NewWakuEnvelopeEventWrapper(&e) - } - }() - - return NewGethSubscriptionWrapper(w.waku.SubscribeEnvelopeEvents(events)) -} - -func (w *GethWakuWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { - return w.waku.GetPrivateKey(id) -} - -// AddKeyPair imports a asymmetric private key and returns a deterministic identifier. -func (w *GethWakuWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { - return w.waku.AddKeyPair(key) -} - -// DeleteKeyPair deletes the key with the specified ID if it exists. -func (w *GethWakuWrapper) DeleteKeyPair(keyID string) bool { - return w.waku.DeleteKeyPair(keyID) -} - -func (w *GethWakuWrapper) AddSymKeyDirect(key []byte) (string, error) { - return w.waku.AddSymKeyDirect(key) -} - -func (w *GethWakuWrapper) AddSymKeyFromPassword(password string) (string, error) { - return w.waku.AddSymKeyFromPassword(password) -} - -func (w *GethWakuWrapper) DeleteSymKey(id string) bool { - return w.waku.DeleteSymKey(id) -} - -func (w *GethWakuWrapper) GetSymKey(id string) ([]byte, error) { - return w.waku.GetSymKey(id) -} - -func (w *GethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, error) { - var ( - err error - keyAsym *ecdsa.PrivateKey - keySym []byte - ) - - if opts.SymKeyID != "" { - keySym, err = w.GetSymKey(opts.SymKeyID) - if err != nil { - return "", err - } - } - if opts.PrivateKeyID != "" { - keyAsym, err = w.GetPrivateKey(opts.PrivateKeyID) - if err != nil { - return "", err - } - } - - f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PoW, opts.Topics) - if err != nil { - return "", err - } - - id, err := w.waku.Subscribe(GetWakuFilterFrom(f)) - if err != nil { - return "", err - } - - f.(*wakuFilterWrapper).id = id - return id, nil -} - -func (w *GethWakuWrapper) GetStats() wakutypes.StatsSummary { - return w.waku.GetStats() -} - -func (w *GethWakuWrapper) GetFilter(id string) wakutypes.Filter { - return NewWakuFilterWrapper(w.waku.GetFilter(id), id) -} - -func (w *GethWakuWrapper) Unsubscribe(ctx context.Context, id string) error { - return w.waku.Unsubscribe(id) -} - -func (w *GethWakuWrapper) UnsubscribeMany(ids []string) error { - return w.waku.UnsubscribeMany(ids) -} - -func (w *GethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (wakutypes.Filter, error) { - return NewWakuFilterWrapper(&wakuv1common.Filter{ - KeyAsym: keyAsym, - KeySym: keySym, - PoW: pow, - AllowP2P: true, - Topics: topics, - Messages: wakuv1common.NewMemoryMessageStore(), - }, id), nil -} - -func (w *GethWakuWrapper) ProcessingP2PMessages() bool { - return w.waku.ProcessingP2PMessages() -} - -func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { - w.waku.MarkP2PMessageAsProcessed(hash) -} - -func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {} - -func (w *GethWakuWrapper) ClearEnvelopesCache() { - w.waku.ClearEnvelopesCache() -} - -type wakuFilterWrapper struct { - filter *wakuv1common.Filter - id string -} - -// NewWakuFilterWrapper returns an object that wraps Geth's Filter in a types interface -func NewWakuFilterWrapper(f *wakuv1common.Filter, id string) wakutypes.Filter { - if f.Messages == nil { - panic("Messages should not be nil") - } - - return &wakuFilterWrapper{ - filter: f, - id: id, - } -} - -// GetWakuFilterFrom retrieves the underlying whisper Filter struct from a wrapped Filter interface -func GetWakuFilterFrom(f wakutypes.Filter) *wakuv1common.Filter { - return f.(*wakuFilterWrapper).filter -} - -// ID returns the filter ID -func (w *wakuFilterWrapper) ID() string { - return w.id -} - -func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) { -} - -func (w *GethWakuWrapper) PeerID() peer.ID { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) GetActiveStorenode() peer.ID { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) WaitForAvailableStoreNode(ctx context.Context) bool { - return false -} - -func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { - panic("not available in WakuV1") -} - -func (w *GethWakuWrapper) ProcessMailserverBatch( - ctx context.Context, - batch wakutypes.MailserverBatch, - storenodeID peer.ID, - pageLimit uint64, - shouldProcessNextPage func(int) (bool, uint64), - processEnvelopes bool, -) error { - return errors.New("not available in WakuV1") -} - -func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool { - panic("not available in WakuV1") - -} - -func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error { - panic("not available in WakuV1") - -} - -func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) { - panic("not available in WakuV1") -} diff --git a/waku/bridge/wakuv2.go b/waku/bridge/wakuv2.go deleted file mode 100644 index e6aa75b0983..00000000000 --- a/waku/bridge/wakuv2.go +++ /dev/null @@ -1,376 +0,0 @@ -package bridge - -import ( - "context" - "crypto/ecdsa" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/api/history" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/enode" - - gocommon "github.com/status-im/status-go/common" - "github.com/status-im/status-go/connection" - wakutypes "github.com/status-im/status-go/waku/types" - "github.com/status-im/status-go/wakuv2" - wakucommon "github.com/status-im/status-go/wakuv2/common" -) - -type gethWakuV2Wrapper struct { - waku *wakuv2.Waku -} - -// NewGethWakuWrapper returns an object that wraps Geth's Waku in a types interface -func NewGethWakuV2Wrapper(w *wakuv2.Waku) wakutypes.Waku { - if w == nil { - panic("waku cannot be nil") - } - - return &gethWakuV2Wrapper{ - waku: w, - } -} - -// GetGethWhisperFrom retrieves the underlying whisper Whisper struct from a wrapped Whisper interface -func GetGethWakuV2From(m wakutypes.Waku) *wakuv2.Waku { - return m.(*gethWakuV2Wrapper).waku -} - -func (w *gethWakuV2Wrapper) PublicWakuAPI() wakutypes.PublicWakuAPI { - return wakuv2.NewPublicWakuAPI(w.waku) -} - -func (w *gethWakuV2Wrapper) Version() uint { - return 2 -} - -func (w *gethWakuV2Wrapper) PeerCount() int { - return w.waku.PeerCount() -} - -// DEPRECATED: Not used in WakuV2 -func (w *gethWakuV2Wrapper) MinPow() float64 { - return 0 -} - -// MaxMessageSize returns the MaxMessageSize set -func (w *gethWakuV2Wrapper) MaxMessageSize() uint32 { - return w.waku.MaxMessageSize() -} - -// DEPRECATED: not used in WakuV2 -func (w *gethWakuV2Wrapper) BloomFilter() []byte { - return nil -} - -// GetCurrentTime returns current time. -func (w *gethWakuV2Wrapper) GetCurrentTime() time.Time { - return w.waku.CurrentTime() -} - -func (w *gethWakuV2Wrapper) SubscribeEnvelopeEvents(eventsProxy chan<- wakutypes.EnvelopeEvent) wakutypes.Subscription { - events := make(chan wakucommon.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper - go func() { - defer gocommon.LogOnPanic() - for e := range events { - eventsProxy <- *NewWakuV2EnvelopeEventWrapper(&e) - } - }() - - return NewGethSubscriptionWrapper(w.waku.SubscribeEnvelopeEvents(events)) -} - -func (w *gethWakuV2Wrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { - return w.waku.GetPrivateKey(id) -} - -// AddKeyPair imports a asymmetric private key and returns a deterministic identifier. -func (w *gethWakuV2Wrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { - return w.waku.AddKeyPair(key) -} - -// DeleteKeyPair deletes the key with the specified ID if it exists. -func (w *gethWakuV2Wrapper) DeleteKeyPair(keyID string) bool { - return w.waku.DeleteKeyPair(keyID) -} - -func (w *gethWakuV2Wrapper) AddSymKeyDirect(key []byte) (string, error) { - return w.waku.AddSymKeyDirect(key) -} - -func (w *gethWakuV2Wrapper) AddSymKeyFromPassword(password string) (string, error) { - return w.waku.AddSymKeyFromPassword(password) -} - -func (w *gethWakuV2Wrapper) DeleteSymKey(id string) bool { - return w.waku.DeleteSymKey(id) -} - -func (w *gethWakuV2Wrapper) GetSymKey(id string) ([]byte, error) { - return w.waku.GetSymKey(id) -} - -func (w *gethWakuV2Wrapper) Subscribe(opts *wakutypes.SubscriptionOptions) (string, error) { - var ( - err error - keyAsym *ecdsa.PrivateKey - keySym []byte - ) - - if opts.SymKeyID != "" { - keySym, err = w.GetSymKey(opts.SymKeyID) - if err != nil { - return "", err - } - } - if opts.PrivateKeyID != "" { - keyAsym, err = w.GetPrivateKey(opts.PrivateKeyID) - if err != nil { - return "", err - } - } - - f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PubsubTopic, opts.Topics) - if err != nil { - return "", err - } - - id, err := w.waku.Subscribe(GetWakuV2FilterFrom(f)) - if err != nil { - return "", err - } - - f.(*wakuV2FilterWrapper).id = id - return id, nil -} - -func (w *gethWakuV2Wrapper) GetStats() wakutypes.StatsSummary { - return w.waku.GetStats() -} - -func (w *gethWakuV2Wrapper) GetFilter(id string) wakutypes.Filter { - return NewWakuV2FilterWrapper(w.waku.GetFilter(id), id) -} - -func (w *gethWakuV2Wrapper) Unsubscribe(ctx context.Context, id string) error { - return w.waku.Unsubscribe(ctx, id) -} - -func (w *gethWakuV2Wrapper) UnsubscribeMany(ids []string) error { - return w.waku.UnsubscribeMany(ids) -} - -func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pubsubTopic string, topics [][]byte) (wakutypes.Filter, error) { - return NewWakuV2FilterWrapper(&wakucommon.Filter{ - KeyAsym: keyAsym, - KeySym: keySym, - ContentTopics: wakucommon.NewTopicSetFromBytes(topics), - PubsubTopic: pubsubTopic, - Messages: wakucommon.NewMemoryMessageStore(), - }, id), nil -} - -func (w *gethWakuV2Wrapper) StartDiscV5() error { - return w.waku.StartDiscV5() -} - -func (w *gethWakuV2Wrapper) StopDiscV5() error { - return w.waku.StopDiscV5() -} - -// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected -func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { - return w.waku.SubscribeToPubsubTopic(topic, optPublicKey) -} - -func (w *gethWakuV2Wrapper) UnsubscribeFromPubsubTopic(topic string) error { - return w.waku.UnsubscribeFromPubsubTopic(topic) -} - -func (w *gethWakuV2Wrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { - return w.waku.RetrievePubsubTopicKey(topic) -} - -func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { - return w.waku.StorePubsubTopicKey(topic, privKey) -} - -func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error { - return w.waku.RemovePubsubTopicKey(topic) -} - -func (w *gethWakuV2Wrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { - return w.waku.AddStorePeer(address) -} - -func (w *gethWakuV2Wrapper) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { - return w.waku.AddRelayPeer(address) -} - -func (w *gethWakuV2Wrapper) Peers() wakutypes.PeerStats { - return w.waku.Peers() -} - -func (w *gethWakuV2Wrapper) DialPeer(address multiaddr.Multiaddr) error { - return w.waku.DialPeer(address) -} - -func (w *gethWakuV2Wrapper) DialPeerByID(peerID peer.ID) error { - return w.waku.DialPeerByID(peerID) -} - -func (w *gethWakuV2Wrapper) ListenAddresses() ([]multiaddr.Multiaddr, error) { - return w.waku.ListenAddresses(), nil -} - -func (w *gethWakuV2Wrapper) RelayPeersByTopic(topic string) (*wakutypes.PeerList, error) { - return w.waku.RelayPeersByTopic(topic) -} - -func (w *gethWakuV2Wrapper) ENR() (*enode.Node, error) { - return w.waku.ENR() -} - -func (w *gethWakuV2Wrapper) DropPeer(peerID peer.ID) error { - return w.waku.DropPeer(peerID) -} - -func (w *gethWakuV2Wrapper) ProcessingP2PMessages() bool { - return w.waku.ProcessingP2PMessages() -} - -func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) { - w.waku.MarkP2PMessageAsProcessed(hash) -} - -func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*wakutypes.ConnStatusSubscription, error) { - return w.waku.SubscribeToConnStatusChanges(), nil -} - -func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []wakutypes.TopicType) error { - var cTopics []string - for _, ct := range contentTopics { - cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic()) - } - pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic) - w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics) - - // No err can be be generated by this function. The function returns an error - // Just so there's compatibility with GethWakuWrapper from V1 - return nil -} - -func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) { - w.waku.ConnectionChanged(state) -} - -func (w *gethWakuV2Wrapper) ClearEnvelopesCache() { - w.waku.ClearEnvelopesCache() -} - -type wakuV2FilterWrapper struct { - filter *wakucommon.Filter - id string -} - -// NewWakuFilterWrapper returns an object that wraps Geth's Filter in a types interface -func NewWakuV2FilterWrapper(f *wakucommon.Filter, id string) wakutypes.Filter { - if f.Messages == nil { - panic("Messages should not be nil") - } - - return &wakuV2FilterWrapper{ - filter: f, - id: id, - } -} - -// GetWakuFilterFrom retrieves the underlying whisper Filter struct from a wrapped Filter interface -func GetWakuV2FilterFrom(f wakutypes.Filter) *wakucommon.Filter { - return f.(*wakuV2FilterWrapper).filter -} - -// ID returns the filter ID -func (w *wakuV2FilterWrapper) ID() string { - return w.id -} - -func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) { - w.waku.ConfirmMessageDelivered(hashes) -} - -func (w *gethWakuV2Wrapper) PeerID() peer.ID { - return w.waku.PeerID() -} - -func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID { - return w.waku.StorenodeCycle.GetActiveStorenode() -} - -func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID { - return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe() -} - -func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} { - return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe() -} - -func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID { - return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe() -} - -func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(ctx context.Context) bool { - return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx) -} - -func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { - w.waku.StorenodeCycle.SetStorenodeConfigProvider(c) -} - -func (w *gethWakuV2Wrapper) ProcessMailserverBatch( - ctx context.Context, - batch wakutypes.MailserverBatch, - storenodeID peer.ID, - pageLimit uint64, - shouldProcessNextPage func(int) (bool, uint64), - processEnvelopes bool, -) error { - pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic) - contentTopics := []string{} - for _, topic := range batch.Topics { - contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic()) - } - - criteria := store.FilterCriteria{ - TimeStart: proto.Int64(batch.From.UnixNano()), - TimeEnd: proto.Int64(batch.To.UnixNano()), - ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), - } - - return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes) -} - -func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool { - return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID) -} - -func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error { - return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...) -} - -func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) { - w.waku.StorenodeCycle.Lock() - defer w.waku.StorenodeCycle.Unlock() - - w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff) - if shouldCycle { - w.waku.StorenodeCycle.Cycle(ctx) - } -} diff --git a/waku/types/envelopes.go b/waku/types/envelopes.go index c65f7f13cf2..fad28b8c39d 100644 --- a/waku/types/envelopes.go +++ b/waku/types/envelopes.go @@ -57,6 +57,7 @@ const ( // EnvelopeEvent used for envelopes events. type EnvelopeEvent struct { Event EventType + Topic TopicType Hash types.Hash Batch types.Hash Peer types.EnodeID diff --git a/waku/types/topic.go b/waku/types/topic.go index eb1b4e0a0f0..af08cb50235 100644 --- a/waku/types/topic.go +++ b/waku/types/topic.go @@ -103,3 +103,9 @@ func StringToTopic(s string) (t TopicType) { func TopicTypeToByteArray(t TopicType) []byte { return t[:4] } + +// Converts a topic to its 23/WAKU2-TOPICS representation +func (t TopicType) ContentTopic() string { + enc := hexutil.Encode(t[:]) + return "/waku/1/" + enc + "/rfc26" +} diff --git a/waku/types/waku.go b/waku/types/waku.go index 9e409ded957..487af2b7a5d 100644 --- a/waku/types/waku.go +++ b/waku/types/waku.go @@ -106,6 +106,9 @@ type WakuKeyManager interface { type Waku interface { PublicWakuAPI() PublicWakuAPI + Start() error + Stop() error + // Waku protocol version Version() uint diff --git a/wakuv1/api.go b/wakuv1/api.go index b886a05a545..8503d7fb200 100644 --- a/wakuv1/api.go +++ b/wakuv1/api.go @@ -305,12 +305,12 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req types.NewMessage) ([]byt // UninstallFilter is alias for Unsubscribe func (api *PublicWakuAPI) UninstallFilter(id string) { - api.w.Unsubscribe(id) // nolint: errcheck + api.w.Unsubscribe(context.TODO(), id) // nolint: errcheck } // Unsubscribe disables and removes an existing filter. func (api *PublicWakuAPI) Unsubscribe(id string) { - api.w.Unsubscribe(id) // nolint: errcheck + api.w.Unsubscribe(context.TODO(), id) // nolint: errcheck } // Messages set up a subscription that fires events when messages arrive that match @@ -373,7 +373,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r } } - id, err := api.w.Subscribe(&filter) + id, err := api.w.subscribe(&filter) if err != nil { return nil, err } @@ -389,7 +389,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r for { select { case <-ticker.C: - if filter := api.w.GetFilter(id); filter != nil { + if filter := api.w.getFilter(id); filter != nil { for _, rpcMessage := range toMessage(filter.Retrieve()) { if err := notifier.Notify(rpcSub.ID, rpcMessage); err != nil { logutils.ZapLogger().Error("Failed to send notification", zap.Error(err)) @@ -397,7 +397,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r } } case <-rpcSub.Err(): - _ = api.w.Unsubscribe(id) + _ = api.w.Unsubscribe(context.TODO(), id) return } } @@ -450,7 +450,7 @@ func toMessage(messages []*common.ReceivedMessage) []*types.Message { func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*types.Message, error) { logger := api.w.logger.With(zap.String("site", "getFilterMessages"), zap.String("filterId", id)) api.mu.Lock() - f := api.w.GetFilter(id) + f := api.w.getFilter(id) if f == nil { api.mu.Unlock() return nil, fmt.Errorf("filter not found") @@ -475,7 +475,7 @@ func (api *PublicWakuAPI) DeleteMessageFilter(id string) (bool, error) { defer api.mu.Unlock() delete(api.lastUsed, id) - return true, api.w.Unsubscribe(id) + return true, api.w.Unsubscribe(context.TODO(), id) } // NewMessageFilter creates a new filter that can be used to poll for @@ -537,7 +537,7 @@ func (api *PublicWakuAPI) NewMessageFilter(req types.Criteria) (string, error) { Messages: common.NewMemoryMessageStore(), } - id, err := api.w.Subscribe(f) + id, err := api.w.subscribe(f) if err != nil { return "", err } diff --git a/wakuv1/bridge.go b/wakuv1/bridge.go new file mode 100644 index 00000000000..bd924056c1b --- /dev/null +++ b/wakuv1/bridge.go @@ -0,0 +1,146 @@ +package wakuv1 + +import ( + "io" + + "github.com/ethereum/go-ethereum/rlp" + + "github.com/ethereum/go-ethereum/event" + ethtypes "github.com/status-im/status-go/eth-node/types" + + "github.com/status-im/status-go/waku/types" + "github.com/status-im/status-go/wakuv1/common" +) + +// NewWakuEnvelopeEventWrapper returns a types.EnvelopeEvent object that mimics Geth's EnvelopeEvent +func NewWakuEnvelopeEventWrapper(envelopeEvent *common.EnvelopeEvent) *types.EnvelopeEvent { + if envelopeEvent == nil { + panic("envelopeEvent should not be nil") + } + + wrappedData := envelopeEvent.Data + switch data := envelopeEvent.Data.(type) { + case []common.EnvelopeError: + wrappedData := make([]types.EnvelopeError, len(data)) + for index := range data { + wrappedData[index] = *NewWakuEnvelopeErrorWrapper(&data[index]) + } + case *MailServerResponse: + wrappedData = NewWakuMailServerResponseWrapper(data) + } + return &types.EnvelopeEvent{ + Event: types.EventType(envelopeEvent.Event), + Hash: ethtypes.Hash(envelopeEvent.Hash), + Batch: ethtypes.Hash(envelopeEvent.Batch), + Peer: ethtypes.EnodeID(envelopeEvent.Peer), + Data: wrappedData, + } +} + +// NewWakuEnvelopeErrorWrapper returns a types.EnvelopeError object that mimics Geth's EnvelopeError +func NewWakuEnvelopeErrorWrapper(envelopeError *common.EnvelopeError) *types.EnvelopeError { + if envelopeError == nil { + panic("envelopeError should not be nil") + } + + return &types.EnvelopeError{ + Hash: ethtypes.Hash(envelopeError.Hash), + Code: mapGethErrorCode(envelopeError.Code), + Description: envelopeError.Description, + } +} + +func mapGethErrorCode(code uint) uint { + switch code { + case common.EnvelopeTimeNotSynced: + return types.EnvelopeTimeNotSynced + case common.EnvelopeOtherError: + return types.EnvelopeOtherError + } + return types.EnvelopeOtherError +} + +// NewWakuMailServerResponseWrapper returns a types.MailServerResponse object that mimics Geth's MailServerResponse +func NewWakuMailServerResponseWrapper(mailServerResponse *MailServerResponse) *types.MailServerResponse { + if mailServerResponse == nil { + panic("mailServerResponse should not be nil") + } + + return &types.MailServerResponse{ + LastEnvelopeHash: ethtypes.Hash(mailServerResponse.LastEnvelopeHash), + Cursor: mailServerResponse.Cursor, + Error: mailServerResponse.Error, + } +} + +type gethSubscriptionWrapper struct { + subscription event.Subscription +} + +// NewGethSubscriptionWrapper returns an object that wraps Geth's Subscription in a types interface +func NewGethSubscriptionWrapper(subscription event.Subscription) types.Subscription { + if subscription == nil { + panic("subscription cannot be nil") + } + + return &gethSubscriptionWrapper{ + subscription: subscription, + } +} + +func (w *gethSubscriptionWrapper) Err() <-chan error { + return w.subscription.Err() +} + +func (w *gethSubscriptionWrapper) Unsubscribe() { + w.subscription.Unsubscribe() +} + +type wakuEnvelope struct { + env *common.Envelope +} + +// NewWakuEnvelope returns an object that wraps Geth's Waku Envelope in a types interface. +func NewWakuEnvelope(e *common.Envelope) types.Envelope { + return &wakuEnvelope{env: e} +} + +func (w *wakuEnvelope) Unwrap() interface{} { + return w.env +} + +func (w *wakuEnvelope) Hash() ethtypes.Hash { + return ethtypes.Hash(w.env.Hash()) +} + +func (w *wakuEnvelope) Bloom() []byte { + return w.env.Bloom() +} + +func (w *wakuEnvelope) PoW() float64 { + return w.env.PoW() +} + +func (w *wakuEnvelope) Expiry() uint32 { + return w.env.Expiry +} + +func (w *wakuEnvelope) TTL() uint32 { + return w.env.TTL +} + +func (w *wakuEnvelope) Topic() types.TopicType { + return types.TopicType(w.env.Topic) +} + +func (w *wakuEnvelope) Size() int { + return len(w.env.Data) +} + +func (w *wakuEnvelope) DecodeRLP(s *rlp.Stream) error { + return w.env.DecodeRLP(s) +} + +func (w *wakuEnvelope) EncodeRLP(writer io.Writer) error { + return rlp.Encode(writer, w.env) +} diff --git a/wakuv1/common/filter.go b/wakuv1/common/filter.go index 65811c3a4ee..44634a6b4fa 100644 --- a/wakuv1/common/filter.go +++ b/wakuv1/common/filter.go @@ -273,3 +273,7 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { func (f *Filter) MatchEnvelope(envelope *Envelope) bool { return f.PoW <= 0 || envelope.pow >= f.PoW } + +func (f *Filter) ID() string { + return f.id +} diff --git a/wakuv1/simulation_test.go b/wakuv1/simulation_test.go index 844ff772e3e..0673fb46515 100644 --- a/wakuv1/simulation_test.go +++ b/wakuv1/simulation_test.go @@ -20,6 +20,7 @@ package wakuv1 import ( "bytes" + "context" "crypto/ecdsa" mrand "math/rand" "net" @@ -197,7 +198,7 @@ func initializeBloomFilterMode(t *testing.T) { topics = append(topics, sharedTopic) f := common.Filter{KeySym: sharedKey, Messages: common.NewMemoryMessageStore()} f.Topics = [][]byte{topics[0][:]} - node.filerID, err = node.waku.Subscribe(&f) + node.filerID, err = node.waku.subscribe(&f) if err != nil { t.Fatalf("failed to install the filter: %s.", err) } @@ -248,7 +249,7 @@ func stopServers() { for i := 0; i < NumNodes; i++ { n := nodes[i] if n != nil { - _ = n.waku.Unsubscribe(n.filerID) + _ = n.waku.Unsubscribe(context.Background(), n.filerID) _ = n.waku.Stop() n.server.Stop() } @@ -271,7 +272,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) { for j := 0; j < iterations; j++ { for i := first; i < NumNodes; i++ { - f := nodes[i].waku.GetFilter(nodes[i].filerID) + f := nodes[i].waku.getFilter(nodes[i].filerID) if f == nil { t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerID, i, round) } diff --git a/wakuv1/waku.go b/wakuv1/waku.go index 93f6be2efa5..32c659263dd 100644 --- a/wakuv1/waku.go +++ b/wakuv1/waku.go @@ -20,6 +20,7 @@ package wakuv1 import ( "bytes" + "context" "crypto/ecdsa" "crypto/sha256" "errors" @@ -28,8 +29,6 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/ethereum/go-ethereum/common/hexutil" "go.uber.org/zap" @@ -45,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" gocommon "github.com/status-im/status-go/common" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -122,6 +122,8 @@ type Waku struct { logger *zap.Logger } +var _ wakutypes.Waku = (*Waku)(nil) + // New creates a Waku client ready to communicate through the Ethereum P2P network. func New(cfg *Config, logger *zap.Logger) *Waku { if logger == nil { @@ -553,10 +555,22 @@ func (w *Waku) SendEnvelopeEvent(event common.EnvelopeEvent) int { // SubscribeEnvelopeEvents subscribes to envelopes feed. // In order to prevent blocking waku producers events must be amply buffered. -func (w *Waku) SubscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription { +func (w *Waku) subscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription { return w.envelopeFeed.Subscribe(events) } +func (w *Waku) SubscribeEnvelopeEvents(eventsProxy chan<- wakutypes.EnvelopeEvent) wakutypes.Subscription { + events := make(chan common.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper + go func() { + defer gocommon.LogOnPanic() + for e := range events { + eventsProxy <- *NewWakuEnvelopeEventWrapper(&e) + } + }() + + return NewGethSubscriptionWrapper(w.subscribeEnvelopeEvents(events)) +} + func (w *Waku) notifyPeersAboutPowRequirementChange(pow float64) { arr := w.getPeers() for _, p := range arr { @@ -915,7 +929,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Waku) Subscribe(f *common.Filter) (string, error) { +func (w *Waku) subscribe(f *common.Filter) (string, error) { s, err := w.filters.Install(f) if err != nil { return s, err @@ -966,15 +980,15 @@ func (w *Waku) updateBloomFilter(f *common.Filter) error { return nil } -// GetFilter returns the filter by id. -func (w *Waku) GetFilter(id string) *common.Filter { +// getFilter returns the filter by id. +func (w *Waku) getFilter(id string) *common.Filter { return w.filters.Get(id) } // Unsubscribe removes an installed message handler. // TODO: This does not update the bloom filter, but does update // the topic interest map -func (w *Waku) Unsubscribe(id string) error { +func (w *Waku) Unsubscribe(ctx context.Context, id string) error { ok := w.filters.Uninstall(id) if !ok { return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id) @@ -1543,10 +1557,6 @@ func (w *Waku) Clean() error { return nil } -func (w *Waku) PeerID() peer.ID { - panic("not implemented") -} - // validatePrivateKey checks the format of the given private key. func validatePrivateKey(k *ecdsa.PrivateKey) bool { if k == nil || k.D == nil || k.D.Sign() == 0 { @@ -1589,3 +1599,49 @@ func addBloom(a, b []byte) []byte { } return c } + +func (w *Waku) ConnectionChanged(_ connection.State) {} + +func (w *Waku) GetCurrentTime() time.Time { + return w.CurrentTime() +} + +func (w *Waku) GetFilter(id string) wakutypes.Filter { + return w.getFilter(id) +} + +func (w *Waku) PublicWakuAPI() wakutypes.PublicWakuAPI { + return NewPublicWakuAPI(w) +} + +func (w *Waku) Subscribe(opts *wakutypes.SubscriptionOptions) (string, error) { + var ( + err error + keyAsym *ecdsa.PrivateKey + keySym []byte + ) + + if opts.SymKeyID != "" { + keySym, err = w.GetSymKey(opts.SymKeyID) + if err != nil { + return "", err + } + } + if opts.PrivateKeyID != "" { + keyAsym, err = w.GetPrivateKey(opts.PrivateKeyID) + if err != nil { + return "", err + } + } + + f := &common.Filter{ + KeyAsym: keyAsym, + KeySym: keySym, + PoW: opts.PoW, + AllowP2P: true, + Topics: opts.Topics, + Messages: common.NewMemoryMessageStore(), + } + + return w.subscribe(f) +} diff --git a/wakuv1/waku_test.go b/wakuv1/waku_test.go index 4a94c5d3b5b..335ad338e9c 100644 --- a/wakuv1/waku_test.go +++ b/wakuv1/waku_test.go @@ -626,7 +626,7 @@ func TestCustomization(t *testing.T) { } // check w.messages() - _, err = w.Subscribe(f) + _, err = w.subscribe(f) if err != nil { t.Fatalf("failed subscribe with seed %d: %s.", seed, err) } @@ -686,12 +686,12 @@ func TestSymmetricSendCycle(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter1) + _, err = w.subscribe(filter1) if err != nil { t.Fatalf("failed subscribe 1 with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter2) + _, err = w.subscribe(filter2) if err != nil { t.Fatalf("failed subscribe 2 with seed %d: %s.", seed, err) } @@ -777,12 +777,12 @@ func TestSymmetricSendCycleWithTopicInterest(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter1) + _, err = w.subscribe(filter1) if err != nil { t.Fatalf("failed subscribe 1 with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter2) + _, err = w.subscribe(filter2) if err != nil { t.Fatalf("failed subscribe 2 with seed %d: %s.", seed, err) } @@ -858,7 +858,7 @@ func TestSymmetricSendWithoutAKey(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter) + _, err = w.subscribe(filter) if err != nil { t.Fatalf("failed subscribe 1 with seed %d: %s.", seed, err) } @@ -926,7 +926,7 @@ func TestSymmetricSendKeyMismatch(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter) + _, err = w.subscribe(filter) if err != nil { t.Fatalf("failed subscribe 1 with seed %d: %s.", seed, err) } @@ -1056,7 +1056,7 @@ func TestTopicInterest(t *testing.T) { t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter1) + _, err = w.subscribe(filter1) if err != nil { t.Fatalf("failed subscribe with seed %d: %s.", seed, err) } @@ -1071,7 +1071,7 @@ func TestTopicInterest(t *testing.T) { t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err) } - _, err = w.Subscribe(filter2) + _, err = w.subscribe(filter2) if err != nil { t.Fatalf("failed subscribe with seed %d: %s.", seed, err) } diff --git a/wakuv1/waku_unsupported.go b/wakuv1/waku_unsupported.go new file mode 100644 index 00000000000..f47ab467752 --- /dev/null +++ b/wakuv1/waku_unsupported.go @@ -0,0 +1,155 @@ +package wakuv1 + +import ( + "context" + "crypto/ecdsa" + "errors" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/enode" + + "github.com/status-im/status-go/waku/types" + "github.com/waku-org/go-waku/waku/v2/api/history" +) + +var notAvailableStr = "not available in WakuV1" +var notAvailableError = errors.New(notAvailableStr) + +func (w *Waku) PeerCount() int { + return -1 +} + +func (w *Waku) StartDiscV5() error { + return notAvailableError +} + +func (w *Waku) StopDiscV5() error { + return notAvailableError +} + +func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { + return "", notAvailableError +} + +func (w *Waku) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { + // not available in WakuV1 + return notAvailableError +} + +func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { + // not available in WakuV1 + return notAvailableError +} + +func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { + // not available in WakuV1 + return nil, notAvailableError +} + +func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { + // not available in WakuV1 + return notAvailableError +} + +func (w *Waku) RemovePubsubTopicKey(topic string) error { + // not available in WakuV1 + return notAvailableError +} + +func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { + return "", notAvailableError +} + +func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { + return notAvailableError +} + +func (w *Waku) DialPeerByID(peerID peer.ID) error { + return notAvailableError +} + +func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { + return nil, notAvailableError +} + +func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return nil, notAvailableError +} + +func (w *Waku) ENR() (*enode.Node, error) { + return nil, notAvailableError +} + +func (w *Waku) DropPeer(peerID peer.ID) error { + return notAvailableError +} + +func (w *Waku) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { + return nil, notAvailableError +} + +func (w *Waku) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error { + return notAvailableError +} + +func (w *Waku) Peers() types.PeerStats { + return types.PeerStats{} +} + +func (w *Waku) ConfirmMessageDelivered(hashes []common.Hash) { +} + +func (w *Waku) PeerID() peer.ID { + panic(notAvailableStr) +} + +func (w *Waku) GetActiveStorenode() peer.ID { + panic(notAvailableStr) +} + +func (w *Waku) OnStorenodeChanged() <-chan peer.ID { + panic(notAvailableStr) +} + +func (w *Waku) OnStorenodeNotWorking() <-chan struct{} { + panic(notAvailableStr) +} + +func (w *Waku) OnStorenodeAvailable() <-chan peer.ID { + panic(notAvailableStr) +} + +func (w *Waku) WaitForAvailableStoreNode(ctx context.Context) bool { + return false +} + +func (w *Waku) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { + panic(notAvailableStr) +} + +func (w *Waku) ProcessMailserverBatch( + ctx context.Context, + batch types.MailserverBatch, + storenodeID peer.ID, + pageLimit uint64, + shouldProcessNextPage func(int) (bool, uint64), + processEnvelopes bool, +) error { + return notAvailableError +} + +func (w *Waku) IsStorenodeAvailable(peerID peer.ID) bool { + panic(notAvailableStr) +} + +func (w *Waku) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error { + panic(notAvailableStr) +} + +func (w *Waku) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) { + panic(notAvailableStr) +} diff --git a/wakuv1/waku_version_test.go b/wakuv1/waku_version_test.go index 30c0a68ac30..9602b186dd6 100644 --- a/wakuv1/waku_version_test.go +++ b/wakuv1/waku_version_test.go @@ -79,7 +79,7 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() { go func() { handleError(s.T(), w2.Stop()) }() envelopeEvents := make(chan common.EnvelopeEvent, 10) - sub := w1.SubscribeEnvelopeEvents(envelopeEvents) + sub := w1.subscribeEnvelopeEvents(envelopeEvents) defer sub.Unsubscribe() params, err := generateMessageParams() @@ -207,7 +207,7 @@ func (s *WakuTestSuite) TestMessagesResponseWithError() { } events := make(chan common.EnvelopeEvent, 2) - sub := w1.SubscribeEnvelopeEvents(events) + sub := w1.subscribeEnvelopeEvents(events) defer sub.Unsubscribe() w2.addEnvelope(&failed) @@ -241,7 +241,7 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { w1 := New(conf, nil) w2 := New(conf, nil) events := make(chan common.EnvelopeEvent, 2) - sub := w1.SubscribeEnvelopeEvents(events) + sub := w1.subscribeEnvelopeEvents(events) defer sub.Unsubscribe() rw1, rw2 := p2p.MsgPipe() @@ -390,7 +390,7 @@ func (s *WakuTestSuite) TestMailserverCompletionEvent() { peer2.SetPeerTrusted(true) events := make(chan common.EnvelopeEvent) - sub := w1.SubscribeEnvelopeEvents(events) + sub := w1.subscribeEnvelopeEvents(events) defer sub.Unsubscribe() envelopes := []*common.Envelope{{Data: []byte{1}}, {Data: []byte{2}}} diff --git a/wakuv2/api.go b/wakuv2/api.go index 948972ba91e..37119fd30c1 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -331,7 +331,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r } } - id, err := api.w.Subscribe(&filter) + id, err := api.w.subscribe(&filter) if err != nil { return nil, err } @@ -347,7 +347,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r for { select { case <-ticker.C: - if filter := api.w.GetFilter(id); filter != nil { + if filter := api.w.getFilter(id); filter != nil { for _, rpcMessage := range toMessage(filter.Retrieve()) { if err := notifier.Notify(rpcSub.ID, rpcMessage); err != nil { logutils.ZapLogger().Error("Failed to send notification", zap.Error(err)) @@ -405,7 +405,7 @@ func toMessage(messages []*common.ReceivedMessage) []*types.Message { // are received between the last poll and now. func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*types.Message, error) { api.mu.Lock() - f := api.w.GetFilter(id) + f := api.w.getFilter(id) if f == nil { api.mu.Unlock() return nil, fmt.Errorf("filter not found") @@ -485,7 +485,7 @@ func (api *PublicWakuAPI) NewMessageFilter(req types.Criteria) (string, error) { Messages: common.NewMemoryMessageStore(), } - id, err := api.w.Subscribe(f) + id, err := api.w.subscribe(f) if err != nil { return "", err } diff --git a/wakuv2/bridge.go b/wakuv2/bridge.go new file mode 100644 index 00000000000..a71836915aa --- /dev/null +++ b/wakuv2/bridge.go @@ -0,0 +1,81 @@ +package wakuv2 + +import ( + "github.com/ethereum/go-ethereum/event" + ethtypes "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/waku/types" + "github.com/status-im/status-go/wakuv2/common" +) + +// NewWakuV2EnvelopeEventWrapper returns a wakutypes.EnvelopeEvent object that mimics Geth's EnvelopeEvent +func NewWakuV2EnvelopeEventWrapper(envelopeEvent *common.EnvelopeEvent) *types.EnvelopeEvent { + if envelopeEvent == nil { + panic("envelopeEvent should not be nil") + } + + wrappedData := envelopeEvent.Data + switch data := envelopeEvent.Data.(type) { + case []common.EnvelopeError: + wrappedData := make([]types.EnvelopeError, len(data)) + for index := range data { + wrappedData[index] = *NewWakuV2EnvelopeErrorWrapper(&data[index]) + } + } + return &types.EnvelopeEvent{ + Event: types.EventType(envelopeEvent.Event), + Hash: ethtypes.Hash(envelopeEvent.Hash), + Batch: ethtypes.Hash(envelopeEvent.Batch), + Peer: ethtypes.EnodeID(envelopeEvent.Peer), + Data: wrappedData, + } +} + +// NewWakuEnvelopeErrorWrapper returns a wakutypes.EnvelopeError object that mimics Geth's EnvelopeError +func NewWakuV2EnvelopeErrorWrapper(envelopeError *common.EnvelopeError) *types.EnvelopeError { + if envelopeError == nil { + panic("envelopeError should not be nil") + } + + return &types.EnvelopeError{ + Hash: ethtypes.Hash(envelopeError.Hash), + Code: mapGethErrorCode(envelopeError.Code), + Description: envelopeError.Description, + } +} + +func mapGethErrorCode(code uint) uint { + const ( + EnvelopeTimeNotSynced uint = iota + 1 + EnvelopeOtherError + ) + switch code { + case EnvelopeTimeNotSynced: + return types.EnvelopeTimeNotSynced + case EnvelopeOtherError: + return types.EnvelopeOtherError + } + return types.EnvelopeOtherError +} + +type gethSubscriptionWrapper struct { + subscription event.Subscription +} + +// NewGethSubscriptionWrapper returns an object that wraps Geth's Subscription in a types interface +func NewGethSubscriptionWrapper(subscription event.Subscription) types.Subscription { + if subscription == nil { + panic("subscription cannot be nil") + } + + return &gethSubscriptionWrapper{ + subscription: subscription, + } +} + +func (w *gethSubscriptionWrapper) Err() <-chan error { + return w.subscription.Err() +} + +func (w *gethSubscriptionWrapper) Unsubscribe() { + w.subscription.Unsubscribe() +} diff --git a/wakuv2/common/filter.go b/wakuv2/common/filter.go index fdfeef6b481..4815492d86e 100644 --- a/wakuv2/common/filter.go +++ b/wakuv2/common/filter.go @@ -308,3 +308,7 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { } return false } + +func (f *Filter) ID() string { + return f.id +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 82f02c0d6ab..b9023bb65dd 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -37,6 +37,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" + "google.golang.org/protobuf/proto" "go.uber.org/zap" @@ -75,6 +76,7 @@ import ( gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" + ethtypes "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/timesource" "github.com/status-im/status-go/wakuv2/common" @@ -200,6 +202,8 @@ type Waku struct { defaultShardInfo protocol.RelayShards } +var _ types.Waku = (*Waku)(nil) + func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client } @@ -371,12 +375,12 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge return waku, nil } -func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { +func (w *Waku) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { w.connStatusMu.Lock() defer w.connStatusMu.Unlock() subscription := types.NewConnStatusSubscription() w.connStatusSubscriptions[subscription.ID] = subscription - return subscription + return subscription, nil } func (w *Waku) GetNodeENRString() (string, error) { @@ -743,10 +747,22 @@ func (w *Waku) SendEnvelopeEvent(event common.EnvelopeEvent) int { // SubscribeEnvelopeEvents subscribes to envelopes feed. // In order to prevent blocking waku producers events must be amply buffered. -func (w *Waku) SubscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription { +func (w *Waku) subscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription { return w.envelopeFeed.Subscribe(events) } +func (w *Waku) SubscribeEnvelopeEvents(eventsProxy chan<- types.EnvelopeEvent) types.Subscription { + events := make(chan common.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper + go func() { + defer gocommon.LogOnPanic() + for e := range events { + eventsProxy <- *NewWakuV2EnvelopeEventWrapper(&e) + } + }() + + return NewGethSubscriptionWrapper(w.subscribeEnvelopeEvents(events)) +} + // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. func (w *Waku) NewKeyPair() (string, error) { @@ -985,7 +1001,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Waku) Subscribe(f *common.Filter) (string, error) { +func (w *Waku) subscribe(f *common.Filter) (string, error) { f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { @@ -1015,10 +1031,14 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error { } // GetFilter returns the filter by id. -func (w *Waku) GetFilter(id string) *common.Filter { +func (w *Waku) getFilter(id string) *common.Filter { return w.filters.Get(id) } +func (w *Waku) GetFilter(id string) types.Filter { + return w.getFilter(id) +} + // Unsubscribe removes an installed message handler. func (w *Waku) UnsubscribeMany(ids []string) error { for _, id := range ids { @@ -1577,11 +1597,11 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. -func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.ReceivedMessage { +func (w *Waku) GetEnvelope(hash ethtypes.Hash) *common.ReceivedMessage { w.poolMu.RLock() defer w.poolMu.RUnlock() - envelope := w.envelopeCache.Get(hash) + envelope := w.envelopeCache.Get(gethcommon.Hash(hash)) if envelope == nil { return nil } @@ -1624,8 +1644,8 @@ func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { }, nil } -func (w *Waku) ListenAddresses() []multiaddr.Multiaddr { - return w.node.ListenAddresses() +func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { + return w.node.ListenAddresses(), nil } func (w *Waku) ENR() (*enode.Node, error) { @@ -2019,3 +2039,123 @@ func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { func (w *Waku) LegacyStoreNode() legacy_store.Store { return w.node.LegacyStore() } + +// GetCurrentTime returns current time. +func (w *Waku) GetCurrentTime() time.Time { + return w.CurrentTime() +} + +func (w *Waku) GetActiveStorenode() peer.ID { + return w.StorenodeCycle.GetActiveStorenode() +} + +func (w *Waku) OnStorenodeChanged() <-chan peer.ID { + return w.StorenodeCycle.StorenodeChangedEmitter.Subscribe() +} + +func (w *Waku) OnStorenodeNotWorking() <-chan struct{} { + return w.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe() +} + +func (w *Waku) OnStorenodeAvailable() <-chan peer.ID { + return w.StorenodeCycle.StorenodeAvailableEmitter.Subscribe() +} + +func (w *Waku) WaitForAvailableStoreNode(ctx context.Context) bool { + return w.StorenodeCycle.WaitForAvailableStoreNode(ctx) +} + +func (w *Waku) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { + w.StorenodeCycle.SetStorenodeConfigProvider(c) +} + +func (w *Waku) ProcessMailserverBatch( + ctx context.Context, + batch types.MailserverBatch, + storenodeID peer.ID, + pageLimit uint64, + shouldProcessNextPage func(int) (bool, uint64), + processEnvelopes bool, +) error { + pubsubTopic := w.GetPubsubTopic(batch.PubsubTopic) + contentTopics := []string{} + for _, topic := range batch.Topics { + contentTopics = append(contentTopics, common.BytesToTopic(topic.Bytes()).ContentTopic()) + } + + criteria := store.FilterCriteria{ + TimeStart: proto.Int64(batch.From.UnixNano()), + TimeEnd: proto.Int64(batch.To.UnixNano()), + ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), + } + + return w.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes) +} + +func (w *Waku) IsStorenodeAvailable(peerID peer.ID) bool { + return w.StorenodeCycle.IsStorenodeAvailable(peerID) +} + +func (w *Waku) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error { + return w.StorenodeCycle.PerformStorenodeTask(fn, opts...) +} + +func (w *Waku) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) { + w.StorenodeCycle.Lock() + defer w.StorenodeCycle.Unlock() + + w.StorenodeCycle.DisconnectActiveStorenode(backoff) + if shouldCycle { + w.StorenodeCycle.Cycle(ctx) + } +} + +func (w *Waku) PublicWakuAPI() types.PublicWakuAPI { + return NewPublicWakuAPI(w) +} + +func (w *Waku) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error { + var cTopics []string + for _, ct := range contentTopics { + cTopics = append(cTopics, common.BytesToTopic(ct.Bytes()).ContentTopic()) + } + pubsubTopic = w.GetPubsubTopic(pubsubTopic) + w.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics) + + return nil +} + +func (w *Waku) Subscribe(opts *types.SubscriptionOptions) (string, error) { + var ( + err error + keyAsym *ecdsa.PrivateKey + keySym []byte + ) + + if opts.SymKeyID != "" { + keySym, err = w.GetSymKey(opts.SymKeyID) + if err != nil { + return "", err + } + } + if opts.PrivateKeyID != "" { + keyAsym, err = w.GetPrivateKey(opts.PrivateKeyID) + if err != nil { + return "", err + } + } + + f := &common.Filter{ + KeyAsym: keyAsym, + KeySym: keySym, + ContentTopics: common.NewTopicSetFromBytes(opts.Topics), + PubsubTopic: opts.PubsubTopic, + Messages: common.NewMemoryMessageStore(), + } + + return w.subscribe(f) +} + +func (w *Waku) Version() uint { + return 2 +} diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 78270363731..43ee513e692 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -250,7 +250,7 @@ func TestBasicWakuV2(t *testing.T) { ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), } - _, err = w.Subscribe(filter) + _, err = w.subscribe(filter) require.NoError(t, err) msgTimestamp := w.timestamp() @@ -444,7 +444,7 @@ func TestWakuV2Filter(t *testing.T) { ContentTopics: common.NewTopicSetFromBytes([][]byte{contentTopicBytes}), } - fID, err := w.Subscribe(filter) + fID, err := w.subscribe(filter) require.NoError(t, err) msgTimestamp := w.timestamp() @@ -538,7 +538,7 @@ func TestWakuV2Store(t *testing.T) { require.NoError(t, err) require.NoError(t, w2.Start()) w2EnvelopeCh := make(chan common.EnvelopeEvent, 100) - w2.SubscribeEnvelopeEvents(w2EnvelopeCh) + w2.subscribeEnvelopeEvents(w2EnvelopeCh) defer func() { require.NoError(t, w2.Stop()) close(w2EnvelopeCh) @@ -558,7 +558,7 @@ func TestWakuV2Store(t *testing.T) { ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), } - _, err = w2.Subscribe(filter) + _, err = w2.subscribe(filter) require.NoError(t, err) time.Sleep(2 * time.Second) @@ -699,7 +699,7 @@ func TestLightpushRateLimit(t *testing.T) { ContentTopics: contentTopics, } - _, err = w0.Subscribe(filter) + _, err = w0.subscribe(filter) require.NoError(t, err) config1 := &Config{} @@ -752,7 +752,7 @@ func TestLightpushRateLimit(t *testing.T) { waitForPeerConnectionWithTimeout(t, w2.node.Host().ID(), w1PeersCh, 5*time.Second) event := make(chan common.EnvelopeEvent, 10) - w2.SubscribeEnvelopeEvents(event) + w2.subscribeEnvelopeEvents(event) for i := range [15]int{} { msgTimestamp := w2.timestamp() diff --git a/wakuv2/waku_unsupported.go b/wakuv2/waku_unsupported.go new file mode 100644 index 00000000000..3c487d6d3cb --- /dev/null +++ b/wakuv2/waku_unsupported.go @@ -0,0 +1,11 @@ +package wakuv2 + +// DEPRECATED: Not used in WakuV2 +func (w *Waku) MinPow() float64 { + return 0 +} + +// DEPRECATED: not used in WakuV2 +func (w *Waku) BloomFilter() []byte { + return nil +}