diff --git a/client.go b/client.go index 89dcaf3..098d9b3 100644 --- a/client.go +++ b/client.go @@ -5,23 +5,43 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "time" "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/client" "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" ) -func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutput bool) error { +type askClient struct { + drc *client.Client + out io.Writer + pretty bool +} + +func newAskClient(endpoint string, prettyOutput bool, out io.Writer) (*askClient, error) { drc, err := client.New(endpoint) if err != nil { - return err + return nil, err } - recordsIter, err := drc.FindProviders(ctx, key) + if out == nil { + out = os.Stdout + } + + return &askClient{ + drc: drc, + pretty: prettyOutput, + out: out, + }, nil +} + +func (a *askClient) findProviders(ctx context.Context, key cid.Cid) error { + recordsIter, err := a.drc.FindProviders(ctx, key) if err != nil { return err } @@ -40,33 +60,24 @@ func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutp return nil } - if prettyOutput { + if a.pretty { switch res.Val.GetSchema() { case types.SchemaPeer: record := res.Val.(*types.PeerRecord) - fmt.Fprintln(os.Stdout, record.ID) - fmt.Fprintln(os.Stdout, "\tProtocols:", record.Protocols) - fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs) - - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - record := res.Val.(*types.BitswapRecord) - fmt.Fprintln(os.Stdout, record.ID) - fmt.Fprintln(os.Stdout, "\tProtocol:", record.Protocol) - fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs) - + fmt.Fprintln(a.out, record.ID) + fmt.Fprintln(a.out, "\tProtocols:", record.Protocols) + fmt.Fprintln(a.out, "\tAddresses:", record.Addrs) default: // This is an unknown schema. Let's just print it raw. - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } } - fmt.Fprintln(os.Stdout) + fmt.Fprintln(a.out) } else { - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } @@ -76,13 +87,56 @@ func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutp return nil } -func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput bool) error { - drc, err := client.New(endpoint) +func (a *askClient) provide(ctx context.Context, records ...*types.AnnouncementRecord) error { + for _, rec := range records { + err := rec.Verify() + if err != nil { + return err + } + } + + recordsIter, err := a.drc.ProvideRecords(ctx, records...) if err != nil { return err } + defer recordsIter.Close() + return a.printProvideResult(recordsIter) +} + +func (a *askClient) printProvideResult(recordsIter iter.ResultIter[*types.AnnouncementResponseRecord]) error { + for recordsIter.Next() { + res := recordsIter.Val() + + // Check for error, but do not complain if we exceeded the timeout. We are + // expecting that to happen: we explicitly defined a timeout. + if res.Err != nil { + if !errors.Is(res.Err, context.DeadlineExceeded) { + return res.Err + } + + return nil + } + + if a.pretty { + if res.Val.Error != "" { + fmt.Fprintf(a.out, "Error: %s", res.Val.Error) + } else { + fmt.Fprintf(a.out, "TTL: %s", res.Val.TTL) + } + fmt.Fprintln(a.out) + } else { + err := json.NewEncoder(a.out).Encode(res.Val) + if err != nil { + return err + } + } + } - recordsIter, err := drc.FindPeers(ctx, pid) + return nil +} + +func (a *askClient) findPeers(ctx context.Context, pid peer.ID) error { + recordsIter, err := a.drc.FindPeers(ctx, pid) if err != nil { return err } @@ -101,13 +155,13 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b return nil } - if prettyOutput { - fmt.Fprintln(os.Stdout, res.Val.ID) - fmt.Fprintln(os.Stdout, "\tProtocols:", res.Val.Protocols) - fmt.Fprintln(os.Stdout, "\tAddresses:", res.Val.Addrs) - fmt.Fprintln(os.Stdout) + if a.pretty { + fmt.Fprintln(a.out, res.Val.ID) + fmt.Fprintln(a.out, "\tProtocols:", res.Val.Protocols) + fmt.Fprintln(a.out, "\tAddresses:", res.Val.Addrs) + fmt.Fprintln(a.out) } else { - err := json.NewEncoder(os.Stdout).Encode(res.Val) + err := json.NewEncoder(a.out).Encode(res.Val) if err != nil { return err } @@ -117,18 +171,30 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b return nil } -func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput bool) error { - drc, err := client.New(endpoint) +func (a *askClient) providePeer(ctx context.Context, records ...*types.AnnouncementRecord) error { + for _, rec := range records { + err := rec.Verify() + if err != nil { + return err + } + } + + recordsIter, err := a.drc.ProvidePeerRecords(ctx, records...) if err != nil { return err } + defer recordsIter.Close() - rec, err := drc.GetIPNS(ctx, name) + return a.printProvideResult(recordsIter) +} + +func (a *askClient) getIPNS(ctx context.Context, name ipns.Name) error { + rec, err := a.drc.GetIPNS(ctx, name) if err != nil { return err } - if prettyOutput { + if a.pretty { v, err := rec.Value() if err != nil { return err @@ -144,19 +210,19 @@ func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput return err } - fmt.Printf("/ipns/%s\n", name) + fmt.Fprintf(a.out, "/ipns/%s\n", name) // Since [client.Client.GetIPNS] verifies if the retrieved record is valid, we // do not need to verify it again. However, if you were not using this specific // client, but using some other tool, you should always validate the IPNS Record // using the [ipns.Validate] or [ipns.ValidateWithName] functions. - fmt.Println("\tSignature Validated") - fmt.Println("\tValue:", v.String()) - fmt.Println("\tSequence:", seq) - fmt.Println("\tValidityType : EOL/End-of-Life") - fmt.Println("\tValidity:", eol.Format(time.RFC3339)) + fmt.Fprintln(a.out, "\tSignature Validated") + fmt.Fprintln(a.out, "\tValue:", v.String()) + fmt.Fprintln(a.out, "\tSequence:", seq) + fmt.Fprintln(a.out, "\tValidityType : EOL/End-of-Life") + fmt.Fprintln(a.out, "\tValidity:", eol.Format(time.RFC3339)) if ttl, err := rec.TTL(); err == nil { - fmt.Println("\tTTL:", ttl.String()) + fmt.Fprintln(a.out, "\tTTL:", ttl.String()) } return nil @@ -167,20 +233,15 @@ func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput return err } - _, err = os.Stdout.Write(raw) + _, err = a.out.Write(raw) return err } -func putIPNS(ctx context.Context, name ipns.Name, record []byte, endpoint string) error { - drc, err := client.New(endpoint) - if err != nil { - return err - } - +func (a *askClient) putIPNS(ctx context.Context, name ipns.Name, record []byte) error { rec, err := ipns.UnmarshalRecord(record) if err != nil { return err } - return drc.PutIPNS(ctx, name, rec) + return a.drc.PutIPNS(ctx, name, rec) } diff --git a/go.mod b/go.mod index ad05f3f..1c0fb4e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/CAFxX/httpcompression v0.0.9 github.com/felixge/httpsnoop v1.0.4 - github.com/ipfs/boxo v0.18.0 + github.com/ipfs/boxo v0.18.1-0.20240306111355-f261eac52f4c github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.33.0 diff --git a/go.sum b/go.sum index b72b0d7..3f9f998 100644 --- a/go.sum +++ b/go.sum @@ -186,8 +186,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/boxo v0.18.0 h1:MOL9/AgoV3e7jlVMInicaSdbgralfqSsbkc31dZ9tmw= -github.com/ipfs/boxo v0.18.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= +github.com/ipfs/boxo v0.18.1-0.20240306111355-f261eac52f4c h1:e0PwRwv2XN6K1PBFxaUJfvCTEtXXW4A7d3d/bcoxwRk= +github.com/ipfs/boxo v0.18.1-0.20240306111355-f261eac52f4c/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= diff --git a/main.go b/main.go index 626c008..8ea63ef 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,13 @@ package main import ( + "encoding/json" "errors" "log" "os" "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multibase" @@ -85,7 +87,39 @@ func main() { if err != nil { return err } - return findProviders(ctx.Context, c, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.findProviders(ctx.Context, c) + }, + }, + { + Name: "provide", + Usage: "provide ", + UsageText: "Provide a one or multiple announcement records to the network", + Action: func(ctx *cli.Context) error { + if ctx.NArg() < 1 { + return errors.New("invalid command, see help") + } + records := []*types.AnnouncementRecord{} + for _, recordPath := range ctx.Args().Slice() { + recordData, err := os.ReadFile(recordPath) + if err != nil { + return err + } + var record *types.AnnouncementRecord + err = json.Unmarshal(recordData, &record) + if err != nil { + return err + } + records = append(records, record) + } + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.provide(ctx.Context, records...) }, }, { @@ -101,7 +135,39 @@ func main() { if err != nil { return err } - return findPeers(ctx.Context, pid, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.findPeers(ctx.Context, pid) + }, + }, + { + Name: "providepeers", + Usage: "providepeers ", + UsageText: "Provide a one or multiple peer announcement records to the network", + Action: func(ctx *cli.Context) error { + if ctx.NArg() < 1 { + return errors.New("invalid command, see help") + } + records := []*types.AnnouncementRecord{} + for _, recordPath := range ctx.Args().Slice() { + recordData, err := os.ReadFile(recordPath) + if err != nil { + return err + } + var record *types.AnnouncementRecord + err = json.Unmarshal(recordData, &record) + if err != nil { + return err + } + records = append(records, record) + } + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.providePeer(ctx.Context, records...) }, }, { @@ -117,7 +183,11 @@ func main() { if err != nil { return err } - return getIPNS(ctx.Context, name, ctx.String("endpoint"), ctx.Bool("pretty")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.getIPNS(ctx.Context, name) }, }, { @@ -139,7 +209,11 @@ func main() { if err != nil { return err } - return putIPNS(ctx.Context, name, recBytes, ctx.String("endpoint")) + cl, err := newAskClient(ctx.String("endpoint"), ctx.Bool("pretty"), os.Stdout) + if err != nil { + return err + } + return cl.putIPNS(ctx.Context, name, recBytes) }, }, }, diff --git a/server_routers.go b/server_routers.go index fa32bd7..b7c5745 100644 --- a/server_routers.go +++ b/server_routers.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "math" "sync" "time" @@ -24,10 +25,12 @@ type router interface { type providersRouter interface { FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) + Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) } type peersRouter interface { FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) + ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) } type ipnsRouter interface { @@ -50,6 +53,13 @@ func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit return r.providers.FindProviders(ctx, key, limit) } +func (r composableRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if r.providers == nil { + return 0, nil + } + return r.providers.Provide(ctx, req) +} + func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { if r.peers == nil { return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil @@ -57,6 +67,13 @@ func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) return r.peers.FindPeers(ctx, pid, limit) } +func (r composableRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if r.peers == nil { + return 0, nil + } + return r.peers.ProvidePeer(ctx, req) +} + func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { if r.ipns == nil { return nil, routing.ErrNotFound @@ -71,11 +88,6 @@ func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *i return r.ipns.PutIPNS(ctx, name, record) } -//lint:ignore SA1019 // ignore staticcheck -func (r composableRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported -} - var _ server.ContentRouter = parallelRouter{} type parallelRouter struct { @@ -206,6 +218,57 @@ func (mi *manyIter[T]) Close() error { return err } +func (r parallelRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) { + return r.Provide(ctx, req) + }) +} + +func (r parallelRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) { + return r.ProvidePeer(ctx, req) + }) +} + +func provide(ctx context.Context, routers []router, call func(context.Context, router) (time.Duration, error)) (time.Duration, error) { + switch len(routers) { + case 0: + return 0, nil + case 1: + return call(ctx, routers[0]) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + resultsTTL := make([]time.Duration, len(routers)) + resultsErr := make([]error, len(routers)) + wg.Add(len(routers)) + for i, ri := range routers { + go func(ri router, i int) { + resultsTTL[i], resultsErr[i] = call(ctx, ri) + wg.Done() + }(ri, i) + } + wg.Wait() + + var err error + for _, e := range resultsErr { + err = errors.Join(err, e) + } + + // Choose lowest TTL to return. + var ttl time.Duration = math.MaxInt64 + for _, t := range resultsTTL { + if t < ttl { + ttl = t + } + } + + return ttl, err +} + func (r parallelRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { switch len(r.routers) { case 0: @@ -296,11 +359,6 @@ func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipn return errs } -//lint:ignore SA1019 // ignore staticcheck -func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported -} - var _ router = libp2pRouter{} type libp2pRouter struct { @@ -316,6 +374,12 @@ func (d libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) }), nil } +func (d libp2pRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // NOTE: this router cannot provide further to the DHT, since we can only + // announce CIDs that our own node has, which is not the case. + return 0, routing.ErrNotSupported +} + func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -337,6 +401,10 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil } +func (r libp2pRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return 0, routing.ErrNotSupported +} + func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -409,6 +477,37 @@ func (d clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) return d.Client.FindProviders(ctx, cid) } +func (d clientRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return d.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return d.Client.ProvideRecords(ctx, req) + }) +} + func (d clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { return d.Client.FindPeers(ctx, pid) } + +func (d clientRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + return d.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return d.Client.ProvidePeerRecords(ctx, req) + }) +} + +func (d clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementResponseRecord], error)) (time.Duration, error) { + resultsIter, err := do() + if err != nil { + return 0, err + } + defer resultsIter.Close() + + records, err := iter.ReadAllResults(resultsIter) + if err != nil { + return 0, err + } + + if len(records) != 1 { + return 0, errors.New("invalid number of records returned") + } + + return records[0].TTL, nil +} diff --git a/server_routers_test.go b/server_routers_test.go index 2ef826d..32680b0 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -32,6 +32,11 @@ func (m *mockRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } +func (m *mockRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid, limit) if arg0 := args.Get(0); arg0 == nil { @@ -40,6 +45,11 @@ func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (ite return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) if arg0 := args.Get(0); arg0 == nil {