Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lightpush v2 #3279

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import
import
waku/[
waku_core,
waku_lightpush/common,
waku_lightpush/rpc,
waku_lightpush_legacy/common,
waku_lightpush_legacy/rpc,
waku_enr,
discovery/waku_dnsdisc,
waku_store_legacy,
Expand Down Expand Up @@ -227,9 +227,9 @@ proc publish(c: Chat, line: string) =
c.node.wakuRlnRelay.lastEpoch = proof.epoch

try:
if not c.node.wakuLightPush.isNil():
if not c.node.wakuLegacyLightPush.isNil():
# Attempt lightpush
(waitFor c.node.lightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
(waitFor c.node.legacyLightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish lightpush message", error = error
else:
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
Expand Down Expand Up @@ -502,8 +502,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if conf.lightpushnode != "":
let peerInfo = parsePeerInfo(conf.lightpushnode)
if peerInfo.isOk():
await mountLightPush(node)
node.mountLightPushClient()
await mountLegacyLightPush(node)
node.mountLegacyLightPushClient()
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
else:
error "LightPush not mounted. Couldn't parse conf.lightpushnode",
Expand Down
4 changes: 2 additions & 2 deletions apps/liteprotocoltester/lightpush_publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ proc publishMessages(
lightpushContentTopic,
renderMsgSize,
)
let wlpRes = await wakuNode.lightpushPublish(
let wlpRes = await wakuNode.legacyLightpushPublish(
some(lightpushPubsubTopic), message, actualServicePeer
)

Expand Down Expand Up @@ -209,7 +209,7 @@ proc setupAndPublish*(
if isNil(wakuNode.wakuLightpushClient):
# if we have not yet initialized lightpush client, then do it as the only way we can get here is
# by having a service peer discovered.
wakuNode.mountLightPushClient()
wakuNode.mountLegacyLightPushClient()

# give some time to receiver side to set up
let waitTillStartTesting = conf.startPublishingAfter.seconds
Expand Down
2 changes: 1 addition & 1 deletion apps/liteprotocoltester/liteprotocoltester.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ when isMainModule:
var codec = WakuLightPushCodec
# mounting relevant client, for PX filter client must be mounted ahead
if conf.testFunc == TesterFunctionality.SENDER:
wakuApp.node.mountLightPushClient()
wakuApp.node.mountLegacyLightPushClient()
codec = WakuLightPushCodec
else:
waitFor wakuApp.node.mountFilterClient()
Expand Down
6 changes: 3 additions & 3 deletions examples/lightpush_publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
## use lightpush to publish messages without relay

import chronicles, chronos, stew/byteutils, results
import waku/[common/logging, node/peer_manager, waku_core, waku_lightpush/client]
import waku/[common/logging, node/peer_manager, waku_core, waku_lightpush_legacy/client]

const
LightpushPeer =
Expand All @@ -12,7 +12,7 @@ const
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc publishMessages(
wlc: WakuLightpushClient,
wlc: WakuLegacyLightpushClient,
lightpushPeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic,
Expand Down Expand Up @@ -44,7 +44,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) =
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wlc = WakuLightpushClient.new(pm, rng)
wlc = WakuLegacyLightpushClient.new(pm, rng)

# Start maintaining subscription
asyncSpawn publishMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import options
import chronicles, chronos, results
import
../../../../../waku/waku_core/message/message,
../../../../../waku/waku_core/codecs,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_lightpush/client,
../../../../../waku/waku_lightpush/common,
../../../../../waku/waku_lightpush_legacy/client,
../../../../../waku/waku_lightpush_legacy/common,
../../../../../waku/node/peer_manager/peer_manager,
../../../../alloc

Expand Down Expand Up @@ -98,7 +99,7 @@ proc process*(
return err(errorMsg)

let msgHashHex = (
await waku.node.wakuLightpushClient.publish(
await waku.node.wakuLegacyLightpushClient.publish(
pubsubTopic, msg, peer = peerOpt.get()
)
).valueOr:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ proc process*(
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic

let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
let errorMsg = "Message not sent because no peers found."
error "PUBLISH failed", error = errorMsg
let publishRes = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if publishRes.isErr():
let errorMsg = "Message not sent."
error "PUBLISH failed", error = errorMsg, reason = publishRes.error.msg
return err(errorMsg)
elif numPeers > 0:
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
let numPeers = publishRes.get()
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
of LIST_CONNECTED_PEERS:
let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
Expand Down
7 changes: 4 additions & 3 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ import
./node/test_all,
./waku_filter_v2/test_all,
./waku_peer_exchange/test_all,
./waku_lightpush/test_all,
./waku_lightpush_legacy/test_all,
./waku_relay/test_all,
./incentivization/test_all

import
# Waku v2 tests
./test_wakunode,
./test_wakunode_lightpush,
# ./test_wakunode_lightpush,
./test_wakunode_legacy_lightpush,
./test_peer_store_extended,
./test_message_cache,
./test_peer_manager,
Expand All @@ -98,7 +99,7 @@ import
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_lightpush,
./wakunode_rest/test_rest_lightpush_legacy,
./wakunode_rest/test_rest_admin,
./wakunode_rest/test_rest_cors,
./wakunode_rest/test_rest_health
Expand Down
3 changes: 2 additions & 1 deletion tests/node/test_all.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import
./test_wakunode_filter,
./test_wakunode_lightpush,
./test_wakunode_legacy_lightpush,
# ./test_wakunode_lightpush,
./test_wakunode_peer_exchange,
./test_wakunode_store,
./test_wakunode_legacy_store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import
waku_filter_v2,
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_lightpush,
waku_lightpush/common,
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush_legacy,
waku_lightpush_legacy/common,
waku_lightpush_legacy/client,
waku_lightpush_legacy/protocol_metrics,
waku_lightpush_legacy/rpc,
waku_rln_relay,
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
../resources/payloads

suite "Waku Lightpush - End To End":
suite "Waku Legacy Lightpush - End To End":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler
Expand Down Expand Up @@ -60,8 +60,8 @@ suite "Waku Lightpush - End To End":
await server.start()

await server.mountRelay()
await server.mountLightpush() # without rln-relay
client.mountLightpushClient()
await server.mountLegacyLightpush() # without rln-relay
client.mountLegacyLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
Expand All @@ -79,7 +79,7 @@ suite "Waku Lightpush - End To End":
lightpushClient.mountLightpushClient()

# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
let publishResponse = await lightpushClient.legacyLightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)

Expand All @@ -100,7 +100,7 @@ suite "Waku Lightpush - End To End":
)

# When the client publishes an over-limit message
let publishResponse = await client.lightpushPublish(
let publishResponse = await client.legacyLightpushPublish(
some(pubsubTopic), msgOverLimit, serverRemotePeerInfo
)

Expand Down Expand Up @@ -170,7 +170,7 @@ suite "RLN Proofs as a Lightpush Service":
lightpushClient.mountLightpushClient()

# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
let publishResponse = await lightpushClient.legacyLightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)

Expand Down
22 changes: 11 additions & 11 deletions tests/node/test_wakunode_sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ suite "Sharding":

asyncTest "lightpush":
# Given a connected server and client subscribed to the same pubsub topic
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()

let
Expand All @@ -299,7 +299,7 @@ suite "Sharding":
let
msg =
WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic")
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(topic), msg, server.switch.peerInfo.toRemotePeerInfo()
)

Expand Down Expand Up @@ -409,7 +409,7 @@ suite "Sharding":

asyncTest "lightpush (automatic sharding filtering)":
# Given a connected server and client using the same content topic (with two different formats)
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()

let
Expand All @@ -424,7 +424,7 @@ suite "Sharding":
let
msg =
WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo()
)

Expand Down Expand Up @@ -567,7 +567,7 @@ suite "Sharding":

asyncTest "lightpush - exclusion (automatic sharding filtering)":
# Given a connected server and client using different content topics
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()

let
Expand All @@ -584,7 +584,7 @@ suite "Sharding":
# When a peer publishes a message in the server's subscribed topic (the client, for testing easeness)
let
msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo()
)

Expand Down Expand Up @@ -854,12 +854,12 @@ suite "Sharding":
(await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()

asyncTest "Protocol with Unconfigured PubSub Topic Fails":
# Given a
# Given a
let
contentTopic = "myContentTopic"
topic = "/waku/2/rs/0/1"
# Using a different topic to simulate "unconfigured" pubsub topic
# but to have a handler (and be able to assert the test)
# but to have a handler (and be able to assert the test)
serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0")
clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0")

Expand All @@ -878,7 +878,7 @@ suite "Sharding":

asyncTest "Waku LightPush Sharding (Static Sharding)":
# Given a connected server and client using two different pubsub topics
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()

# Given a connected server and client subscribed to multiple pubsub topics
Expand All @@ -898,7 +898,7 @@ suite "Sharding":
# When a peer publishes a message (the client, for testing easeness) in topic1
let
msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo()
)

Expand All @@ -916,7 +916,7 @@ suite "Sharding":
clientHandler2.reset()
let
msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
lightpublishResponse2 = await client.lightpushPublish(
lightpublishResponse2 = await client.legacyLightpushPublish(
some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo()
)

Expand Down
10 changes: 5 additions & 5 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ procSuite "Peer Manager":

# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyLightPushCodec)
node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec)

# relay peers (should not be added)
Expand All @@ -788,7 +788,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId

# but the relay peer is not
Expand Down Expand Up @@ -917,21 +917,21 @@ procSuite "Peer Manager":
selectedPeer2.get().peerId == peers[0].peerId

# And return none if we dont have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLightPushCodec)
let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer3.isSome() == false

# Now we add service peers for different protocols peer[1..3]
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLightPushCodec)
pm.addServicePeer(peers[2], WakuLegacyLightPushCodec)

# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId

let selectedPeer5 = pm.selectPeer(WakuLightPushCodec)
let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_enr.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import std/[options, sequtils], stew/results, testutils/unittests
import waku/waku_core, waku/waku_enr, ./testlib/wakucore, waku/waku_core/codecs
import waku/waku_core, waku/waku_enr, ./testlib/wakucore

suite "Waku ENR - Capabilities bitfield":
test "check capabilities support":
Expand Down
Loading
Loading