Skip to content

Commit

Permalink
Make serviceRequest generic
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Jan 1, 2025
1 parent 0f79f38 commit bf663f5
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 150 deletions.
6 changes: 6 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ tasks:
- go build -o .out/examples/{{.NAME}} ./examples/{{.NAME}}
- .out/examples/{{.NAME}}

test:
deps:
- gen
cmds:
- go test ./...

gen:
deps:
- gen:proto
Expand Down
78 changes: 70 additions & 8 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,86 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/s2-streamstore/s2-sdk-go/s2"
"google.golang.org/grpc/grpclog"
)

func main() {
client, err := s2.NewBasinClient(
"vrongmeal-basin",
func printStreamConfig(c *s2.StreamConfig) {
fmt.Printf("storage class = %s\n", c.StorageClass)
fmt.Printf("retention policy = %#v\n", c.RetentionPolicy)
}

func printBasinConfig(c *s2.BasinConfig) {
fmt.Println("default stream config :-")
if c.DefaultStreamConfig == nil {
fmt.Println(nil)
} else {
printStreamConfig(c.DefaultStreamConfig)
}
}

func run() error {
client, err := s2.NewClient(
os.Getenv("S2_AUTH_TOKEN"),
s2.WithEndpoints(&s2.Endpoints{Account: "aws.s2.dev", Basin: "aws.s2.dev"}),
// s2.WithEndpoints(&s2.Endpoints{
// Account: "vrongmeal-macbook.prawn-typhon.ts.net:4243",
// Basin: "vrongmeal-macbook.prawn-typhon.ts.net:4243",
// }),
)
if err != nil {
panic(err)
return err
}

conf, err := client.GetStreamConfig(context.TODO(), "starwars2")
basinName := "vrongmeal-basin-3"

basinInfo, err := client.CreateBasin(context.TODO(), &s2.CreateBasinRequest{
Basin: basinName,
Config: &s2.BasinConfig{
DefaultStreamConfig: &s2.StreamConfig{
StorageClass: s2.StorageClassStandard,
RetentionPolicy: s2.RetentionPolicyAge(7 * time.Second),
},
},
})
if err != nil {
return err
}

fmt.Printf("%#v\n\n", basinInfo)

basinConfig, err := client.GetBasinConfig(context.TODO(), basinName)
if err != nil {
panic(err)
return err
}

fmt.Printf("%#v\n", conf)
printBasinConfig(basinConfig)
println()

updatedBasinConfig, err := client.ReconfigureBasin(context.TODO(), &s2.ReconfigureBasinRequest{
Basin: basinName,
Config: &s2.BasinConfig{
DefaultStreamConfig: &s2.StreamConfig{
StorageClass: s2.StorageClassUnspecified,
RetentionPolicy: s2.RetentionPolicyAge(9 * time.Second),
},
},
})
if err != nil {
return err
}

printBasinConfig(updatedBasinConfig)

return nil
}

func main() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stderr, os.Stderr))

if err := run(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
64 changes: 23 additions & 41 deletions s2/account_service_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ import (
type listBasinsServiceRequest struct {
Client pb.AccountServiceClient
Req *ListBasinsRequest
Resp *ListBasinsResponse
}

func (r *listBasinsServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *listBasinsServiceRequest) Send(ctx context.Context) error {
func (r *listBasinsServiceRequest) Send(ctx context.Context) (*ListBasinsResponse, error) {
req := &pb.ListBasinsRequest{
Prefix: r.Req.Prefix,
StartAfter: r.Req.StartAfter,
Expand All @@ -29,44 +28,42 @@ func (r *listBasinsServiceRequest) Send(ctx context.Context) error {

pbResp, err := r.Client.ListBasins(ctx, req)
if err != nil {
return err
return nil, err
}

pbBasins := pbResp.GetBasins()
basinInfos := make([]BasinInfo, 0, len(pbBasins))
for _, pbInfo := range pbBasins {
info, err := basinInfoFromProto(pbInfo)
if err != nil {
return err
return nil, err
}
basinInfos = append(basinInfos, info)
}

r.Resp = &ListBasinsResponse{
return &ListBasinsResponse{
Basins: basinInfos,
HasMore: pbResp.GetHasMore(),
}
return nil
}, nil
}

type createBasinServiceRequest struct {
Client pb.AccountServiceClient
Req *CreateBasinRequest
ReqID uuid.UUID
Info *BasinInfo
}

func (r *createBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *createBasinServiceRequest) Send(ctx context.Context) error {
func (r *createBasinServiceRequest) Send(ctx context.Context) (*BasinInfo, error) {
var basinConfig *pb.BasinConfig
if r.Req.Config != nil {
var err error
basinConfig, err = basinConfigIntoProto(r.Req.Config)
if err != nil {
return err
return nil, err
}
}

Expand All @@ -79,16 +76,15 @@ func (r *createBasinServiceRequest) Send(ctx context.Context) error {

pbResp, err := r.Client.CreateBasin(ctx, req)
if err != nil {
return err
return nil, err
}

info, err := basinInfoFromProto(pbResp.GetInfo())
if err != nil {
return err
return nil, err
}

r.Info = &info
return nil
return &info, nil
}

type deleteBasinServiceRequest struct {
Expand All @@ -100,7 +96,7 @@ func (r *deleteBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *deleteBasinServiceRequest) Send(ctx context.Context) error {
func (r *deleteBasinServiceRequest) Send(ctx context.Context) (struct{}, error) {
req := &pb.DeleteBasinRequest{
Basin: r.Req.Basin,
}
Expand All @@ -109,32 +105,31 @@ func (r *deleteBasinServiceRequest) Send(ctx context.Context) error {
if err != nil {
statusErr, ok := status.FromError(err)
if ok && statusErr.Code() == codes.NotFound && r.Req.IfExists {
return nil
return struct{}{}, nil
}

return err
return struct{}{}, err
}

return nil
return struct{}{}, nil
}

type reconfigureBasinServiceRequest struct {
Client pb.AccountServiceClient
Req *ReconfigureBasinRequest
UpdatedConfig *BasinConfig
Client pb.AccountServiceClient
Req *ReconfigureBasinRequest
}

func (r *reconfigureBasinServiceRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelIdempotent
}

func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) error {
func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) (*BasinConfig, error) {
var basinConfig *pb.BasinConfig
if r.Req.Config != nil {
var err error
basinConfig, err = basinConfigIntoProto(r.Req.Config)
if err != nil {
return err
return nil, err
}
}

Expand All @@ -151,43 +146,30 @@ func (r *reconfigureBasinServiceRequest) Send(ctx context.Context) error {

pbResp, err := r.Client.ReconfigureBasin(ctx, req)
if err != nil {
return err
}

config, err := basinConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
return nil, err
}

r.UpdatedConfig = config
return nil
return basinConfigFromProto(pbResp.GetConfig())
}

type getBasinConfigRequest struct {
Client pb.AccountServiceClient
Basin string
Config *BasinConfig
}

func (r *getBasinConfigRequest) IdempotencyLevel() idempotencyLevel {
return idempotencyLevelNoSideEffects
}

func (r *getBasinConfigRequest) Send(ctx context.Context) error {
func (r *getBasinConfigRequest) Send(ctx context.Context) (*BasinConfig, error) {
req := &pb.GetBasinConfigRequest{
Basin: r.Basin,
}

pbResp, err := r.Client.GetBasinConfig(ctx, req)
if err != nil {
return err
}

config, err := basinConfigFromProto(pbResp.GetConfig())
if err != nil {
return err
return nil, err
}

r.Config = config
return nil
return basinConfigFromProto(pbResp.GetConfig())
}
Loading

0 comments on commit bf663f5

Please sign in to comment.