Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support gRPC endpoints in core #1513

Merged
merged 63 commits into from
Jan 14, 2025
Merged

feat: support gRPC endpoints in core #1513

merged 63 commits into from
Jan 14, 2025

Conversation

rach-id
Copy link
Member

@rach-id rach-id commented Oct 8, 2024

This is an implementation of a streaming API for blocks in core.

Helps close celestiaorg/celestia-app#3421 but not sure it entirely closes it.

It can easily be used:

package main

import (
	"context"
	"fmt"
	coregrpc "github.com/tendermint/tendermint/rpc/grpc"
)

func main() {
	client := coregrpc.StartBlockAPIGRPCClient("tcp://localhost:9090")

	blockStreamer, err := client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: 2})
	if err != nil {
		panic(err)
	}
	blockMeta, err := client.BlockMetaByHeight(context.Background(), &coregrpc.BlockMetaByHeightRequest{Height: 2})
	if err != nil {
		panic(err)
	}
	parts := make([]*core.Part, 0)
	for i := 0; i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total); i++ {
		resp, err := blockStreamer.Recv()
		if err != nil {
			panic(err)
		}
		parts = append(parts, resp.BlockPart)
		if resp.IsLast && i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total)-1 {
			panic("couldn't get all parts")
		} else if resp.IsLast {
			break
		}
	}

	h := types.NewPartSetFromHeader(types.PartSetHeader{
		Total: blockMeta.BlockMeta.BlockID.PartSetHeader.Total,
		Hash:  blockMeta.BlockMeta.BlockID.PartSetHeader.Hash,
	})

	for _, part := range parts {
		ok, err := h.AddPart(&types.Part{
			Index: part.Index,
			Bytes: part.Bytes,
			Proof: merkle.Proof{
				Total:    part.Proof.Total,
				Index:    part.Proof.Index,
				LeafHash: part.Proof.LeafHash,
				Aunts:    part.Proof.Aunts,
			},
		})
		if err != nil {
			panic(err)
		}
		if !ok {
			panic("not okey")
		}
	}
	pbb := new(core.Block)
	bz, err := io.ReadAll(h.GetReader())
	if err != nil {
		panic(err)
	}
	err = proto.Unmarshal(bz, pbb)
	if err != nil {
		panic(err)
	}
	block, err := types.BlockFromProto(pbb)
	if err != nil {
		panic(err)
	}
	fmt.Println(block)

	// get a commit
	commit, err := client.Commit(context.Background(), &coregrpc.CommitRequest{Height: 10})
	if err != nil {
		panic(err)
	}
	fmt.Println(commit)

	// listen for new heights
	streamer, err := client.SubscribeNewHeights(context.Background(), &coregrpc.SubscribeNewHeightsRequest{})
	if err != nil {
		panic(err)
	}
	for {
		resp, err := streamer.Recv()
		if err != nil {
			panic(err)
		}
		fmt.Println(resp)
	}
}

Ps: I didn't add the tests because I didn't find a direct way of mocking the environment without polluting the rest of the repo (exporting some methods, adding new helpers, etc). And I think since the implementation is simple, just querying the block/state stores for results, it's fine to leave it untested.

@rach-id rach-id requested review from walldiss and renaynay October 8, 2024 09:44
@rach-id rach-id self-assigned this Oct 8, 2024
@rach-id rach-id requested a review from a team as a code owner October 8, 2024 09:44
@rach-id rach-id requested review from staheri14 and ninabarbakadze and removed request for a team October 8, 2024 09:44
@rach-id rach-id marked this pull request as draft October 8, 2024 11:40
@rach-id
Copy link
Member Author

rach-id commented Oct 8, 2024

Just found out that once we enable the app grpc, these endpoints get overridden, I'll be looking into it

proto/tendermint/rpc/grpc/types.proto Outdated Show resolved Hide resolved
proto/tendermint/rpc/grpc/types.proto Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
evan-forbes
evan-forbes previously approved these changes Dec 12, 2024
@rootulp rootulp requested review from rootulp and cmwaters and removed request for staheri14 December 13, 2024 15:54
rootulp
rootulp previously approved these changes Dec 13, 2024
rpc/grpc/api.go Outdated Show resolved Hide resolved
// SubscriptionCapacity the maximum number of pending blocks in the subscription.
const SubscriptionCapacity = 500

func (blockAPI *BlockAPI) retryNewBlocksSubscription(ctx context.Context) (bool, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question][nit] what does the bool return argument indicate? It seems like (isSuccess bool, err error) IMO the bool argument is unnecessary b/c either this function returns:

  1. true, no error
  2. false, an error

So the bool argument could be removed and this could just return an error if the retry failed or nil if the retry succeeded

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also returns false, no error when the context is done. This avoids reporting a context done error unnecessarily and just stops quietly

Comment on lines +199 to +202
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is unnecessary because heightListeners is always non-nil due to:

func NewBlockAPI() *BlockAPI {
	return &BlockAPI{
		heightListeners:   make(map[chan NewHeightEvent]struct{}, 1000),
		subscriptionID:    fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
		subscriptionQuery: eventstypes.EventQueryNewBlock,
	}
}

and it's never explicitly set to nil so this check will never evaluate to true. Proposal to remove it

Suggested change
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just a foolproof check because the BlockAPI is exported and anyone can instantiate it and mess up initialising the fields

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's unecessary to safe-guard against this b/c the default NewBlockAPI initializes fields correctly but if you want to inform developers of a code error then this should throw an error instead of silently returning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah sorry, meant when closing, if the field hasn't been initialized correctly or was set to nil at some level, there is no need to close anything as there is nothing to close

var err error
// stop the events subscription
if blockAPI.newBlockSubscription != nil {
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this error isn't explicitly handled, is that intentional? I would expect

Suggested change
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
if err != nil {
// handle the error
}
blockAPI.newBlockSubscription = nil

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's returned at the end of the function. If you prefer the error handling to be done in place, I can change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safer to do error handling immediately after the error b/c in this case the code would log

"gRPC streaming API has been stopped"

even though there may have been an error during the unsubscribe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, and would return the error that happened during the unsubscribe to be handled separately, which if I am not missing something, is also logged at a higher level.

opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
opts = append(opts, grpc.WithContextDialer(dialerFunc))
conn, err := grpc.Dial( //nolint:staticcheck
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be refactored to eliminate the nolint directive

Suggested change
conn, err := grpc.Dial( //nolint:staticcheck
conn, err := grpc.NewClient(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is that it uses an address from the config which has a tcp prefix and when used with NewClient it fails to connect.

So, changing this would mean reparsing the provided address and separating it to host:port.

We can create an issue to do it along with the other gRPC endpoint for consistency instead of touching both in here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Co-authored-by: Rootul P <[email protected]>
Copy link
Member

@evan-forbes evan-forbes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Collaborator

@rootulp rootulp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback is optional not blocking

Comment on lines +10 to 17
"github.com/tendermint/tendermint/libs/pubsub"

"github.com/tendermint/tendermint/crypto/encoding"

"github.com/tendermint/tendermint/proto/tendermint/crypto"

"github.com/tendermint/tendermint/libs/rand"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these newlines seem unintentional

Suggested change
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/proto/tendermint/crypto"
"github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/proto/tendermint/crypto"
"github.com/tendermint/tendermint/libs/rand"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +199 to +202
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's unecessary to safe-guard against this b/c the default NewBlockAPI initializes fields correctly but if you want to inform developers of a code error then this should throw an error instead of silently returning.

var err error
// stop the events subscription
if blockAPI.newBlockSubscription != nil {
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safer to do error handling immediately after the error b/c in this case the code would log

"gRPC streaming API has been stopped"

even though there may have been an error during the unsubscribe.

@@ -1,9 +1,19 @@
//nolint:dupl
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disable this linter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't spend time refactoring the testing suite so it has multiple duplicates. I'll open a good first issue for it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rach-id rach-id merged commit 18a7372 into v0.34.x-celestia Jan 14, 2025
20 of 21 checks passed
@rach-id rach-id deleted the grpc-endpoints branch January 14, 2025 19:06
rach-id added a commit that referenced this pull request Jan 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Expose all celestia-node required endpoints through gRPC
6 participants