Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix p2p peers #449

Merged
merged 20 commits into from
May 22, 2024
Merged
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,12 @@ npm run start
```

The dashboard will be made available at: `http://localhost:8000/dashboard/`

## Networking in cloud environments or DMZ

Sometimes, you need to manually specify an external address that has to be announced by the node, in order for others to be able to connect to the node.
In order to do that, use P2P_ANNOUNCE_ADDRESSES env:

```bash
export P2P_ANNOUNCE_ADDRESSES=[\"/ip4/1.2.3.4/tcp/8000\"]
```
18 changes: 11 additions & 7 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,33 @@
**Warning**: the names of some of these environment variables might change at some point in the future.

This page lists the environment variables used by `ocean-node` and what effect
they have.

they have.

## Core

- `PRIVATE_KEY` : Private key used by this node (applies to p2p peer id, asset encryption key, etc)
- `RPCS` : List of RPC URL for each chain. Example:
- `RPCS` : List of RPC URL for each chain. Example:

```bash
export RPC="{ \"1\": \"https://rpc.eth.gateway.fm\", \"137\": \"https://polygon.meowrpc.com\", \"80001\": \"https://rpc-mumbai.maticvigil.com\" }"
```

## P2P

- `P2P_ipV4BindAddress` : Bind address for IPV4. Defaults to `0.0.0.0`
- `P2P_ipV4BindTcpPort` : Port used on IPv4 TCP connections. Defaults to `0` (Use whatever port is free. When running as docker, please set it explicitly)
- `P2P_ipV4BindWsPort` : Port used on IPv4 WS connections. Defaults to `0` (Use whatever port is free. When running as docker, please set it explicitly)
- `P2P_ipV6BindAddress` : Bind address for IPV6. Defaults to `::1`
- `P2P_ipV6BindTcpPort` : Port used on IPv6 TCP connections. Defaults to `0` (Use whatever port is free. When running as docker, please set it explicitly)
- `P2P_ipV6BindWsPort` : Port used on IPv6 WS connections. Defaults to `0` (Use whatever port is free. When running as docker, please set it explicitly)
- `P2P_pubsubPeerDiscoveryInterval` : Interval (in ms) for discovery using pubsub. Defaults to `1000` (one second)
- `P2P_ANNOUNCE_ADDRESSES` : List of addresses to announce to the network. Example: ["/ip4/1.2.3.4/tcp/8000"]
- `P2P_pubsubPeerDiscoveryInterval` : Interval (in ms) for discovery using pubsub. Defaults to `1000` (one second)
- `P2P_dhtMaxInboundStreams` : Maximum no of DHT inbound streams. Defaults to `500`
- `P2P_dhtMaxOutboundStreams` : Maximum no of DHT outbound streams. Defaults to `500`
- `P2P_mDNSInterval` : Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds)
- `P2P_mDNSInterval` : Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds)
- `P2P_connectionsMaxParallelDials` : Maximum no of parallel dials. Defaults to `150`
- `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds)
- `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds)

## HTTP
- `HTTP_API_PORT` : Port used for HTTP interface. Defaults to `8000`

- `HTTP_API_PORT` : Port used for HTTP interface. Defaults to `8000`
1 change: 1 addition & 0 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface OceanNodeP2PConfig {
mDNSInterval: number
connectionsMaxParallelDials: number
connectionsDialTimeout: number
announceAddresses: string[]
}

export interface OceanNodeConfig {
Expand Down
52 changes: 0 additions & 52 deletions src/components/P2P/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,2 @@
export * from './handleBroadcasts.js'
export * from './handleProtocolCommands.js'

export function handlePeerConnect(details: any) {
if (details) {
// const peerId = details.detail
// console.log('Connection established to:', peerId.toString()) // Emitted when a peer has been found
/*
try{
this._libp2p.services.pubsub.connect(peerId.toString())
}
catch(e){
console.log(e)
console.log("Failed to connect pubsub")
}
*/
}
// else{
// console.log("Null evt ")
// }
}

export function handlePeerDisconnect(details: any) {
// const peerId = details.detail
// console.log('Connection closed to:', peerId.toString()) // Emitted when a peer has been found
}

export function handlePeerDiscovery(details: any) {
// const peerInfo = details.detail
// console.log('Discovered new peer:', peerInfo.id.toString())
// console.log(details.detail)
/*
try{
//this._libp2p.services.pubsub.connect(peerInfo.id.toString())
this._libp2p.services.dht.connect(peerInfo.id.toString())
}
catch(e){
console.log(e)
console.log("Failed to connect pubsub")
}
*/
}

export function handlePeerJoined(details: any) {
// console.log('New peer joined us:', details)
}

export function handlePeerLeft(details: any) {
// console.log('New peer joined us:', details)
}

export function handleSubscriptionCHange(details: any) {
// console.log('subscription-change:', details.detail)
}
149 changes: 106 additions & 43 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import diff from 'hyperdiff'

Check failure on line 1 in src/components/P2P/index.ts

View workflow job for this annotation

GitHub Actions / lint

'diff' is defined but never used
import { P2PCommandResponse, TypesenseSearchResponse } from '../../@types/index'
import EventEmitter from 'node:events'
import clone from 'lodash.clonedeep'

import {
handleBroadcasts,
handlePeerConnect,
handlePeerDiscovery,
handlePeerDisconnect,
// handlePeerConnect,
// handlePeerDiscovery,
// handlePeerDisconnect,
handleProtocolCommands
} from './handlers.js'

Expand All @@ -32,7 +32,6 @@
import { uPnPNAT } from '@libp2p/upnp-nat'
import { ping } from '@libp2p/ping'
import { dcutr } from '@libp2p/dcutr'

import { kadDHT } from '@libp2p/kad-dht'
import { gossipsub } from '@chainsafe/libp2p-gossipsub'

Expand Down Expand Up @@ -110,12 +109,21 @@
this._topic = 'oceanprotocol'
this._libp2p = await this.createNode(this._config)

this._libp2p.addEventListener('peer:connect', (evt: any) => {
this.handlePeerConnect(evt)
})
this._libp2p.addEventListener('peer:disconnect', (evt: any) => {
this.handlePeerDisconnect(evt)
})
this._libp2p.addEventListener('peer:discovery', (evt: any) => {
this.handlePeerDiscovery(evt)
})
this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
this._peers = []
this._connections = {}
this._protocol = '/ocean/nodes/1.0.0'

this._interval = setInterval(this._pollPeers.bind(this), this._options.pollInterval)
// this._interval = setInterval(this._pollPeers.bind(this), this._options.pollInterval)
this._libp2p.handle(this._protocol, handleProtocolCommands.bind(this))

setInterval(this.republishStoredDDOS.bind(this), REPUBLISH_INTERVAL_HOURS)
Expand All @@ -134,12 +142,54 @@
})
}

handlePeerConnect(details: any) {
if (details) {
const peerId = details.detail
P2P_LOGGER.debug('Connection established to:' + peerId.toString()) // Emitted when a peer has been found
try {
this._libp2p.services.pubsub.connect(peerId.toString())
} catch (e) {}
} else {
/* empty */
}
}

handlePeerDisconnect(details: any) {
const peerId = details.detail
P2P_LOGGER.debug('Connection closed to:' + peerId.toString()) // Emitted when a peer has been found
}

handlePeerDiscovery(details: any) {
const peerInfo = details.detail
P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString())
}

handlePeerJoined(details: any) {
P2P_LOGGER.debug('New peer joined us:' + details)
}

handlePeerLeft(details: any) {
P2P_LOGGER.debug('Peer left us:' + details)
}

handlePeerMessage(details: any) {
P2P_LOGGER.debug('peer joined us:' + details)
}

handleSubscriptionCHange(details: any) {
P2P_LOGGER.debug('subscription-change:' + details.detail)
}

shouldAnnounce(multiaddr: any) {
// TO DO
return true
}

async createNode(config: OceanNodeConfig): Promise<Libp2p | null> {
try {
this._publicAddress = config.keys.peerId.toString()
this._publicKey = config.keys.publicKey
this._privateKey = config.keys.privateKey

/** @type {import('libp2p').Libp2pOptions} */
// start with some default, overwrite based on config later
const options = {
Expand All @@ -149,7 +199,10 @@
`/ip4/${config.p2pConfig.ipV4BindAddress}/tcp/${config.p2pConfig.ipV4BindWsPort}/ws`,
`/ip6/${config.p2pConfig.ipV6BindAddress}/tcp/${config.p2pConfig.ipV6BindTcpPort}`,
`/ip6/${config.p2pConfig.ipV6BindAddress}/tcp/${config.p2pConfig.ipV6BindWsPort}/ws`
]
],
announce: config.p2pConfig.announceAddresses,
announceFilter: (multiaddrs: any[]) =>
multiaddrs.filter((m) => this.shouldAnnounce(m))
},
peerId: config.keys.peerId,
transports: [webSockets(), tcp(), circuitRelayTransport()],
Expand All @@ -160,7 +213,11 @@
],
peerDiscovery: [
bootstrap({
list: config.p2pConfig.bootstrapNodes
list: config.p2pConfig.bootstrapNodes,
timeout: 1000, // in ms,
tagName: 'bootstrap',
tagValue: 50,
tagTTL: 10000000000
}),
pubsubPeerDiscovery({
interval: config.p2pConfig.pubsubPeerDiscoveryInterval,
Expand Down Expand Up @@ -211,15 +268,6 @@
}
const node = await createLibp2p(options)
await node.start()
node.addEventListener('peer:connect', (evt: any) => {
handlePeerConnect(evt)
})
node.addEventListener('peer:disconnect', (evt: any) => {
handlePeerDisconnect(evt)
})
node.addEventListener('peer:discovery', (evt: any) => {
handlePeerDiscovery(evt)
})

// node.services.pubsub.addEventListener( 'peer joined', (evt:any) => {handlePeerJoined(evt)})
// node.services.pubsub.addEventListener('peer left', (evt:any) => {handlePeerLeft(evt)})
Expand Down Expand Up @@ -247,7 +295,8 @@
// ;(node.services.upnpNAT as any).mapIpAddresses()
;(node.services.upnpNAT as any).mapIpAddresses().catch((err: any) => {
// hole punching errors are non-fatal
console.error(err)
P2P_LOGGER.info('Failed to configure UPNP Gateway(if you have one)')
P2P_LOGGER.debug(err)
})
return node
} catch (e) {
Expand All @@ -269,12 +318,23 @@
// }
}

getPeers() {
return this._peers.slice(0)
async getPeers() {
const allPeers = await this._libp2p.peerStore.all()
const oceanPeers = []
for (const peer of allPeers) {
if (peer && peer.protocols) console.log(peer.id)
for (const protocol of peer.protocols) {
if (protocol === this._protocol) {
oceanPeers.push(peer.id.toString())
}
}
}
return oceanPeers
}

hasPeer(peer: any) {
return Boolean(this._peers.find((p) => p.toString() === peer.toString()))
async hasPeer(peer: any) {
const s = await this._libp2p.peerStore.all()
return Boolean(s.find((p: any) => p.toString() === peer.toString()))
}

async broadcast(_message: any) {
Expand Down Expand Up @@ -386,11 +446,12 @@

// return response
// }

/*
async _pollPeers() {
const node = <any>this._libp2p
const newPeers = (await node.services.pubsub.getSubscribers(this._topic)).sort()

console.log('============ newPeers ====================')
console.log(newPeers)
if (this._emitChanges(newPeers)) {
const addedNew = newPeers.length > this._peers.length
this._peers = newPeers
Expand All @@ -407,16 +468,19 @@
}

_emitChanges(newPeers: any) {
console.log('============ _emitChanges ================== ')
console.log(newPeers)
const peers = this._peers.map((p) => p.toString())
const newpeers = newPeers.map((x: any) => x.toString())
const differences = diff(peers, newpeers)

differences.added.forEach((peer: any) => this.emit('peer joined', peer))
differences.removed.forEach((peer: any) => this.emit('peer left', peer))

return differences.added.length > 0 || differences.removed.length > 0
const x = differences.added.length > 0 || differences.removed.length > 0
console.log(x)
return x
}

*/
_onMessage(event: any) {
const message = event.detail

Expand All @@ -428,22 +492,21 @@
async advertiseDid(did: string) {
P2P_LOGGER.logMessage('Advertising ' + did, true)
try {
const x = this._peers.length
if (x > 0) {
const cid = await cidFromRawString(did)
const multiAddrs = this._libp2p.components.addressManager.getAddresses()
// console.log('multiaddrs: ', multiAddrs)
await this._libp2p.contentRouting.provide(cid, multiAddrs)
} else {
P2P_LOGGER.warn(
'Could not find any Ocean peers. Nobody is listening at the moment, skipping...'
)
// save it for retry later
// https://github.com/libp2p/js-libp2p-kad-dht/issues/98
if (!this._pendingAdvertise.includes(did)) {
this._pendingAdvertise.push(did)
}
}
// const x = this._peers.length
// if (x > 0) {
const cid = await cidFromRawString(did)
const multiAddrs = this._libp2p.components.addressManager.getAddresses()
// console.log('multiaddrs: ', multiAddrs)
await this._libp2p.contentRouting.provide(cid, multiAddrs)
// } else {
// P2P_LOGGER.warn(
// 'Could not find any Ocean peers. Nobody is listening at the moment, skipping...'
// )
// // save it for retry later
// // https://github.com/libp2p/js-libp2p-kad-dht/issues/98
// if (!this._pendingAdvertise.includes(did)) {
// this._pendingAdvertise.push(did)
// }
} catch (e) {
P2P_LOGGER.error('advertiseDid():' + e.message)
}
Expand Down
Loading
Loading