Skip to content

Commit

Permalink
feat(rpc/flipt): add reference to read request protocols
Browse files Browse the repository at this point in the history
refactor(storage): thread reference on storage request structs

refactor(storage): rename ListPredicates to PageParameter

refactor(storage): rename NewListRequest to ListWithOptions

refactor(storage): rename List to ListWithParameters

refactor(storage): rename ReferencePredicate to ReferenceRequest

refactor(storage): rename NamespacePredicate to NamespaceRequest

refactor(storage): rename ResourcePredicate to ResourceRequest

refactor(storage): rename IDPredicate to IDRequest

chore(rpc/flipt): proto-gen-go comment version updates

refactor(storage): abstract reference threading behind new ReferencedSnapshotStore interface
  • Loading branch information
GeorgeMac committed Jan 7, 2024
1 parent 0ef0054 commit 0036b52
Show file tree
Hide file tree
Showing 58 changed files with 2,630 additions and 2,104 deletions.
209 changes: 154 additions & 55 deletions go.work.sum

Large diffs are not rendered by default.

72 changes: 36 additions & 36 deletions internal/common/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ func (m *StoreMock) String() string {
return "mock"
}

func (m *StoreMock) GetNamespace(ctx context.Context, key string) (*flipt.Namespace, error) {
args := m.Called(ctx, key)
func (m *StoreMock) GetNamespace(ctx context.Context, ns storage.NamespaceRequest) (*flipt.Namespace, error) {
args := m.Called(ctx, ns)
return args.Get(0).(*flipt.Namespace), args.Error(1)
}

func (m *StoreMock) ListNamespaces(ctx context.Context, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Namespace], error) {
args := m.Called(ctx, opts)
func (m *StoreMock) ListNamespaces(ctx context.Context, req *storage.ListRequest[storage.ReferenceRequest]) (storage.ResultSet[*flipt.Namespace], error) {
args := m.Called(ctx, req)
return args.Get(0).(storage.ResultSet[*flipt.Namespace]), args.Error(1)
}

func (m *StoreMock) CountNamespaces(ctx context.Context) (uint64, error) {
args := m.Called(ctx)
func (m *StoreMock) CountNamespaces(ctx context.Context, p storage.ReferenceRequest) (uint64, error) {
args := m.Called(ctx, p)
return args.Get(0).(uint64), args.Error(1)
}

Expand All @@ -48,18 +48,18 @@ func (m *StoreMock) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespac
return args.Error(0)
}

func (m *StoreMock) GetFlag(ctx context.Context, namespaceKey, key string) (*flipt.Flag, error) {
args := m.Called(ctx, namespaceKey, key)
func (m *StoreMock) GetFlag(ctx context.Context, flag storage.ResourceRequest) (*flipt.Flag, error) {
args := m.Called(ctx, flag)
return args.Get(0).(*flipt.Flag), args.Error(1)
}

func (m *StoreMock) ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Flag], error) {
args := m.Called(ctx, namespaceKey, opts)
func (m *StoreMock) ListFlags(ctx context.Context, req *storage.ListRequest[storage.NamespaceRequest]) (storage.ResultSet[*flipt.Flag], error) {
args := m.Called(ctx, req)
return args.Get(0).(storage.ResultSet[*flipt.Flag]), args.Error(1)
}

func (m *StoreMock) CountFlags(ctx context.Context, namespaceKey string) (uint64, error) {
args := m.Called(ctx, namespaceKey)
func (m *StoreMock) CountFlags(ctx context.Context, ns storage.NamespaceRequest) (uint64, error) {
args := m.Called(ctx, ns)
return args.Get(0).(uint64), args.Error(1)
}

Expand Down Expand Up @@ -93,18 +93,18 @@ func (m *StoreMock) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantReq
return args.Error(0)
}

func (m *StoreMock) GetSegment(ctx context.Context, namespaceKey string, key string) (*flipt.Segment, error) {
args := m.Called(ctx, namespaceKey, key)
func (m *StoreMock) GetSegment(ctx context.Context, segment storage.ResourceRequest) (*flipt.Segment, error) {
args := m.Called(ctx, segment)
return args.Get(0).(*flipt.Segment), args.Error(1)
}

func (m *StoreMock) ListSegments(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Segment], error) {
args := m.Called(ctx, namespaceKey, opts)
func (m *StoreMock) ListSegments(ctx context.Context, req *storage.ListRequest[storage.NamespaceRequest]) (storage.ResultSet[*flipt.Segment], error) {
args := m.Called(ctx, req)
return args.Get(0).(storage.ResultSet[*flipt.Segment]), args.Error(1)
}

func (m *StoreMock) CountSegments(ctx context.Context, namespaceKey string) (uint64, error) {
args := m.Called(ctx, namespaceKey)
func (m *StoreMock) CountSegments(ctx context.Context, ns storage.NamespaceRequest) (uint64, error) {
args := m.Called(ctx, ns)
return args.Get(0).(uint64), args.Error(1)
}

Expand Down Expand Up @@ -138,18 +138,18 @@ func (m *StoreMock) DeleteConstraint(ctx context.Context, r *flipt.DeleteConstra
return args.Error(0)
}

func (m *StoreMock) ListRollouts(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Rollout], error) {
args := m.Called(ctx, namespaceKey, flagKey, opts)
func (m *StoreMock) ListRollouts(ctx context.Context, req *storage.ListRequest[storage.ResourceRequest]) (storage.ResultSet[*flipt.Rollout], error) {
args := m.Called(ctx, req)
return args.Get(0).(storage.ResultSet[*flipt.Rollout]), args.Error(1)
}

func (m *StoreMock) CountRollouts(ctx context.Context, namespaceKey string, flagKey string) (uint64, error) {
args := m.Called(ctx, namespaceKey, flagKey)
func (m *StoreMock) CountRollouts(ctx context.Context, flag storage.ResourceRequest) (uint64, error) {
args := m.Called(ctx, flag)
return args.Get(0).(uint64), args.Error(1)
}

func (m *StoreMock) GetRollout(ctx context.Context, namespaceKey string, key string) (*flipt.Rollout, error) {
args := m.Called(ctx, namespaceKey, key)
func (m *StoreMock) GetRollout(ctx context.Context, ns storage.NamespaceRequest, id string) (*flipt.Rollout, error) {
args := m.Called(ctx, ns, id)
return args.Get(0).(*flipt.Rollout), args.Error(1)
}

Expand All @@ -173,18 +173,18 @@ func (m *StoreMock) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsReq
return args.Error(0)
}

func (m *StoreMock) GetRule(ctx context.Context, namespaceKey string, id string) (*flipt.Rule, error) {
args := m.Called(ctx, namespaceKey, id)
func (m *StoreMock) GetRule(ctx context.Context, ns storage.NamespaceRequest, id string) (*flipt.Rule, error) {
args := m.Called(ctx, ns, id)
return args.Get(0).(*flipt.Rule), args.Error(1)
}

func (m *StoreMock) ListRules(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Rule], error) {
args := m.Called(ctx, namespaceKey, flagKey, opts)
func (m *StoreMock) ListRules(ctx context.Context, req *storage.ListRequest[storage.ResourceRequest]) (storage.ResultSet[*flipt.Rule], error) {
args := m.Called(ctx, req)
return args.Get(0).(storage.ResultSet[*flipt.Rule]), args.Error(1)
}

func (m *StoreMock) CountRules(ctx context.Context, namespaceKey, flagKey string) (uint64, error) {
args := m.Called(ctx, namespaceKey, flagKey)
func (m *StoreMock) CountRules(ctx context.Context, flag storage.ResourceRequest) (uint64, error) {
args := m.Called(ctx, flag)
return args.Get(0).(uint64), args.Error(1)
}

Expand Down Expand Up @@ -223,17 +223,17 @@ func (m *StoreMock) DeleteDistribution(ctx context.Context, r *flipt.DeleteDistr
return args.Error(0)
}

func (m *StoreMock) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) ([]*storage.EvaluationRule, error) {
args := m.Called(ctx, namespaceKey, flagKey)
func (m *StoreMock) GetEvaluationRules(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRule, error) {
args := m.Called(ctx, flag)
return args.Get(0).([]*storage.EvaluationRule), args.Error(1)
}

func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, ruleID string) ([]*storage.EvaluationDistribution, error) {
args := m.Called(ctx, ruleID)
func (m *StoreMock) GetEvaluationDistributions(ctx context.Context, rule storage.IDRequest) ([]*storage.EvaluationDistribution, error) {
args := m.Called(ctx, rule)
return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1)
}

func (m *StoreMock) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*storage.EvaluationRollout, error) {
args := m.Called(ctx, namespaceKey, flagKey)
func (m *StoreMock) GetEvaluationRollouts(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRollout, error) {
args := m.Called(ctx, flag)
return args.Get(0).([]*storage.EvaluationRollout), args.Error(1)
}
16 changes: 10 additions & 6 deletions internal/server/evaluation/data/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type EvaluationStore interface {
ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Flag], error)
ListFlags(ctx context.Context, req *storage.ListRequest[storage.NamespaceRequest]) (storage.ResultSet[*flipt.Flag], error)
storage.EvaluationStore
}

Expand Down Expand Up @@ -92,6 +92,7 @@ func toEvaluationRolloutType(r flipt.RolloutType) evaluation.EvaluationRolloutTy
func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluation.EvaluationNamespaceSnapshotRequest) (*evaluation.EvaluationNamespaceSnapshot, error) {
var (
namespaceKey = r.Key
reference = r.Reference
resp = &evaluation.EvaluationNamespaceSnapshot{
Namespace: &evaluation.EvaluationNamespace{ // TODO: should we get from store?
Key: namespaceKey,
Expand All @@ -107,8 +108,10 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio
for remaining {
res, err := srv.store.ListFlags(
ctx,
namespaceKey,
storage.WithPageToken(nextPage),
storage.ListWithOptions(
storage.NewNamespace(namespaceKey, storage.WithReference(reference)),
storage.ListWithQueryParamOptions[storage.NamespaceRequest](storage.WithPageToken(nextPage)),
),
)
if err != nil {
return nil, fmt.Errorf("getting flags: %w", err)
Expand All @@ -129,8 +132,9 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio
UpdatedAt: f.UpdatedAt,
}

flagKey := storage.NewResource(namespaceKey, f.Key, storage.WithReference(reference))
if f.Type == flipt.FlagType_VARIANT_FLAG_TYPE {
rules, err := srv.store.GetEvaluationRules(ctx, namespaceKey, f.Key)
rules, err := srv.store.GetEvaluationRules(ctx, flagKey)
if err != nil {
return nil, fmt.Errorf("getting rules for flag %q: %w", f.Key, err)
}
Expand Down Expand Up @@ -167,7 +171,7 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio
rule.Segments = append(rule.Segments, ss)
}

distributions, err := srv.store.GetEvaluationDistributions(ctx, r.ID)
distributions, err := srv.store.GetEvaluationDistributions(ctx, storage.NewID(r.ID, storage.WithReference(reference)))
if err != nil {
return nil, fmt.Errorf("getting distributions for rule %q: %w", r.ID, err)
}
Expand All @@ -190,7 +194,7 @@ func (srv *Server) EvaluationSnapshotNamespace(ctx context.Context, r *evaluatio
}

if f.Type == flipt.FlagType_BOOLEAN_FLAG_TYPE {
rollouts, err := srv.store.GetEvaluationRollouts(ctx, namespaceKey, f.Key)
rollouts, err := srv.store.GetEvaluationRollouts(ctx, flagKey)
if err != nil {
return nil, fmt.Errorf("getting rollout rules for flag %q: %w", f.Key, err)
}
Expand Down
17 changes: 6 additions & 11 deletions internal/server/evaluation/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
errs "go.flipt.io/flipt/errors"
"go.flipt.io/flipt/internal/server/metrics"
fliptotel "go.flipt.io/flipt/internal/server/otel"
"go.flipt.io/flipt/internal/storage"
"go.flipt.io/flipt/rpc/flipt"
rpcevaluation "go.flipt.io/flipt/rpc/flipt/evaluation"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -21,7 +22,7 @@ import (
// Variant evaluates a request for a multi-variate flag and entity.
// It adapts the 'v2' evaluation API and proxies the request to the 'v1' evaluation API.
func (s *Server) Variant(ctx context.Context, r *rpcevaluation.EvaluationRequest) (*rpcevaluation.VariantEvaluationResponse, error) {
flag, err := s.store.GetFlag(ctx, r.NamespaceKey, r.FlagKey)
flag, err := s.store.GetFlag(ctx, storage.NewResource(r.NamespaceKey, r.FlagKey, storage.WithReference(r.Reference)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -53,13 +54,7 @@ func (s *Server) Variant(ctx context.Context, r *rpcevaluation.EvaluationRequest
}

func (s *Server) variant(ctx context.Context, flag *flipt.Flag, r *rpcevaluation.EvaluationRequest) (*rpcevaluation.VariantEvaluationResponse, error) {
resp, err := s.evaluator.Evaluate(ctx, flag, &flipt.EvaluationRequest{
RequestId: r.RequestId,
FlagKey: r.FlagKey,
EntityId: r.EntityId,
Context: r.Context,
NamespaceKey: r.NamespaceKey,
})
resp, err := s.evaluator.Evaluate(ctx, flag, r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +88,7 @@ func (s *Server) variant(ctx context.Context, flag *flipt.Flag, r *rpcevaluation

// Boolean evaluates a request for a boolean flag and entity.
func (s *Server) Boolean(ctx context.Context, r *rpcevaluation.EvaluationRequest) (*rpcevaluation.BooleanEvaluationResponse, error) {
flag, err := s.store.GetFlag(ctx, r.NamespaceKey, r.FlagKey)
flag, err := s.store.GetFlag(ctx, storage.NewResource(r.NamespaceKey, r.FlagKey, storage.WithReference(r.Reference)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,7 +122,7 @@ func (s *Server) Boolean(ctx context.Context, r *rpcevaluation.EvaluationRequest
}

func (s *Server) boolean(ctx context.Context, flag *flipt.Flag, r *rpcevaluation.EvaluationRequest) (*rpcevaluation.BooleanEvaluationResponse, error) {
rollouts, err := s.store.GetEvaluationRollouts(ctx, r.NamespaceKey, flag.Key)
rollouts, err := s.store.GetEvaluationRollouts(ctx, storage.NewResource(r.NamespaceKey, flag.Key, storage.WithReference(r.Reference)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -256,7 +251,7 @@ func (s *Server) Batch(ctx context.Context, b *rpcevaluation.BatchEvaluationRequ
}

for _, req := range b.GetRequests() {
f, err := s.store.GetFlag(ctx, req.NamespaceKey, req.FlagKey)
f, err := s.store.GetFlag(ctx, storage.NewResource(req.NamespaceKey, req.FlagKey, storage.WithReference(b.Reference)))
if err != nil {
var errnf errs.ErrNotFound
if errors.As(err, &errnf) {
Expand Down
14 changes: 7 additions & 7 deletions internal/server/evaluation/evaluation_store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ func (e *evaluationStoreMock) String() string {
return "mock"
}

func (e *evaluationStoreMock) GetFlag(ctx context.Context, namespaceKey, key string) (*flipt.Flag, error) {
args := e.Called(ctx, namespaceKey, key)
func (e *evaluationStoreMock) GetFlag(ctx context.Context, flag storage.ResourceRequest) (*flipt.Flag, error) {
args := e.Called(ctx, flag)
return args.Get(0).(*flipt.Flag), args.Error(1)
}

func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) ([]*storage.EvaluationRule, error) {
args := e.Called(ctx, namespaceKey, flagKey)
func (e *evaluationStoreMock) GetEvaluationRules(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRule, error) {
args := e.Called(ctx, flag)
return args.Get(0).([]*storage.EvaluationRule), args.Error(1)
}

func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID string) ([]*storage.EvaluationDistribution, error) {
func (e *evaluationStoreMock) GetEvaluationDistributions(ctx context.Context, ruleID storage.IDRequest) ([]*storage.EvaluationDistribution, error) {
args := e.Called(ctx, ruleID)
return args.Get(0).([]*storage.EvaluationDistribution), args.Error(1)
}

func (e *evaluationStoreMock) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*storage.EvaluationRollout, error) {
args := e.Called(ctx, namespaceKey, flagKey)
func (e *evaluationStoreMock) GetEvaluationRollouts(ctx context.Context, flag storage.ResourceRequest) ([]*storage.EvaluationRollout, error) {
args := e.Called(ctx, flag)
return args.Get(0).([]*storage.EvaluationRollout), args.Error(1)
}
Loading

0 comments on commit 0036b52

Please sign in to comment.