Skip to content

Commit

Permalink
complete unary requests
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Jan 1, 2025
1 parent 8d855a6 commit 0f79f38
Show file tree
Hide file tree
Showing 11 changed files with 734 additions and 102 deletions.
64 changes: 41 additions & 23 deletions s2/account_service_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
type listBasinsServiceRequest struct {
Client pb.AccountServiceClient
Req *ListBasinsRequest
Resp *ListBasinsResponse
}

func (r *listBasinsServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *listBasinsServiceRequest) Send(ctx context.Context) (any, error) {
func (r *listBasinsServiceRequest) Send(ctx context.Context) error {
req := &pb.ListBasinsRequest{
Prefix: r.Req.Prefix,
StartAfter: r.Req.StartAfter,
Expand All @@ -28,42 +29,44 @@ func (r *listBasinsServiceRequest) Send(ctx context.Context) (any, error) {

pbResp, err := r.Client.ListBasins(ctx, req)
if err != nil {
return nil, err
return err
}

pbBasins := pbResp.GetBasins()
basinInfos := make([]BasinInfo, 0, len(pbBasins))
for _, pbInfo := range pbBasins {
info, err := basinInfoFromProto(pbInfo)
if err != nil {
return nil, err
return err
}
basinInfos = append(basinInfos, info)
}

return &ListBasinsResponse{
r.Resp = &ListBasinsResponse{
Basins: basinInfos,
HasMore: pbResp.GetHasMore(),
}, nil
}
return nil
}

type createBasinServiceRequest struct {
Client pb.AccountServiceClient
Req *CreateBasinRequest
ReqID uuid.UUID
Info *BasinInfo
}

func (r *createBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *createBasinServiceRequest) Send(ctx context.Context) (any, error) {
func (r *createBasinServiceRequest) Send(ctx context.Context) error {
var basinConfig *pb.BasinConfig
if r.Req.Config != nil {
var err error
basinConfig, err = basinConfigIntoProto(r.Req.Config)
if err != nil {
return nil, err
return err
}
}

Expand All @@ -76,15 +79,16 @@ func (r *createBasinServiceRequest) Send(ctx context.Context) (any, error) {

pbResp, err := r.Client.CreateBasin(ctx, req)
if err != nil {
return nil, err
return err
}

info, err := basinInfoFromProto(pbResp.GetInfo())
if err != nil {
return nil, err
return err
}

return &info, nil
r.Info = &info
return nil
}

type deleteBasinServiceRequest struct {
Expand All @@ -96,7 +100,7 @@ func (r *deleteBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *deleteBasinServiceRequest) Send(ctx context.Context) (any, error) {
func (r *deleteBasinServiceRequest) Send(ctx context.Context) error {
req := &pb.DeleteBasinRequest{
Basin: r.Req.Basin,
}
Expand All @@ -105,31 +109,32 @@ func (r *deleteBasinServiceRequest) Send(ctx context.Context) (any, error) {
if err != nil {
statusErr, ok := status.FromError(err)
if ok && statusErr.Code() == codes.NotFound && r.Req.IfExists {
return struct{}{}, nil
return nil
}

return struct{}{}, err
return err
}

return struct{}{}, nil
return nil
}

type reconfigureBasinServiceRequest struct {
Client pb.AccountServiceClient
Req *ReconfigureBasinRequest
Client pb.AccountServiceClient
Req *ReconfigureBasinRequest
UpdatedConfig *BasinConfig
}

func (r *reconfigureBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) (any, error) {
func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) error {
var basinConfig *pb.BasinConfig
if r.Req.Config != nil {
var err error
basinConfig, err = basinConfigIntoProto(r.Req.Config)
if err != nil {
return nil, err
return err
}
}

Expand All @@ -146,30 +151,43 @@ func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) (any, error)

pbResp, err := r.Client.ReconfigureBasin(ctx, req)
if err != nil {
return nil, err
return err
}

config, err := basinConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
}

return basinConfigFromProto(pbResp.GetConfig())
r.UpdatedConfig = config
return nil
}

type getBasinConfigRequest struct {
Client pb.AccountServiceClient
Basin string
Config *BasinConfig
}

func (r *getBasinConfigRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *getBasinConfigRequest) Send(ctx context.Context) (any, error) {
func (r *getBasinConfigRequest) Send(ctx context.Context) error {
req := &pb.GetBasinConfigRequest{
Basin: r.Basin,
}

pbResp, err := r.Client.GetBasinConfig(ctx, req)
if err != nil {
return nil, err
return err
}

config, err := basinConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
}

return basinConfigFromProto(pbResp.GetConfig())
r.Config = config
return nil
}
133 changes: 126 additions & 7 deletions s2/basin_service_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@ package s2
import (
"context"

"github.com/google/uuid"
"github.com/s2-streamstore/s2-sdk-go/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

type listStreamsServiceRequest struct {
Client pb.BasinServiceClient
Req *ListStreamsRequest
Resp *ListStreamsResponse
}

func (r *listStreamsServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *listStreamsServiceRequest) Send(ctx context.Context) (any, error) {
func (r *listStreamsServiceRequest) Send(ctx context.Context) error {
req := &pb.ListStreamsRequest{
Prefix: r.Req.Prefix,
StartAfter: r.Req.StartAfter,
Expand All @@ -24,7 +29,7 @@ func (r *listStreamsServiceRequest) Send(ctx context.Context) (any, error) {

pbResp, err := r.Client.ListStreams(ctx, req)
if err != nil {
return nil, err
return err
}

pbStreams := pbResp.GetStreams()
Expand All @@ -33,30 +38,144 @@ func (r *listStreamsServiceRequest) Send(ctx context.Context) (any, error) {
streamInfos = append(streamInfos, streamInfoFromProto(pbInfo))
}

return &ListStreamsResponse{
r.Resp = &ListStreamsResponse{
Streams: streamInfos,
HasMore: pbResp.GetHasMore(),
}, nil
}
return nil
}

type createStreamServiceRequest struct {
Client pb.BasinServiceClient
Req *CreateStreamRequest
ReqID uuid.UUID
Info *StreamInfo
}

func (r *createStreamServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *createStreamServiceRequest) Send(ctx context.Context) error {
config, err := streamConfigIntoProto(r.Req.Config)
if err != nil {
return err
}

req := &pb.CreateStreamRequest{
Stream: r.Req.Stream,
Config: config,
}

ctx = ctxWithHeader(ctx, "s2-request-token", r.ReqID.String())

pbResp, err := r.Client.CreateStream(ctx, req)
if err != nil {
return err
}

info := streamInfoFromProto(pbResp.GetInfo())
r.Info = &info
return nil
}

type deleteStreamServiceRequest struct {
Client pb.BasinServiceClient
Req *DeleteStreamRequest
}

func (r *deleteStreamServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *deleteStreamServiceRequest) Send(ctx context.Context) error {
req := &pb.DeleteStreamRequest{
Stream: r.Req.Stream,
}

_, err := r.Client.DeleteStream(ctx, req)
if err != nil {
statusErr, ok := status.FromError(err)
if ok && statusErr.Code() == codes.NotFound && r.Req.IfExists {
return nil
}

return err
}

return nil
}

type reconfigureStreamServiceRequest struct {
Client pb.BasinServiceClient
Req *ReconfigureStreamRequest
UpdatedConfig *StreamConfig
}

func (r *reconfigureStreamServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *reconfigureStreamServiceRequest) Send(ctx context.Context) error {
var streamConfig *pb.StreamConfig
if r.Req.Config != nil {
var err error
streamConfig, err = streamConfigIntoProto(r.Req.Config)
if err != nil {
return err
}
}

var mask *fieldmaskpb.FieldMask
if r.Req.Mask != nil {
mask = &fieldmaskpb.FieldMask{Paths: r.Req.Mask}
}

req := &pb.ReconfigureStreamRequest{
Stream: r.Req.Stream,
Config: streamConfig,
Mask: mask,
}

pbResp, err := r.Client.ReconfigureStream(ctx, req)
if err != nil {
return err
}

config, err := streamConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
}

r.UpdatedConfig = config
return nil
}

type getStreamConfigServiceRequest struct {
Client pb.BasinServiceClient
Stream string
Config *StreamConfig
}

func (r *getStreamConfigServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *getStreamConfigServiceRequest) Send(ctx context.Context) (any, error) {
func (r *getStreamConfigServiceRequest) Send(ctx context.Context) error {
req := &pb.GetStreamConfigRequest{
Stream: r.Stream,
}

pbResp, err := r.Client.GetStreamConfig(ctx, req)
if err != nil {
return nil, err
return err
}

config, err := streamConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
}

return streamConfigFromProto(pbResp.GetConfig())
r.Config = config
return nil
}
Loading

0 comments on commit 0f79f38

Please sign in to comment.