Skip to content

Commit

Permalink
Merge pull request #579 from threefoldtech/development_rmb_use_sub_ma…
Browse files Browse the repository at this point in the history
…nager

use substrate manager instead of connection and open a connection if …
  • Loading branch information
xmonader authored Dec 27, 2023
2 parents b02cae7 + b61a923 commit fd83ee0
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 97 deletions.
7 changes: 1 addition & 6 deletions farmerbot/examples/findnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ import (
func findNode() (uint32, error) {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return 0, fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()

client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-find-node",
sub,
subManager,
true,
)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions farmerbot/examples/includenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ import (
func powerOn() error {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()

client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-include",
sub,
subManager,
true,
)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions farmerbot/examples/poweroff/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ import (
func powerOff() error {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()

client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-power-off",
sub,
subManager,
true,
)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions farmerbot/examples/poweron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ import (
func powerOn() error {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()

client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-power-on",
sub,
subManager,
true,
)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions farmerbot/examples/version/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ import (
func version() error {
mnemonics := "<Enter MNEMONIC here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()

client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-version",
sub,
subManager,
true,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions farmerbot/internal/farmerbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewFarmerBot(ctx context.Context, config Config, network, mnemonicOrSeed st
return FarmerBot{}, err
}

rmb, err := peer.NewRpcClient(ctx, peer.KeyTypeSr25519, farmerbot.mnemonicOrSeed, relayURLs[network], fmt.Sprintf("farmerbot-rpc-%d", config.FarmID), subConn, true)
rmb, err := peer.NewRpcClient(ctx, peer.KeyTypeSr25519, farmerbot.mnemonicOrSeed, relayURLs[network], fmt.Sprintf("farmerbot-rpc-%d", config.FarmID), farmerbot.substrateManager, true)
if err != nil {
return FarmerBot{}, fmt.Errorf("could not create rmb client with error %w", err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func (f *FarmerBot) serve(ctx context.Context) error {
_, err = peer.NewPeer(
ctx,
f.mnemonicOrSeed,
subConn,
f.substrateManager,
router.Serve,
peer.WithRelay(relayURLs[f.network]),
peer.WithSession(fmt.Sprintf("farmerbot-%d", f.farm.ID)),
Expand Down
2 changes: 1 addition & 1 deletion grid-client/deployer/tf_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func NewTFPluginClient(
ctx, cancel := context.WithCancel(context.Background())
tfPluginClient.cancelRelayContext = cancel

rmbClient, err := peer.NewRpcClient(ctx, keyType, tfPluginClient.mnemonicOrSeed, tfPluginClient.relayURL, sessionID, sub.Substrate, true)
rmbClient, err := peer.NewRpcClient(ctx, keyType, tfPluginClient.mnemonicOrSeed, tfPluginClient.relayURL, sessionID, manager, true)
if err != nil {
return TFPluginClient{}, errors.Wrap(err, "could not create rmb client")
}
Expand Down
13 changes: 4 additions & 9 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,8 @@ func main() {
defer cancel()

subManager := substrate.NewManager(f.tfChainURL)
sub, err := subManager.Substrate()
if err != nil {
log.Fatal().Err(err).Msg(fmt.Sprintf("failed to connect to TF chain URL: %s", err))
}
defer sub.Close()

relayRPCClient, err := createRPCRMBClient(ctx, f.relayURL, f.mnemonics, sub)
relayRPCClient, err := createRPCRMBClient(ctx, f.relayURL, f.mnemonics, subManager)
if err != nil {
log.Fatal().Err(err).Msg("failed to create relay client")
}
Expand All @@ -124,7 +119,7 @@ func main() {
ctx,
f.relayURL,
f.mnemonics,
sub, db,
subManager, db,
f.indexerCheckIntervalMins,
f.indexerBatchSize,
f.indexerResultWorkers,
Expand Down Expand Up @@ -192,9 +187,9 @@ func app(s *http.Server, f flags) error {
return nil
}

func createRPCRMBClient(ctx context.Context, relayURL, mnemonics string, sub *substrate.Substrate) (rmb.Client, error) {
func createRPCRMBClient(ctx context.Context, relayURL, mnemonics string, subManager substrate.Manager) (rmb.Client, error) {
sessionId := fmt.Sprintf("tfgrid_proxy-%d", os.Getpid())
client, err := peer.NewRpcClient(ctx, peer.KeyTypeSr25519, mnemonics, relayURL, sessionId, sub, true)
client, err := peer.NewRpcClient(ctx, peer.KeyTypeSr25519, mnemonics, relayURL, sessionId, subManager, true)
if err != nil {
return nil, fmt.Errorf("failed to create direct RPC RMB client: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions grid-proxy/internal/gpuindexer/gpuindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewNodeGPUIndexer(
ctx context.Context,
relayURL,
mnemonics string,
sub *substrate.Substrate,
subManager substrate.Manager,
db db.Database,
indexerCheckIntervalMins,
batchSize,
Expand All @@ -60,7 +60,7 @@ func NewNodeGPUIndexer(
client, err := peer.NewPeer(
ctx,
mnemonics,
sub,
subManager,
indexer.relayCallback,
peer.WithRelay(relayURL),
peer.WithSession(sessionId),
Expand Down
7 changes: 1 addition & 6 deletions monitoring-bot/internal/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,8 @@ func NewMonitor(ctx context.Context, env config, wallets wallets) (Monitor, erro
for _, network := range networks {
mon.managers[network] = client.NewManager(SubstrateURLs[network]...)

con, err := mon.managers[network].Substrate()
if err != nil {
return mon, fmt.Errorf("substrate connection for %s network failed with error: %w", network, err)
}

sessionID := fmt.Sprintf("monbot-%d", os.Getpid())
rmbClient, err := peer.NewRpcClient(ctx, "sr25519", mon.mnemonics[network], RelayURLS[network], sessionID, con, true)
rmbClient, err := peer.NewRpcClient(ctx, "sr25519", mon.mnemonics[network], RelayURLS[network], sessionID, mon.managers[network], true)
if err != nil {
return mon, fmt.Errorf("couldn't create rpc client in network %s with error: %w", network, err)
}
Expand Down
17 changes: 4 additions & 13 deletions rmb-sdk-go/examples/rpc_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,19 @@ func app() error {
subNodeURL := "wss://tfchain.dev.grid.tf/ws"
relayURL := "wss://relay.dev.grid.tf"

sub, err := substrate.NewManager(subNodeURL).Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}
defer sub.Close()
subManager := substrate.NewManager(subNodeURL)

client, err := peer.NewRpcClient(context.Background(), peer.KeyTypeSr25519, mnemonics, relayURL, "test-client", sub, true)
client, err := peer.NewRpcClient(context.Background(), peer.KeyTypeSr25519, mnemonics, relayURL, "test-client", subManager, true)
if err != nil {
return fmt.Errorf("failed to create direct client: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

const dstNode = 11 // <- replace this with any node id
node, err := sub.GetNode(dstNode)
if err != nil {
return err
}

const dstTwin uint32 = 11 // <- replace this with any node i
var ver version
if err := client.Call(ctx, uint32(node.TwinID), "zos.system.version", nil, &ver); err != nil {
if err := client.Call(ctx, dstTwin, "zos.system.version", nil, &ver); err != nil {
return err
}

Expand Down
8 changes: 1 addition & 7 deletions rmb-sdk-go/peer/examples/peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@ var resultsChan = make(chan bool)
func app() error {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
ctx := context.Background()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}

defer sub.Close()

peer, err := peer.NewPeer(
ctx,
mnemonics,
sub,
subManager,
relayCallback,
peer.WithRelay("wss://relay.dev.grid.tf"),
peer.WithSession("test-client"),
Expand Down
10 changes: 2 additions & 8 deletions rmb-sdk-go/peer/examples/router_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,15 @@ func app() error {
// adding a peer for the router
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
ctx := context.Background()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}

defer sub.Close()

// this peer will be a 'calculator' session.
// means other peers on the network need to know that
// session id to use when they are making calls
_, err = peer.NewPeer(
_, err := peer.NewPeer(
ctx,
mnemonics,
sub,
subManager,
router.Serve,
peer.WithRelay("wss://relay.dev.grid.tf"),
peer.WithSession("calculator"),
Expand Down
7 changes: 1 addition & 6 deletions rmb-sdk-go/peer/examples/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ import (
func app() error {
mnemonics := "<mnemonics goes here>"
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
sub, err := subManager.Substrate()
if err != nil {
return fmt.Errorf("failed to connect to substrate: %w", err)
}

defer sub.Close()
client, err := peer.NewRpcClient(
context.Background(),
peer.KeyTypeSr25519,
mnemonics,
"wss://relay.dev.grid.tf",
"test-client",
sub,
subManager,
true,
)
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions rmb-sdk-go/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getIdentity(keytype string, mnemonics string) (substrate.Identity, error) {
func NewPeer(
ctx context.Context,
mnemonics string,
sub *substrate.Substrate,
subManager substrate.Manager,
handler Handler,
opts ...PeerOpt) (*Peer, error) {

Expand All @@ -103,7 +103,7 @@ func NewPeer(
return nil, err
}

twinDB := NewTwinDB(sub)
twinDB := NewTwinDB(subManager)
id, err := twinDB.GetByPk(identity.PublicKey())
if err != nil {
return nil, errors.Wrapf(err, "failed to get twin by public key")
Expand Down Expand Up @@ -134,7 +134,13 @@ func NewPeer(

if !bytes.Equal(twin.E2EKey, publicKey) || twin.Relay == nil || url.Hostname() != *twin.Relay {
log.Info().Msg("twin relay/public key didn't match, updating on chain ...")
if _, err = sub.UpdateTwin(identity, url.Hostname(), publicKey); err != nil {
subConn, err := subManager.Substrate()
if err != nil {
return nil, errors.Wrap(err, "could not start substrate connection")
}
defer subConn.Close()

if _, err = subConn.UpdateTwin(identity, url.Hostname(), publicKey); err != nil {
return nil, errors.Wrap(err, "could not update twin relay information")
}
}
Expand Down
4 changes: 2 additions & 2 deletions rmb-sdk-go/peer/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRpcClient(
mnemonics string,
relayURL string,
session string,
sub *substrate.Substrate,
subManager substrate.Manager,
enableEncryption bool) (*RpcCLient, error) {

rpc := RpcCLient{
Expand All @@ -47,7 +47,7 @@ func NewRpcClient(
base, err := NewPeer(
ctx,
mnemonics,
sub,
subManager,
rpc.router,
WithEncryption(enableEncryption),
WithKeyType(keytype),
Expand Down
Loading

0 comments on commit fd83ee0

Please sign in to comment.