From 8654be1fcce425c3a6ae3c7d38a9719c1bea3176 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 15 Apr 2024 16:59:51 +0200 Subject: [PATCH 1/2] feat: implement delegated PUTs --- client.go | 155 ++++++++---- docs/environment-variables.md | 22 ++ go.mod | 11 +- go.sum | 20 +- main.go | 106 +++++++- server.go | 153 ++++++++---- server_routers.go | 443 +++++++++++++++++++++++++++++++--- server_routers_test.go | 206 ++++++++++++++++ 8 files changed, 975 insertions(+), 141 deletions(-) diff --git a/client.go b/client.go index 89dcaf3..098d9b3 100644 --- a/client.go +++ b/client.go @@ -5,23 +5,43 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "time" "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/client" "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" ) -func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutput bool) error { +type askClient struct { + drc *client.Client + out io.Writer + pretty bool +} + +func newAskClient(endpoint string, prettyOutput bool, out io.Writer) (*askClient, error) { drc, err := client.New(endpoint) if err != nil { - return err + return nil, err } - recordsIter, err := drc.FindProviders(ctx, key) + if out == nil { + out = os.Stdout + } + + return &askClient{ + drc: drc, + pretty: prettyOutput, + out: out, + }, nil +} + +func (a *askClient) findProviders(ctx context.Context, key cid.Cid) error { + recordsIter, err := a.drc.FindProviders(ctx, key) if err != nil { return err } @@ -40,33 +60,24 @@ func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutp return nil } - if prettyOutput { + if a.pretty { switch res.Val.GetSchema() { case types.SchemaPeer: record := res.Val.(*types.PeerRecord) - fmt.Fprintln(os.Stdout, record.ID) - fmt.Fprintln(os.Stdout, "\tProtocols:", record.Protocols) - fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs) - - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - record := res.Val.(*types.BitswapRecord) - fmt.Fprintln(os.Stdout, record.ID) - fmt.Fprintln(os.Stdout, "\tProtocol:", record.Protocol) - fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs) - + fmt.Fprintln(a.out, record.ID) + fmt.Fprintln(a.out, "\tProtocols:", record.Protocols) + fmt.Fprintln(a.out, "\tAddresses:", record.Addrs) default: // This is an unknown schema. Let's just print it raw. - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } } - fmt.Fprintln(os.Stdout) + fmt.Fprintln(a.out) } else { - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } @@ -76,13 +87,56 @@ func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutp return nil } -func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput bool) error { - drc, err := client.New(endpoint) +func (a *askClient) provide(ctx context.Context, records ...*types.AnnouncementRecord) error { + for _, rec := range records { + err := rec.Verify() + if err != nil { + return err + } + } + + recordsIter, err := a.drc.ProvideRecords(ctx, records...) if err != nil { return err } + defer recordsIter.Close() + return a.printProvideResult(recordsIter) +} + +func (a *askClient) printProvideResult(recordsIter iter.ResultIter[*types.AnnouncementResponseRecord]) error { + for recordsIter.Next() { + res := recordsIter.Val() + + // Check for error, but do not complain if we exceeded the timeout. We are + // expecting that to happen: we explicitly defined a timeout. + if res.Err != nil { + if !errors.Is(res.Err, context.DeadlineExceeded) { + return res.Err + } + + return nil + } + + if a.pretty { + if res.Val.Error != "" { + fmt.Fprintf(a.out, "Error: %s", res.Val.Error) + } else { + fmt.Fprintf(a.out, "TTL: %s", res.Val.TTL) + } + fmt.Fprintln(a.out) + } else { + err := json.NewEncoder(a.out).Encode(res.Val) + if err != nil { + return err + } + } + } - recordsIter, err := drc.FindPeers(ctx, pid) + return nil +} + +func (a *askClient) findPeers(ctx context.Context, pid peer.ID) error { + recordsIter, err := a.drc.FindPeers(ctx, pid) if err != nil { return err } @@ -101,13 +155,13 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b return nil } - if prettyOutput { - fmt.Fprintln(os.Stdout, res.Val.ID) - fmt.Fprintln(os.Stdout, "\tProtocols:", res.Val.Protocols) - fmt.Fprintln(os.Stdout, "\tAddresses:", res.Val.Addrs) - fmt.Fprintln(os.Stdout) + if a.pretty { + fmt.Fprintln(a.out, res.Val.ID) + fmt.Fprintln(a.out, "\tProtocols:", res.Val.Protocols) + fmt.Fprintln(a.out, "\tAddresses:", res.Val.Addrs) + fmt.Fprintln(a.out) } else { - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } @@ -117,18 +171,30 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b return nil } -func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput bool) error { - drc, err := client.New(endpoint) +func (a *askClient) providePeer(ctx context.Context, records ...*types.AnnouncementRecord) error { + for _, rec := range records { + err := rec.Verify() + if err != nil { + return err + } + } + + recordsIter, err := a.drc.ProvidePeerRecords(ctx, records...) if err != nil { return err } + defer recordsIter.Close() - rec, err := drc.GetIPNS(ctx, name) + return a.printProvideResult(recordsIter) +} + +func (a *askClient) getIPNS(ctx context.Context, name ipns.Name) error { + rec, err := a.drc.GetIPNS(ctx, name) if err != nil { return err } - if prettyOutput { + if a.pretty { v, err := rec.Value() if err != nil { return err @@ -144,19 +210,19 @@ func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput return err } - fmt.Printf("/ipns/%s\n", name) + fmt.Fprintf(a.out, "/ipns/%s\n", name) // Since [client.Client.GetIPNS] verifies if the retrieved record is valid, we // do not need to verify it again. However, if you were not using this specific // client, but using some other tool, you should always validate the IPNS Record // using the [ipns.Validate] or [ipns.ValidateWithName] functions. - fmt.Println("\tSignature Validated") - fmt.Println("\tValue:", v.String()) - fmt.Println("\tSequence:", seq) - fmt.Println("\tValidityType : EOL/End-of-Life") - fmt.Println("\tValidity:", eol.Format(time.RFC3339)) + fmt.Fprintln(a.out, "\tSignature Validated") + fmt.Fprintln(a.out, "\tValue:", v.String()) + fmt.Fprintln(a.out, "\tSequence:", seq) + fmt.Fprintln(a.out, "\tValidityType : EOL/End-of-Life") + fmt.Fprintln(a.out, "\tValidity:", eol.Format(time.RFC3339)) if ttl, err := rec.TTL(); err == nil { - fmt.Println("\tTTL:", ttl.String()) + fmt.Fprintln(a.out, "\tTTL:", ttl.String()) } return nil @@ -167,20 +233,15 @@ func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput return err } - _, err = os.Stdout.Write(raw) + _, err = a.out.Write(raw) return err } -func putIPNS(ctx context.Context, name ipns.Name, record []byte, endpoint string) error { - drc, err := client.New(endpoint) - if err != nil { - return err - } - +func (a *askClient) putIPNS(ctx context.Context, name ipns.Name, record []byte) error { rec, err := ipns.UnmarshalRecord(record) if err != nil { return err } - return drc.PutIPNS(ctx, name, rec) + return a.drc.PutIPNS(ctx, name, rec) } diff --git a/docs/environment-variables.md b/docs/environment-variables.md index f4de147..94c87bb 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -5,6 +5,8 @@ - [Configuration](#configuration) - [`SOMEGUY_LISTEN_ADDRESS`](#someguy_listen_address) - [`SOMEGUY_ACCELERATED_DHT`](#someguy_accelerated_dht) + - [`SOMEGUY_PUT_ENABLED`](#someguy_put_enabled) + - [`SOMEGUY_DATADIR`](#someguy_datadir) - [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints) - [`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints) - [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints) @@ -28,6 +30,26 @@ Whether or not the Accelerated DHT is enabled or not. Default: `true` +### `SOMEGUY_PUT_ENABLED` + +Whether or not to accept Delegated Routing V1 PUT requests. Affects all PUT requests: +provider records, peer records and IPNS records. + +By default, PUT requests are ignored. Therefore, they will neither be stored locally, +nor sent to other remote endpoints. + +Default: `false` + +### `SOMEGUY_DATADIR` + +Used in conjunction with [`SOMEGUY_PUT_ENABLED`](#someguy_put_enabled). + +The LevelDB data directory to persist PUT records. When receiving PUT requests, +the records will be stored in this database. The database is queried for GET +requests. + +Default: none + ### `SOMEGUY_PROVIDER_ENDPOINTS` Comma-separated list of other Delegated Routing V1 endpoints to proxy provider requests to. diff --git a/go.mod b/go.mod index 1e16f87..a2de133 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.21 require ( github.com/CAFxX/httpcompression v0.0.9 github.com/felixge/httpsnoop v1.0.4 - github.com/ipfs/boxo v0.19.1-0.20240415103851-7f9506844904 + github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979 github.com/ipfs/go-cid v0.4.1 + github.com/ipfs/go-datastore v0.6.0 + github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.33.2 github.com/libp2p/go-libp2p-kad-dht v0.25.2 @@ -16,6 +18,7 @@ require ( github.com/multiformats/go-multihash v0.2.3 github.com/prometheus/client_golang v1.19.0 github.com/rs/cors v1.10.1 + github.com/samber/lo v1.39.0 github.com/slok/go-http-metrics v0.11.0 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.1 @@ -23,7 +26,7 @@ require ( require ( github.com/Jorropo/jsync v1.0.1 // indirect - github.com/andybalholm/brotli v1.1.0 // indirect + github.com/andybalholm/brotli v1.0.6 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -42,6 +45,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd // indirect github.com/google/uuid v1.6.0 // indirect @@ -51,7 +55,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/huin/goupnp v1.3.0 // indirect - github.com/ipfs/go-datastore v0.6.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -101,9 +104,9 @@ require ( github.com/quic-go/webtransport-go v0.7.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/samber/lo v1.39.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect go.opencensus.io v0.24.0 // indirect diff --git a/go.sum b/go.sum index 60f31ff..2d1e36e 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU= github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= +github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -135,6 +135,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -182,23 +184,27 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/boxo v0.19.1-0.20240415103851-7f9506844904 h1:HqjqN6oADXh1UNw8xKnBP50B3ZQDC/RStiBFPp6W+9Y= -github.com/ipfs/boxo v0.19.1-0.20240415103851-7f9506844904/go.mod h1:hA9Ou/YnfMZOG2nQhngsbBiYt6fiJ1EhWSmccZfV+M0= +github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979 h1:qRGh84ADfa29Lb285zFd0DbfBy0W50HfsA7cCo0EPnk= +github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979/go.mod h1:hA9Ou/YnfMZOG2nQhngsbBiYt6fiJ1EhWSmccZfV+M0= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= +github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk= github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8= +github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= +github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= @@ -244,6 +250,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -370,6 +377,7 @@ github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOEL github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo/v2 v2.17.1 h1:V++EzdbhI4ZV4ev0UTIj0PzhzOcReJFyJaLjtSF55M8= github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= @@ -489,6 +497,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -714,13 +723,16 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 3315803..a03f601 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,13 @@ package main import ( + "encoding/json" "errors" "log" "os" "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multibase" @@ -35,6 +37,18 @@ func main() { EnvVars: []string{"SOMEGUY_ACCELERATED_DHT"}, Usage: "run the accelerated DHT client", }, + &cli.BoolFlag{ + Name: "put-enabled", + Value: false, + EnvVars: []string{"SOMEGUY_PUT_ENABLED"}, + Usage: "enables HTTP PUT endpoints", + }, + &cli.StringFlag{ + Name: "datadir", + Value: "", + EnvVars: []string{"SOMEGUY_DATADIR"}, + Usage: "directory for persistent data", + }, &cli.StringSliceFlag{ Name: "provider-endpoints", Value: cli.NewStringSlice(cidContactEndpoint), @@ -55,7 +69,17 @@ func main() { }, }, Action: func(ctx *cli.Context) error { - return start(ctx.Context, ctx.String("listen-address"), ctx.Bool("accelerated-dht"), ctx.StringSlice("provider-endpoints"), ctx.StringSlice("peer-endpoints"), ctx.StringSlice("ipns-endpoints")) + options := &serverOptions{ + listenAddress: ctx.String("listen-address"), + acceleratedDHT: ctx.Bool("accelerated-dht"), + putEnabled: ctx.Bool("put-enabled"), + contentEndpoints: ctx.StringSlice("provider-endpoints"), + peerEndpoints: ctx.StringSlice("peer-endpoints"), + ipnsEndpoints: ctx.StringSlice("ipns-endpoints"), + dataDirectory: ctx.String("datadir"), + } + + return startServer(ctx.Context, options) }, }, { @@ -86,7 +110,39 @@ func main() { if err != nil { return err } - return findProviders(ctx.Context, c, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.findProviders(ctx.Context, c) + }, + }, + { + Name: "provide", + Usage: "provide ", + UsageText: "Provide a one or multiple announcement records to the network", + Action: func(ctx *cli.Context) error { + if ctx.NArg() < 1 { + return errors.New("invalid command, see help") + } + records := []*types.AnnouncementRecord{} + for _, recordPath := range ctx.Args().Slice() { + recordData, err := os.ReadFile(recordPath) + if err != nil { + return err + } + var record *types.AnnouncementRecord + err = json.Unmarshal(recordData, &record) + if err != nil { + return err + } + records = append(records, record) + } + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.provide(ctx.Context, records...) }, }, { @@ -102,7 +158,39 @@ func main() { if err != nil { return err } - return findPeers(ctx.Context, pid, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.findPeers(ctx.Context, pid) + }, + }, + { + Name: "providepeers", + Usage: "providepeers ", + UsageText: "Provide a one or multiple peer announcement records to the network", + Action: func(ctx *cli.Context) error { + if ctx.NArg() < 1 { + return errors.New("invalid command, see help") + } + records := []*types.AnnouncementRecord{} + for _, recordPath := range ctx.Args().Slice() { + recordData, err := os.ReadFile(recordPath) + if err != nil { + return err + } + var record *types.AnnouncementRecord + err = json.Unmarshal(recordData, &record) + if err != nil { + return err + } + records = append(records, record) + } + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.providePeer(ctx.Context, records...) }, }, { @@ -118,7 +206,11 @@ func main() { if err != nil { return err } - return getIPNS(ctx.Context, name, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.getIPNS(ctx.Context, name) }, }, { @@ -140,7 +232,11 @@ func main() { if err != nil { return err } - return putIPNS(ctx.Context, name, recBytes, ctx.String("endpoint")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.putIPNS(ctx.Context, name, recBytes) }, }, }, diff --git a/server.go b/server.go index c16bb20..e4476b8 100644 --- a/server.go +++ b/server.go @@ -2,10 +2,15 @@ package main import ( "context" + "errors" "fmt" "log" "net" "net/http" + "os" + "os/signal" + "sync" + "syscall" "github.com/CAFxX/httpcompression" "github.com/felixge/httpsnoop" @@ -33,60 +38,35 @@ func withRequestLogger(next http.Handler) http.Handler { }) } -func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bool, contentEndpoints, peerEndpoints, ipnsEndpoints []string) error { - h, err := newHost(runAcceleratedDHTClient) - if err != nil { - return err - } - - var dhtRouting routing.Routing - if runAcceleratedDHTClient { - wrappedDHT, err := newBundledDHT(ctx, h) - if err != nil { - return err - } - dhtRouting = wrappedDHT - } else { - standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) - if err != nil { - return err - } - dhtRouting = standardDHT - } - - crRouters, err := getCombinedRouting(contentEndpoints, dhtRouting) - if err != nil { - return err - } - - prRouters, err := getCombinedRouting(peerEndpoints, dhtRouting) - if err != nil { - return err - } +type serverOptions struct { + listenAddress string + acceleratedDHT bool + putEnabled bool + contentEndpoints []string + peerEndpoints []string + ipnsEndpoints []string + dataDirectory string +} - ipnsRouters, err := getCombinedRouting(ipnsEndpoints, dhtRouting) +func startServer(ctx context.Context, options *serverOptions) error { + router, err := newRouter(ctx, options) if err != nil { return err } - _, port, err := net.SplitHostPort(listenAddress) + _, port, err := net.SplitHostPort(options.listenAddress) if err != nil { return err } - log.Printf("Starting %s %s\n", name, version) - log.Printf("Listening on %s", listenAddress) + log.Printf("Listening on %s", options.listenAddress) log.Printf("Delegated Routing API on http://127.0.0.1:%s/routing/v1", port) mdlw := middleware.New(middleware.Config{ Recorder: metrics.NewRecorder(metrics.Config{Prefix: "someguy"}), }) - handler := server.Handler(&composableRouter{ - providers: crRouters, - peers: prRouters, - ipns: ipnsRouters, - }) + handler := server.Handler(router) // Add CORS. handler = cors.New(cors.Options{ @@ -115,8 +95,91 @@ func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bo }) http.Handle("/", handler) - server := &http.Server{Addr: listenAddress, Handler: nil} - return server.ListenAndServe() + server := &http.Server{Addr: options.listenAddress, Handler: nil} + + quit := make(chan os.Signal, 2) + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + err := server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + fmt.Fprintf(os.Stderr, "Failed to start gateway: %s\n", err) + quit <- os.Interrupt + } + }() + + signal.Notify( + quit, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGHUP, + ) + <-quit + go server.Close() + go router.Close() + + wg.Wait() + + return nil +} + +func newRouter(ctx context.Context, options *serverOptions) (router, error) { + h, err := newHost(options.acceleratedDHT) + if err != nil { + return nil, err + } + + var dhtRouting routing.Routing + if options.acceleratedDHT { + wrappedDHT, err := newBundledDHT(ctx, h) + if err != nil { + return nil, err + } + dhtRouting = wrappedDHT + } else { + standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) + if err != nil { + return nil, err + } + dhtRouting = standardDHT + } + + crRouters, err := getCombinedRouting(options.contentEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + prRouters, err := getCombinedRouting(options.peerEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + ipnsRouters, err := getCombinedRouting(options.ipnsEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + remoteRouter := &composableRouter{ + providers: crRouters, + peers: prRouters, + ipns: ipnsRouters, + } + + if options.dataDirectory == "" { + return remoteRouter, nil + } + + localRouter, err := newLocalRouter(options.dataDirectory) + if err != nil { + return nil, err + } + + return ¶llelRouter{ + routers: []router{localRouter, remoteRouter}, + }, nil } func newHost(highOutboundLimits bool) (host.Host, error) { @@ -153,9 +216,9 @@ func newHost(highOutboundLimits bool) (host.Host, error) { return h, nil } -func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) { +func getCombinedRouting(endpoints []string, dht routing.Routing, putEnabled bool) (router, error) { if len(endpoints) == 0 { - return libp2pRouter{routing: dht}, nil + return libp2pRouter{routing: dht, putEnabled: putEnabled}, nil } var routers []router @@ -165,10 +228,10 @@ func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) if err != nil { return nil, err } - routers = append(routers, clientRouter{Client: drclient}) + routers = append(routers, clientRouter{client: drclient, putEnabled: putEnabled}) } return sanitizeRouter{parallelRouter{ - routers: append(routers, libp2pRouter{routing: dht}), + routers: append(routers, libp2pRouter{routing: dht, putEnabled: putEnabled}), }}, nil } diff --git a/server_routers.go b/server_routers.go index 33430a9..579e742 100644 --- a/server_routers.go +++ b/server_routers.go @@ -2,7 +2,11 @@ package main import ( "context" + "encoding/json" "errors" + "io" + "math" + "path/filepath" "reflect" "sync" "time" @@ -13,28 +17,37 @@ import ( "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + leveldb "github.com/ipfs/go-ds-leveldb" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" manet "github.com/multiformats/go-multiaddr/net" + "github.com/samber/lo" ) type router interface { providersRouter peersRouter ipnsRouter + io.Closer } type providersRouter interface { FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) + Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) + io.Closer } type peersRouter interface { FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) + ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) + io.Closer } type ipnsRouter interface { GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error + io.Closer } var _ server.ContentRouter = composableRouter{} @@ -52,6 +65,13 @@ func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit return r.providers.FindProviders(ctx, key, limit) } +func (r composableRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if r.providers == nil { + return 0, nil + } + return r.providers.Provide(ctx, req) +} + func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { if r.peers == nil { return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil @@ -59,6 +79,13 @@ func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) return r.peers.FindPeers(ctx, pid, limit) } +func (r composableRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if r.peers == nil { + return 0, nil + } + return r.peers.ProvidePeer(ctx, req) +} + func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { if r.ipns == nil { return nil, routing.ErrNotFound @@ -73,9 +100,22 @@ func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *i return r.ipns.PutIPNS(ctx, name, record) } -//lint:ignore SA1019 // ignore staticcheck -func (r composableRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported +func (r composableRouter) Close() error { + var err error + + if r.providers != nil { + err = errors.Join(err, r.providers.Close()) + } + + if r.peers != nil { + err = errors.Join(err, r.peers.Close()) + } + + if r.ipns != nil { + err = errors.Join(err, r.ipns.Close()) + } + + return err } var _ server.ContentRouter = parallelRouter{} @@ -208,6 +248,57 @@ func (mi *manyIter[T]) Close() error { return err } +func (r parallelRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) { + return r.Provide(ctx, req) + }) +} + +func (r parallelRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) { + return r.ProvidePeer(ctx, req) + }) +} + +func provide(ctx context.Context, routers []router, call func(context.Context, router) (time.Duration, error)) (time.Duration, error) { + switch len(routers) { + case 0: + return 0, nil + case 1: + return call(ctx, routers[0]) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + resultsTTL := make([]time.Duration, len(routers)) + resultsErr := make([]error, len(routers)) + wg.Add(len(routers)) + for i, ri := range routers { + go func(ri router, i int) { + resultsTTL[i], resultsErr[i] = call(ctx, ri) + wg.Done() + }(ri, i) + } + wg.Wait() + + var err error + for _, e := range resultsErr { + err = errors.Join(err, e) + } + + // Choose lowest TTL to return. + var ttl time.Duration = math.MaxInt64 + for _, t := range resultsTTL { + if t < ttl { + ttl = t + } + } + + return ttl, err +} + func (r parallelRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { switch len(r.routers) { case 0: @@ -298,31 +389,42 @@ func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipn return errs } -//lint:ignore SA1019 // ignore staticcheck -func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported +func (r parallelRouter) Close() error { + var err error + + for _, r := range r.routers { + err = errors.Join(err, r.Close()) + } + + return err } var _ router = libp2pRouter{} type libp2pRouter struct { - routing routing.Routing + putEnabled bool + routing routing.Routing } -func (d libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { +func (r libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { ctx, cancel := context.WithCancel(ctx) - ch := d.routing.FindProvidersAsync(ctx, key, limit) + ch := r.routing.FindProvidersAsync(ctx, key, limit) return iter.ToResultIter(&peerChanIter{ ch: ch, cancel: cancel, }), nil } -func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { +func (r libp2pRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // NOTE: the libp2p router cannot provide records further into the DHT. + return 0, routing.ErrNotSupported +} + +func (r libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - addr, err := d.routing.FindPeer(ctx, pid) + addr, err := r.routing.FindPeer(ctx, pid) if err != nil { return nil, err } @@ -339,11 +441,16 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{rec})), nil } -func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { +func (r libp2pRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // NOTE: the libp2p router cannot provide peers further into the DHT. + return 0, routing.ErrNotSupported +} + +func (r libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - raw, err := d.routing.GetValue(ctx, string(name.RoutingKey())) + raw, err := r.routing.GetValue(ctx, string(name.RoutingKey())) if err != nil { return nil, err } @@ -351,7 +458,11 @@ func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record return ipns.UnmarshalRecord(raw) } -func (d libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { +func (r libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + if !r.putEnabled { + return routing.ErrNotSupported + } + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -360,7 +471,11 @@ func (d libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns. return err } - return d.routing.PutValue(ctx, string(name.RoutingKey()), raw) + return r.routing.PutValue(ctx, string(name.RoutingKey()), raw) +} + +func (r libp2pRouter) Close() error { + return nil } type peerChanIter struct { @@ -404,15 +519,287 @@ func (it *peerChanIter) Close() error { var _ router = clientRouter{} type clientRouter struct { - *client.Client + putEnabled bool + client *client.Client +} + +func (r clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return r.client.FindProviders(ctx, cid) +} + +func (r clientRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if !r.putEnabled { + return 0, routing.ErrNotSupported + } + + return r.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return r.client.ProvideRecords(ctx, req) + }) +} + +func (r clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + return r.client.FindPeers(ctx, pid) +} + +func (r clientRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if !r.putEnabled { + return 0, routing.ErrNotSupported + } + + return r.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return r.client.ProvidePeerRecords(ctx, req) + }) +} + +func (r clientRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + return r.client.GetIPNS(ctx, name) +} + +func (r clientRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + if !r.putEnabled { + return routing.ErrNotSupported + } + + return r.client.PutIPNS(ctx, name, record) +} + +func (r clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementResponseRecord], error)) (time.Duration, error) { + resultsIter, err := do() + if err != nil { + return 0, err + } + defer resultsIter.Close() + + records, err := iter.ReadAllResults(resultsIter) + if err != nil { + return 0, err + } + + if len(records) != 1 { + return 0, errors.New("invalid number of records returned") + } + + return records[0].TTL, nil +} + +func (r clientRouter) Close() error { + return nil +} + +var _ router = localRouter{} + +type localRouter struct { + datastore datastore.Batching +} + +func providersKey(cid cid.Cid) datastore.Key { + return datastore.KeyWithNamespaces([]string{"providers", cid.String()}) +} + +func peersKey(pid peer.ID) datastore.Key { + return datastore.KeyWithNamespaces([]string{"peers", pid.String()}) +} + +func ipnsKey(name ipns.Name) datastore.Key { + return datastore.NewKey("ipns-" + name.String()) +} + +func newLocalRouter(datadir string) (*localRouter, error) { + ds, err := leveldb.NewDatastore(filepath.Join(datadir, "leveldb"), nil) + if err != nil { + return nil, err + } + + return &localRouter{ + datastore: ds, + }, nil +} + +func (r localRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + raw, err := r.datastore.Get(ctx, providersKey(cid)) + if errors.Is(err, datastore.ErrNotFound) { + return iter.ToResultIter(iter.FromSlice([]types.Record{})), nil + } else if err != nil { + return nil, err + } + + var peerRecords []*types.PeerRecord + err = json.Unmarshal(raw, &peerRecords) + if err != nil { + return nil, err + } + + var records []types.Record + for _, r := range peerRecords { + records = append(records, r) + } + + return iter.ToResultIter(iter.FromSlice(records)), nil +} + +// Note: we don't verify the record since that is already done by the caller, +// i.e., Boxo's implementation of the server. This also facilitates our tests. +func (r localRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + key := providersKey(req.Payload.CID) + + var records []types.PeerRecord + + raw, err := r.datastore.Get(ctx, key) + if errors.Is(err, datastore.ErrNotFound) { + // Nothing + } else if err != nil { + return 0, err + } else { + err = json.Unmarshal(raw, &records) + if err != nil { + return 0, err + } + } + + // NOTE: this is a very naive storage. We just transform the announcement + // record into a peer record and append it to the list. This will be returned + // when FindPeers is called. + records = append(records, types.PeerRecord{ + Schema: types.SchemaPeer, + ID: req.Payload.ID, + Addrs: req.Payload.Addrs, + Protocols: req.Payload.Protocols, + }) + + raw, err = json.Marshal(records) + if err != nil { + return 0, err + } + + return req.Payload.TTL, r.datastore.Put(ctx, key, raw) } -func (d clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { - return d.Client.FindProviders(ctx, cid) +func (r localRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + raw, err := r.datastore.Get(ctx, peersKey(pid)) + if errors.Is(err, datastore.ErrNotFound) { + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil + } else if err != nil { + return nil, err + } + + var record *types.PeerRecord + err = json.Unmarshal(raw, &record) + if err != nil { + return nil, err + } + + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{record})), nil } -func (d clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { - return d.Client.FindPeers(ctx, pid) +// Note: we don't verify the record since that is already done by the caller, +// i.e., Boxo's implementation of the server. This also facilitates our tests. +func (r localRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + key := peersKey(*req.Payload.ID) + + // Make a [types.PeerRecord] based on the given [types.AnnouncementRecord]. + record := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: req.Payload.ID, + Addrs: req.Payload.Addrs, + Protocols: req.Payload.Protocols, + } + + raw, err := r.datastore.Get(ctx, key) + if errors.Is(err, datastore.ErrNotFound) { + // Nothing + } else if err != nil { + return 0, err + } else { + // If we already had a record for the same peer, merge them together. + var oldRecord *types.PeerRecord + err = json.Unmarshal(raw, &oldRecord) + if err != nil { + return 0, err + } + + record.Addrs = lo.Uniq(append(record.Addrs, oldRecord.Addrs...)) + record.Protocols = lo.Uniq(append(record.Protocols, oldRecord.Protocols...)) + } + + raw, err = json.Marshal(record) + if err != nil { + return 0, err + } + + return req.Payload.TTL, r.datastore.Put(ctx, key, raw) +} + +func (r localRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + raw, err := r.datastore.Get(ctx, ipnsKey(name)) + if errors.Is(err, datastore.ErrNotFound) { + return nil, routing.ErrNotFound + } + if err != nil { + return nil, err + } + + return ipns.UnmarshalRecord(raw) +} + +func (r localRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + shouldStore, err := r.shouldStoreNewIPNS(ctx, name, record) + if err != nil { + return err + } + + if !shouldStore { + return nil + } + + data, err := ipns.MarshalRecord(record) + if err != nil { + return err + } + + return r.datastore.Put(ctx, ipnsKey(name), data) +} + +func (r localRouter) shouldStoreNewIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) (bool, error) { + raw, err := r.datastore.Get(ctx, ipnsKey(name)) + if errors.Is(err, datastore.ErrNotFound) { + return true, nil + } + + if err != nil { + return false, err + } + + oldRecord, err := ipns.UnmarshalRecord(raw) + if err != nil { + return false, err + } + + oldSequence, err := oldRecord.Sequence() + if err != nil { + return false, err + } + + oldValidity, err := oldRecord.Validity() + if err != nil { + return false, err + } + + sequence, err := record.Sequence() + if err != nil { + return false, err + } + + validity, err := record.Validity() + if err != nil { + return false, err + } + + // Only store new record if sequence is higher or the validity is higher. + return sequence > oldSequence || validity.After(oldValidity), nil +} + +func (r localRouter) Close() error { + return r.datastore.Close() } var _ server.ContentRouter = sanitizeRouter{} @@ -443,17 +830,6 @@ func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit in result.Addrs = filterPrivateMultiaddr(result.Addrs) v.Val = result - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - result, ok := v.Val.(*types.BitswapRecord) - if !ok { - logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String()) - return v - } - - result.Addrs = filterPrivateMultiaddr(result.Addrs) - v.Val = result } return v @@ -476,11 +852,6 @@ func (r sanitizeRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) ( }), nil } -//lint:ignore SA1019 // ignore staticcheck -func (r sanitizeRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported -} - func filterPrivateMultiaddr(a []types.Multiaddr) []types.Multiaddr { b := make([]types.Multiaddr, 0, len(a)) diff --git a/server_routers_test.go b/server_routers_test.go index e5669f9..2c3f31d 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -33,6 +33,11 @@ func (m *mockRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } +func (m *mockRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid, limit) if arg0 := args.Get(0); arg0 == nil { @@ -41,6 +46,11 @@ func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (ite return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) if arg0 := args.Get(0); arg0 == nil { @@ -54,6 +64,10 @@ func (m *mockRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.R return args.Error(0) } +func (m *mockRouter) Close() error { + return nil +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) @@ -690,3 +704,195 @@ func TestManyIter(t *testing.T) { require.NoError(t, manyIter.Close()) }) } + +func equalRecords(t *testing.T, a, b *ipns.Record) { + aValue, err := a.Value() + require.NoError(t, err) + + aSequence, err := a.Sequence() + require.NoError(t, err) + + aTTL, err := a.TTL() + require.NoError(t, err) + + aValidity, err := a.Validity() + require.NoError(t, err) + + bValue, err := b.Value() + require.NoError(t, err) + + bSequence, err := b.Sequence() + require.NoError(t, err) + + bTTL, err := b.TTL() + require.NoError(t, err) + + bValidity, err := b.Validity() + require.NoError(t, err) + + require.Equal(t, aValue, bValue) + require.Equal(t, aSequence, bSequence) + require.Equal(t, aTTL, bTTL) + require.Equal(t, aValidity, bValidity) +} + +func TestLocalRouter(t *testing.T) { + t.Parallel() + + t.Run("Providers", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + defer r.Close() + + _, name := makeName(t) + pid := name.Peer() + cid := makeCID() + + t.Run("Store and Retrieve Record for CID", func(t *testing.T) { + resultsIter, err := r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 0) + + _, err = r.Provide(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + CID: cid, + ID: &pid, + Protocols: []string{"a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err = r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err = iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("Provide New Record", func(t *testing.T) { + _, err = r.Provide(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + CID: cid, + Protocols: []string{"b", "a", "a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err := r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 2) + }) + }) + + t.Run("Peers", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + defer r.Close() + + _, name := makeName(t) + pid := name.Peer() + + t.Run("Store and Retrieve Peer", func(t *testing.T) { + resultsIter, err := r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 0) + + _, err = r.ProvidePeer(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + Protocols: []string{"a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err = r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err = iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, []string{"a"}, results[0].Protocols) + }) + + t.Run("Provide Updated Information", func(t *testing.T) { + _, err = r.ProvidePeer(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + Protocols: []string{"b", "a", "a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err := r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, []string{"b", "a"}, results[0].Protocols) + }) + }) + + t.Run("IPNS", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + defer r.Close() + + sk, name := makeName(t) + rec, _ := makeIPNSRecord(t, sk) + + time.Sleep(time.Millisecond * 100) + newerRecord, _ := makeIPNSRecord(t, sk) + + t.Run("Store and Retrieve IPNS Record", func(t *testing.T) { + _, err = r.GetIPNS(context.Background(), name) + require.ErrorIs(t, err, routing.ErrNotFound) + + err = r.PutIPNS(context.Background(), name, rec) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, rec, storedRecord) + }) + + t.Run("Should Replace With Newer Record", func(t *testing.T) { + err = r.PutIPNS(context.Background(), name, newerRecord) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, newerRecord, storedRecord) + }) + + t.Run("Should Not Replace With Older Record", func(t *testing.T) { + err = r.PutIPNS(context.Background(), name, rec) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, newerRecord, storedRecord) + }) + }) + +} From f290ca23876ce1b129cac6e31093051968eba12b Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 7 May 2024 15:42:44 +0200 Subject: [PATCH 2/2] chore: update boxo --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a2de133..7509bed 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/CAFxX/httpcompression v0.0.9 github.com/felixge/httpsnoop v1.0.4 - github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979 + github.com/ipfs/boxo v0.19.1-0.20240507133722-cc217db63f3b github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-leveldb v0.5.0 diff --git a/go.sum b/go.sum index 2d1e36e..48829ec 100644 --- a/go.sum +++ b/go.sum @@ -189,8 +189,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979 h1:qRGh84ADfa29Lb285zFd0DbfBy0W50HfsA7cCo0EPnk= -github.com/ipfs/boxo v0.19.1-0.20240415145225-0915caffc979/go.mod h1:hA9Ou/YnfMZOG2nQhngsbBiYt6fiJ1EhWSmccZfV+M0= +github.com/ipfs/boxo v0.19.1-0.20240507133722-cc217db63f3b h1:TxVHVwp5RNl+70bqRM5iRFIn+1Pf98CQo0QYxOgyVdg= +github.com/ipfs/boxo v0.19.1-0.20240507133722-cc217db63f3b/go.mod h1:hA9Ou/YnfMZOG2nQhngsbBiYt6fiJ1EhWSmccZfV+M0= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=