Skip to content

Commit

Permalink
fix(kuma-cp): improve ZoneInsight subscription management (#8153)
Browse files Browse the repository at this point in the history
* fix(finalizer): watch multiple online subs
* test(finalizer): add tests with multiple online subs
* refactor: remove GetLastSubscription from generic.Insight
* test: prevent conflicts in BeforeEach by waiting for finalizer to stop
* test: add conflict retry in insight updates

Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont authored Nov 1, 2023
1 parent 8cc520e commit d256a5e
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 143 deletions.
21 changes: 20 additions & 1 deletion api/generic/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,35 @@ import (
"google.golang.org/protobuf/proto"
)

func AllSubscriptions[S Subscription, T interface{ GetSubscriptions() []S }](t T) []Subscription {
var subs []Subscription
for _, s := range t.GetSubscriptions() {
subs = append(subs, s)
}
return subs
}

func GetSubscription[S Subscription, T interface{ GetSubscriptions() []S }](t T, id string) Subscription {
for _, s := range t.GetSubscriptions() {
if s.GetId() == id {
return s
}
}
return nil
}

type Insight interface {
proto.Message
IsOnline() bool
GetLastSubscription() Subscription
GetSubscription(id string) Subscription
AllSubscriptions() []Subscription
UpdateSubscription(Subscription) error
}

type Subscription interface {
proto.Message
GetId() string
GetGeneration() uint32
IsOnline() bool
SetDisconnectTime(time time.Time)
}
30 changes: 17 additions & 13 deletions api/mesh/v1alpha1/dataplane_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ func (x *DataplaneInsight) IsOnline() bool {
return false
}

func (x *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
func (x *DataplaneInsight) AllSubscriptions() []generic.Subscription {
return generic.AllSubscriptions[*DiscoverySubscription](x)
}

func (x *DataplaneInsight) GetSubscription(id string) generic.Subscription {
return generic.GetSubscription[*DiscoverySubscription](x, id)
}

func (x *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time, issuedBackend string, supportedBackends []string) error {
Expand Down Expand Up @@ -93,13 +92,14 @@ func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for DataplaneInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -126,6 +126,10 @@ func (x *DiscoverySubscription) SetDisconnectTime(t time.Time) {
x.DisconnectTime = util_proto.MustTimestampProto(t)
}

func (x *DiscoverySubscription) IsOnline() bool {
return x.GetConnectTime() != nil && x.GetDisconnectTime() == nil
}

func (x *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range x.GetSubscriptions() {
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/dataplane_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ var _ = Describe("DataplaneHelpers", func() {
})).To(Succeed())

// then
_, subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.(*DiscoverySubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
26 changes: 13 additions & 13 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ import (

var _ generic.Insight = &ZoneIngressInsight{}

func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
func (x *ZoneIngressInsight) GetSubscription(id string) generic.Subscription {
return generic.GetSubscription[*DiscoverySubscription](x, id)
}

func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
Expand All @@ -26,13 +21,14 @@ func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneIngressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -57,6 +53,10 @@ func (x *ZoneIngressInsight) IsOnline() bool {
return false
}

func (x *ZoneIngressInsight) AllSubscriptions() []generic.Subscription {
return generic.AllSubscriptions[*DiscoverySubscription](x)
}

func (x *ZoneIngressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return (*DiscoverySubscription)(nil)
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Ingress Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*mesh_proto.DiscoverySubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
26 changes: 13 additions & 13 deletions api/mesh/v1alpha1/zoneegressinsight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ import (

var _ generic.Insight = &ZoneEgressInsight{}

func (x *ZoneEgressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
func (x *ZoneEgressInsight) GetSubscription(id string) generic.Subscription {
return generic.GetSubscription[*DiscoverySubscription](x, id)
}

func (x *ZoneEgressInsight) UpdateSubscription(s generic.Subscription) error {
Expand All @@ -26,13 +21,14 @@ func (x *ZoneEgressInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneEgressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -57,6 +53,10 @@ func (x *ZoneEgressInsight) IsOnline() bool {
return false
}

func (x *ZoneEgressInsight) AllSubscriptions() []generic.Subscription {
return generic.AllSubscriptions[*DiscoverySubscription](x)
}

func (x *ZoneEgressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return (*DiscoverySubscription)(nil)
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/zoneegressinsight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Egress Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*mesh_proto.DiscoverySubscription)).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
30 changes: 17 additions & 13 deletions api/system/v1alpha1/zone_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@ func NewSubscriptionStatus() *KDSSubscriptionStatus {
}
}

func (x *ZoneInsight) GetSubscription(id string) (int, *KDSSubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
func (x *ZoneInsight) GetSubscription(id string) generic.Subscription {
return generic.GetSubscription[*KDSSubscription](x, id)
}

func (x *ZoneInsight) GetLastSubscription() generic.Subscription {
Expand All @@ -44,10 +39,18 @@ func (x *ZoneInsight) IsOnline() bool {
return false
}

func (x *ZoneInsight) AllSubscriptions() []generic.Subscription {
return generic.AllSubscriptions[*KDSSubscription](x)
}

func (x *KDSSubscription) SetDisconnectTime(time time.Time) {
x.DisconnectTime = timestamppb.New(time)
}

func (x *KDSSubscription) IsOnline() bool {
return x.GetConnectTime() != nil && x.GetDisconnectTime() == nil
}

func (x *ZoneInsight) Sum(v func(*KDSSubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range x.GetSubscriptions() {
Expand All @@ -64,13 +67,14 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneInsight", s)
}
i, old := x.GetSubscription(kdsSubscription.Id)
if old != nil {
x.Subscriptions[i] = kdsSubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, kdsSubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == kdsSubscription.Id {
x.Subscriptions[i] = kdsSubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, kdsSubscription)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions api/system/v1alpha1/zone_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*system_proto.KDSSubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gc/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func setupFinalizer(rt runtime.Runtime) error {
return errors.Errorf("unknown Kuma CP mode %s", rt.Config().Mode)
}

finalizer, err := NewSubscriptionFinalizer(rt.ResourceManager(), rt.Tenants(), newTicker, rt.Metrics(), resourceTypes...)
finalizer, err := NewSubscriptionFinalizer(rt.ResourceManager(), rt.Tenants(), newTicker, rt.Metrics(), rt.Extensions(), resourceTypes...)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d256a5e

Please sign in to comment.