Skip to content

Commit

Permalink
add explored
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Feb 4, 2025
1 parent 86a3c7e commit 96a8da4
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/add_explored_support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
default: minor
---

# Add explored support
38 changes: 23 additions & 15 deletions cmd/clusterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func main() {

siafundAddr string

renterdCount int
hostdCount int
walletdCount int
renterdCount int
hostdCount int
walletdCount int
exploredCount int
)

flag.StringVar(&dir, "dir", "", "directory to store renter data")
Expand All @@ -46,6 +47,7 @@ func main() {
flag.IntVar(&renterdCount, "renterd", 0, "number of renter daemons to run")
flag.IntVar(&hostdCount, "hostd", 0, "number of host daemons to run")
flag.IntVar(&walletdCount, "walletd", 0, "number of wallet daemons to run")
flag.IntVar(&exploredCount, "explored", 0, "number of explorer daemons to run")
flag.Parse()

cfg := zap.NewProductionEncoderConfig()
Expand All @@ -58,17 +60,8 @@ func main() {
cfg.CallerKey = ""
encoder := zapcore.NewConsoleEncoder(cfg)

var level zap.AtomicLevel
switch logLevel {
case "debug":
level = zap.NewAtomicLevelAt(zap.DebugLevel)
case "info":
level = zap.NewAtomicLevelAt(zap.InfoLevel)
case "warn":
level = zap.NewAtomicLevelAt(zap.WarnLevel)
case "error":
level = zap.NewAtomicLevelAt(zap.ErrorLevel)
default:
level, err := zap.ParseAtomicLevel(logLevel)
if err != nil {
fmt.Printf("invalid log level %q", level)
os.Exit(1)
}
Expand All @@ -85,7 +78,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

dir, err := os.MkdirTemp(dir, "sia-cluster-*")
dir, err = os.MkdirTemp(dir, "sia-cluster-*")
if err != nil {
log.Panic("failed to create temp dir", zap.Error(err))
}
Expand Down Expand Up @@ -221,6 +214,21 @@ func main() {
}
}

for i := 0; i < exploredCount; i++ {
ready := make(chan struct{}, 1)
go func() {
if err := nm.StartExplored(ctx, ready); err != nil {
cancel()
log.Error("walletd failed to start", zap.Error(err))
}
}()
select {
case <-ctx.Done():
log.Panic("context canceled")
case <-ready:
}
}

// mine until all payouts have matured
if err := nm.MineBlocks(ctx, 144, types.VoidAddress); err != nil {
log.Panic("failed to mine blocks", zap.Error(err))
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ toolchain go1.23.2
require (
go.sia.tech/core v0.9.1
go.sia.tech/coreutils v0.10.2-0.20250124134251-3a96ba4fb39c
go.sia.tech/explored v0.0.0-20250123161922-5ec30ee5f663
go.sia.tech/hostd v1.1.3-0.20250117061650-943b1da7f33a
go.sia.tech/jape v0.12.1
go.sia.tech/renterd v1.1.2-0.20250124142450-4c7858d8f9db
Expand All @@ -23,6 +24,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/gotd/contrib v0.21.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/ip2location/ip2location-go v8.3.0+incompatible // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/gotd/contrib v0.21.0 h1:4Fj05jnyBE84toXZl7mVTvt7f732n5uglvztyG6nTr4=
github.com/gotd/contrib v0.21.0/go.mod h1:ENoUh75IhHGxfz/puVJg8BU4ZF89yrL6Q47TyoNqFYo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/ip2location/ip2location-go v8.3.0+incompatible h1:QwUE+FlSbo6bjOWZpv2Grb57vJhWYFNPyBj2KCvfWaM=
github.com/ip2location/ip2location-go v8.3.0+incompatible/go.mod h1:3JUY1TBjTx1GdA7oRT7Zeqfc0bg3lMMuU5lXmzdpuME=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down Expand Up @@ -59,6 +61,8 @@ go.sia.tech/core v0.9.1 h1:p65iVQP4OnLRvPHBbZDhUR0LFserNIY82M/4de/gNPo=
go.sia.tech/core v0.9.1/go.mod h1:7buI+3k5xO+9PdzBQJlogOAc5h+twDUxEpV6EuXWZ5A=
go.sia.tech/coreutils v0.10.2-0.20250124134251-3a96ba4fb39c h1:FhGQWlWJ0G2PVjaDO4vM5vcHCJZHph6qo1D5hvKLqEY=
go.sia.tech/coreutils v0.10.2-0.20250124134251-3a96ba4fb39c/go.mod h1:99k+BlLKYsKHNdZAr5KqYIhoamPEbwhKZdq4FDV4HtU=
go.sia.tech/explored v0.0.0-20250123161922-5ec30ee5f663 h1:6HFBCDo/EYD03pmoDKwPEz9klIw3buDC2Ldio/WckU8=
go.sia.tech/explored v0.0.0-20250123161922-5ec30ee5f663/go.mod h1:7GTRw7xlV6t1VCdfXKfy2pdHHWAAZF3BpWqeawIDhYU=
go.sia.tech/gofakes3 v0.0.5 h1:vFhVBUFbKE9ZplvLE2w4TQxFMQyF8qvgxV4TaTph+Vw=
go.sia.tech/gofakes3 v0.0.5/go.mod h1:LXEzwGw+OHysWLmagleCttX93cJZlT9rBu/icOZjQ54=
go.sia.tech/hostd v1.1.3-0.20250117061650-943b1da7f33a h1:R0OVTc+MuPqLvso0Ocki+itPnKAYsWgcIte7WSopOzs=
Expand Down
195 changes: 195 additions & 0 deletions nodes/explored.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package nodes

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"go.sia.tech/core/gateway"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/testutil"
"go.sia.tech/explored/api"
"go.sia.tech/explored/config"
"go.sia.tech/explored/exchangerates"
"go.sia.tech/explored/explorer"
"go.sia.tech/explored/persist/sqlite"
"go.uber.org/zap"
"lukechampine.com/frand"
)

// StartExplored starts a new explored node. It listens on random ports and registers
// itself with the Manager. This function blocks until the context is
// canceled. All restources will be cleaned up before the funcion returns.
func (m *Manager) StartExplored(ctx context.Context, ready chan<- struct{}) (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
err = fmt.Errorf("panic: %v", r)
}
}()

done, err := m.incrementWaitGroup()
if err != nil {
return err
}
defer done()

node := Node{
ID: NodeID(frand.Bytes(8)),
Type: NodeTypeExplored,
}
log := m.log.Named("explored." + node.ID.String())

dir, err := createNodeDir(m.dir, node.ID)
if err != nil {
return fmt.Errorf("failed to create node directory: %w", err)
}
defer os.RemoveAll(dir)

httpListener, err := net.Listen("tcp", ":0")
if err != nil {
return fmt.Errorf("failed to listen on http address: %w", err)
}
defer httpListener.Close()

network := m.chain.TipState().Network

var cm *chain.Manager
var s *syncer.Syncer
if m.shareConsensus {
cm = m.chain
s = m.syncer
} else {
// start a chain manager
genesisIndex, ok := m.chain.BestIndex(0)
if !ok {
return errors.New("failed to get genesis index")
}
genesis, ok := m.chain.Block(genesisIndex.ID)
if !ok {
return errors.New("failed to get genesis block")
}
bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
if err != nil {
return fmt.Errorf("failed to open bolt db: %w", err)
}
defer bdb.Close()
dbstore, tipState, err := chain.NewDBStore(bdb, network, genesis)
if err != nil {
return fmt.Errorf("failed to create dbstore: %w", err)
}

cm = chain.NewManager(dbstore, tipState)

syncerListener, err := net.Listen("tcp", ":0")
if err != nil {
return fmt.Errorf("failed to listen on syncer address: %w", err)
}
defer syncerListener.Close()

// start a syncer
_, port, err := net.SplitHostPort(syncerListener.Addr().String())
if err != nil {
return fmt.Errorf("failed to split syncer address: %w", err)
}
s = syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesisIndex.ID,
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "127.0.0.1" + port,
}, syncer.WithLogger(log.Named("syncer")),
syncer.WithPeerDiscoveryInterval(5*time.Second),
syncer.WithSyncInterval(5*time.Second),
syncer.WithMaxInboundPeers(10000),
syncer.WithMaxOutboundPeers(10000))
defer s.Close()
go s.Run()

node.SyncerAddress = syncerListener.Addr().String()
// connect to the cluster syncer
_, err = m.syncer.Connect(ctx, node.SyncerAddress)
if err != nil {
return fmt.Errorf("failed to connect to cluster syncer: %w", err)
}
}

store, err := sqlite.OpenDatabase(filepath.Join(dir, "explored.sqlite3"), log.Named("sqlite3"))
if err != nil {
return fmt.Errorf("failed to open sqlite database: %w", err)
}
defer store.Close()

e, err := explorer.NewExplorer(cm, store, 10, config.Scanner{
Threads: 4,
Timeout: 10 * time.Second,
MaxLastScan: 5 * time.Minute,
MinLastAnnouncement: 365 * 24 * time.Hour,
}, log.Named("explorer"))
if err != nil {
return fmt.Errorf("failed to create explorer: %w", err)
}
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer timeoutCancel()
defer e.Shutdown(timeoutCtx)

var sources []exchangerates.Source
sources = append(sources, exchangerates.NewKraken(map[string]string{
exchangerates.CurrencyUSD: exchangerates.KrakenPairSiacoinUSD,
exchangerates.CurrencyEUR: exchangerates.KrakenPairSiacoinEUR,
exchangerates.CurrencyBTC: exchangerates.KrakenPairSiacoinBTC,
}, time.Minute))

ex, err := exchangerates.NewAverager(true, sources...)
if err != nil {
return fmt.Errorf("failed to create exchange rate source: %w", err)
}
go ex.Start(ctx)

api := api.NewServer(e, cm, s, ex)
server := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api") {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/api")
api.ServeHTTP(w, r)
return
}
http.NotFound(w, r)
}),
ReadTimeout: 15 * time.Second,
}
defer server.Close()
go server.Serve(httpListener)

node.APIAddress = "http://" + httpListener.Addr().String()

waitForSync := func() error {
tip, err := e.Tip()
if err != nil {
return err
}
// wait for sync
for m.chain.Tip() != tip {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
}
}
return nil
}

if err := waitForSync(); err != nil {
return fmt.Errorf("failed to wait for sync: %w", err)
}

log.Info("node started", zap.String("network", cm.TipState().Network.Name), zap.String("http", httpListener.Addr().String()))
m.addNodeAndWait(ctx, node, ready)

return nil
}
7 changes: 4 additions & 3 deletions nodes/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (

// Types for the supported nodes.
const (
NodeTypeRenterd = NodeType("renterd")
NodeTypeHostd = NodeType("hostd")
NodeTypeWalletd = NodeType("walletd")
NodeTypeRenterd = NodeType("renterd")
NodeTypeHostd = NodeType("hostd")
NodeTypeWalletd = NodeType("walletd")
NodeTypeExplored = NodeType("explored")
)

type (
Expand Down

0 comments on commit 96a8da4

Please sign in to comment.