Skip to content

Commit

Permalink
Merge pull request #93 from SiaFoundation/nate/add-state-endpoints
Browse files Browse the repository at this point in the history
Add state endpoints
  • Loading branch information
ChrisSchinnerl authored Apr 3, 2024
2 parents ec95962 + b994800 commit 5dc0f18
Show file tree
Hide file tree
Showing 15 changed files with 437 additions and 140 deletions.
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ FROM docker.io/library/golang:1.21 AS builder

WORKDIR /walletd

# Install dependencies
COPY go.mod go.sum ./
RUN go mod download

# Copy source
COPY . .

# Enable CGO for sqlite3 support
ENV CGO_ENABLED=1

RUN go generate ./...
RUN go build -o bin/ -tags='netgo timetzdata' -trimpath -a -ldflags '-s -w -linkmode external -extldflags "-static"' ./cmd/walletd

FROM docker.io/library/alpine:3
Expand Down
17 changes: 17 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ import (
"go.sia.tech/walletd/wallet"
)

// A StateResponse returns information about the current state of the walletd
// daemon.
type StateResponse struct {
Version string `json:"version"`
Commit string `json:"commit"`
OS string `json:"os"`
BuildTime time.Time `json:"buildTime"`
StartTime time.Time `json:"startTime"`
}

// A GatewayPeer is a currently-connected peer.
type GatewayPeer struct {
Addr string `json:"addr"`
Expand Down Expand Up @@ -83,3 +93,10 @@ type SeedSignRequest struct {
Transaction types.Transaction `json:"transaction"`
Keys []uint64 `json:"keys"`
}

type RescanResponse struct {
StartIndex types.ChainIndex `json:"startIndex"`
Index types.ChainIndex `json:"index"`
StartTime time.Time `json:"startTime"`
Error *string `json:"error,omitempty"`
}
33 changes: 25 additions & 8 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,13 @@ func TestWallet(t *testing.T) {
}
defer ws.Close()

peerStore, err := sqlite.NewPeerStore(ws)
if err != nil {
t.Fatal(err)
}

// create the syncer
s := syncer.New(syncerListener, cm, ws, gateway.Header{
s := syncer.New(syncerListener, cm, peerStore, gateway.Header{
GenesisID: genesisBlock.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: syncerListener.Addr().String(),
Expand All @@ -288,9 +293,10 @@ func TestWallet(t *testing.T) {
t.Fatalf("expected wallet name to be 'primary', got %v", w.Name)
}
wc := c.Wallet(w.ID)
if err := c.Resubscribe(0); err != nil {
if err := c.Rescan(0); err != nil {
t.Fatal(err)
}
waitForBlock(t, cm, ws)

balance, err := wc.Balance()
if err != nil {
Expand Down Expand Up @@ -505,9 +511,10 @@ func TestAddresses(t *testing.T) {
t.Fatalf("expected wallet name to be 'primary', got %v", w.Name)
}
wc := c.Wallet(w.ID)
if err := c.Resubscribe(0); err != nil {
if err := c.Rescan(0); err != nil {
t.Fatal(err)
}
waitForBlock(t, cm, ws)

balance, err := wc.Balance()
if err != nil {
Expand Down Expand Up @@ -707,9 +714,11 @@ func TestV2(t *testing.T) {
if err := secondary.AddAddress(wallet.Address{Address: secondaryAddress}); err != nil {
t.Fatal(err)
}
if err := c.Resubscribe(0); err != nil {

if err := c.Rescan(0); err != nil {
t.Fatal(err)
}
waitForBlock(t, cm, ws)

// define some helper functions
addBlock := func(txns []types.Transaction, v2txns []types.V2Transaction) error {
Expand Down Expand Up @@ -901,6 +910,12 @@ func TestP2P(t *testing.T) {
t.Fatal(err)
}
defer store1.Close()

peerStore, err := sqlite.NewPeerStore(store1)
if err != nil {
t.Fatal(err)
}

wm1, err := wallet.NewManager(cm1, store1, log1.Named("wallet"))
if err != nil {
t.Fatal(err)
Expand All @@ -910,7 +925,7 @@ func TestP2P(t *testing.T) {
t.Fatal(err)
}
defer l1.Close()
s1 := syncer.New(l1, cm1, store1, gateway.Header{
s1 := syncer.New(l1, cm1, peerStore, gateway.Header{
GenesisID: genesisBlock.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
Expand All @@ -926,9 +941,10 @@ func TestP2P(t *testing.T) {
if err := primary.AddAddress(wallet.Address{Address: primaryAddress}); err != nil {
t.Fatal(err)
}
if err := c1.Resubscribe(0); err != nil {
if err := c1.Rescan(0); err != nil {
t.Fatal(err)
}
waitForBlock(t, cm1, store1)

dbstore2, tipState, err := chain.NewDBStore(chain.NewMemDB(), n, genesisBlock)
if err != nil {
Expand All @@ -950,7 +966,7 @@ func TestP2P(t *testing.T) {
t.Fatal(err)
}
defer l2.Close()
s2 := syncer.New(l2, cm2, store2, gateway.Header{
s2 := syncer.New(l2, cm2, peerStore, gateway.Header{
GenesisID: genesisBlock.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),
Expand All @@ -967,9 +983,10 @@ func TestP2P(t *testing.T) {
if err := secondary.AddAddress(wallet.Address{Address: secondaryAddress}); err != nil {
t.Fatal(err)
}
if err := c2.Resubscribe(0); err != nil {
if err := c2.Rescan(0); err != nil {
t.Fatal(err)
}
waitForBlock(t, cm2, store2)

// define some helper functions
addBlock := func() error {
Expand Down
19 changes: 15 additions & 4 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Client struct {
n *consensus.Network // for ConsensusTipState
}

// State returns information about the current state of the walletd daemon.
func (c *Client) State() (resp StateResponse, err error) {
err = c.c.GET("/state", &resp)
return
}

// TxpoolBroadcast broadcasts a set of transaction to the network.
func (c *Client) TxpoolBroadcast(txns []types.Transaction, v2txns []types.V2Transaction) (err error) {
err = c.c.POST("/txpool/broadcast", TxpoolBroadcastRequest{txns, v2txns}, nil)
Expand Down Expand Up @@ -110,10 +116,15 @@ func (c *Client) Wallet(id wallet.ID) *WalletClient {
return &WalletClient{c: c.c, id: id}
}

// Resubscribe subscribes the wallet to consensus updates, starting at the
// specified height.
func (c *Client) Resubscribe(height uint64) (err error) {
err = c.c.POST("/resubscribe", height, nil)
// ScanStatus returns the current state of wallet scanning.
func (c *Client) ScanStatus() (resp RescanResponse, err error) {
err = c.c.GET("/rescan", &resp)
return
}

// Rescan rescans the blockchain starting from the specified height.
func (c *Client) Rescan(height uint64) (err error) {
err = c.c.POST("/rescan", height, nil)
return
}

Expand Down
131 changes: 104 additions & 27 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"net/http"
"reflect"
"runtime"
"sync"
"time"

Expand All @@ -15,12 +16,14 @@ import (
"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/walletd/build"
"go.sia.tech/walletd/wallet"
)

type (
// A ChainManager manages blockchain and txpool state.
ChainManager interface {
BestIndex(height uint64) (types.ChainIndex, bool)
TipState() consensus.State
AddBlocks([]types.Block) error
RecommendedFee() types.Currency
Expand All @@ -45,7 +48,8 @@ type (

// A WalletManager manages wallets, keyed by name.
WalletManager interface {
Subscribe(startHeight uint64) error
Tip() (types.ChainIndex, error)
Scan(index types.ChainIndex) error

AddWallet(wallet.Wallet) (wallet.Wallet, error)
UpdateWallet(wallet.Wallet) (wallet.Wallet, error)
Expand All @@ -71,13 +75,29 @@ type (
)

type server struct {
startTime time.Time

cm ChainManager
s Syncer
wm WalletManager

// for walletsReserveHandler
mu sync.Mutex
used map[types.Hash256]bool

scanMu sync.Mutex // for resubscribe
scanInProgress bool
scanInfo RescanResponse
}

func (s *server) stateHandler(jc jape.Context) {
jc.Encode(StateResponse{
Version: build.Version(),
Commit: build.Commit(),
OS: runtime.GOOS,
BuildTime: build.Time(),
StartTime: s.startTime,
})
}

func (s *server) consensusNetworkHandler(jc jape.Context) {
Expand Down Expand Up @@ -240,13 +260,65 @@ func (s *server) walletsIDHandlerDELETE(jc jape.Context) {
}
}

func (s *server) resubscribeHandler(jc jape.Context) {
func (s *server) rescanHandlerGET(jc jape.Context) {
index, err := s.wm.Tip()
if jc.Check("couldn't get tip", err) != nil {
return
}

s.scanMu.Lock()
defer s.scanMu.Unlock()
if s.scanInfo.StartTime.IsZero() {
s.scanInfo.StartTime = s.startTime
}
s.scanInfo.Index = index
jc.Encode(s.scanInfo)
}

func (s *server) rescanHandlerPOST(jc jape.Context) {
var height uint64
if jc.Decode(&height) != nil {
return
} else if jc.Check("couldn't subscribe wallet", s.wm.Subscribe(height)) != nil {
}

s.scanMu.Lock()
defer s.scanMu.Unlock()

if s.scanInProgress {
jc.Error(errors.New("scan already in progress"), http.StatusConflict)
return
}

var index types.ChainIndex
if height > 0 {
var ok bool
index, ok = s.cm.BestIndex(height)
if !ok {
jc.Error(errors.New("height not found"), http.StatusNotFound)
return
}
}

s.scanInProgress = true
s.scanInfo = RescanResponse{
StartIndex: index,
Index: index,
StartTime: time.Now(),
Error: nil,
}

go func() {
err := s.wm.Scan(index)

// update the scan state
s.scanMu.Lock()
defer s.scanMu.Unlock()
s.scanInProgress = false
if err != nil {
msg := err.Error()
s.scanInfo.Error = &msg
}
}()
}

func (s *server) walletsAddressHandlerPUT(jc jape.Context) {
Expand Down Expand Up @@ -622,42 +694,47 @@ func (s *server) addressesAddrOutputsSFHandler(jc jape.Context) {
// NewServer returns an HTTP handler that serves the walletd API.
func NewServer(cm ChainManager, s Syncer, wm WalletManager) http.Handler {
srv := server{
startTime: time.Now(),

cm: cm,
s: s,
wm: wm,
used: make(map[types.Hash256]bool),
}
return jape.Mux(map[string]jape.Handler{
"GET /consensus/network": srv.consensusNetworkHandler,
"GET /consensus/tip": srv.consensusTipHandler,
"GET /consensus/tipstate": srv.consensusTipStateHandler,
"GET /state": srv.stateHandler,

"GET /consensus/network": srv.consensusNetworkHandler,
"GET /consensus/tip": srv.consensusTipHandler,
"GET /consensus/tipstate": srv.consensusTipStateHandler,

"GET /syncer/peers": srv.syncerPeersHandler,
"POST /syncer/connect": srv.syncerConnectHandler,
"POST /syncer/broadcast/block": srv.syncerBroadcastBlockHandler,
"GET /syncer/peers": srv.syncerPeersHandler,
"POST /syncer/connect": srv.syncerConnectHandler,
"POST /syncer/broadcast/block": srv.syncerBroadcastBlockHandler,

"GET /txpool/transactions": srv.txpoolTransactionsHandler,
"GET /txpool/fee": srv.txpoolFeeHandler,
"POST /txpool/broadcast": srv.txpoolBroadcastHandler,
"GET /txpool/transactions": srv.txpoolTransactionsHandler,
"GET /txpool/fee": srv.txpoolFeeHandler,
"POST /txpool/broadcast": srv.txpoolBroadcastHandler,

"POST /resubscribe": srv.resubscribeHandler,
"GET /rescan": srv.rescanHandlerGET,
"POST /rescan": srv.rescanHandlerPOST,

"GET /wallets": srv.walletsHandler,
"GET /wallets": srv.walletsHandler,
"POST /wallets": srv.walletsHandlerPOST,
"POST /wallets/:id": srv.walletsIDHandlerPOST,
"DELETE /wallets/:id": srv.walletsIDHandlerDELETE,
"PUT /wallets/:id/addresses": srv.walletsAddressHandlerPUT,
"POST /wallets/:id": srv.walletsIDHandlerPOST,
"DELETE /wallets/:id": srv.walletsIDHandlerDELETE,
"PUT /wallets/:id/addresses": srv.walletsAddressHandlerPUT,
"DELETE /wallets/:id/addresses/:addr": srv.walletsAddressHandlerDELETE,
"GET /wallets/:id/addresses": srv.walletsAddressesHandlerGET,
"GET /wallets/:id/balance": srv.walletsBalanceHandler,
"GET /wallets/:id/events": srv.walletsEventsHandler,
"GET /wallets/:id/txpool": srv.walletsTxpoolHandler,
"GET /wallets/:id/outputs/siacoin": srv.walletsOutputsSiacoinHandler,
"GET /wallets/:id/outputs/siafund": srv.walletsOutputsSiafundHandler,
"POST /wallets/:id/reserve": srv.walletsReserveHandler,
"POST /wallets/:id/release": srv.walletsReleaseHandler,
"POST /wallets/:id/fund": srv.walletsFundHandler,
"POST /wallets/:id/fundsf": srv.walletsFundSFHandler,
"GET /wallets/:id/addresses": srv.walletsAddressesHandlerGET,
"GET /wallets/:id/balance": srv.walletsBalanceHandler,
"GET /wallets/:id/events": srv.walletsEventsHandler,
"GET /wallets/:id/txpool": srv.walletsTxpoolHandler,
"GET /wallets/:id/outputs/siacoin": srv.walletsOutputsSiacoinHandler,
"GET /wallets/:id/outputs/siafund": srv.walletsOutputsSiafundHandler,
"POST /wallets/:id/reserve": srv.walletsReserveHandler,
"POST /wallets/:id/release": srv.walletsReleaseHandler,
"POST /wallets/:id/fund": srv.walletsFundHandler,
"POST /wallets/:id/fundsf": srv.walletsFundSFHandler,

"GET /addresses/:addr/balance": srv.addressesAddrBalanceHandler,
"GET /addresses/:addr/events": srv.addressesAddrEventsHandler,
Expand Down
Loading

0 comments on commit 5dc0f18

Please sign in to comment.