Skip to content

Commit

Permalink
Improve comments based on PR review on #583
Browse files Browse the repository at this point in the history
Rename KnownResources to ACKedResources to better reflect the change

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Jan 5, 2024
1 parent 1c4abfb commit cefeb71
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 50 deletions.
10 changes: 5 additions & 5 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// SubscriptionState provides additional data on the client knowledge for the type matching the request
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
// SubscriptionState stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
type SubscriptionState interface {
// GetKnownResources returns a list of resources that the client has ACK'd and their associated version.
// GetACKedResources returns a list of resources that the client has ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
GetKnownResources() map[string]string
GetACKedResources() map[string]string

// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track of subscription updates across requests
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetKnownResources()) == 0 {
if len(state.GetACKedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -45,15 +45,15 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetKnownResources() {
for name := range state.GetACKedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
Expand All @@ -63,7 +63,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[typ]:
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetKnownResources(nextVersionMap)
streams[typ].SetACKedResources(nextVersionMap)
case <-time.After(time.Second):
require.Fail(t, "failed to receive a snapshot response")
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestDeltaRemoveResources(t *testing.T) {
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
nextVersionMap := out.GetNextVersionMap()
// make sure the version maps are different since we no longer are tracking any endpoint resources
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change")
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change")
case <-time.After(time.Second):
assert.Fail(t, "failed to receive snapshot response")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error {
return err
}
resp := <-w
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())
_, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values
return err
}
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Multiple updates
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
Expand All @@ -695,7 +695,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Update/add/delete
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
Expand All @@ -715,7 +715,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Re-add previously deleted watched resource
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
Expand All @@ -732,7 +732,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Wildcard create/update
require.NoError(t, createWildcardDeltaWatch(c, w))
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState Subscripti
}

if exists {
knownResourceNames := clientState.GetKnownResources()
knownResourceNames := clientState.GetACKedResources()
diff := []string{}
for _, r := range request.GetResourceNames() {
if _, ok := knownResourceNames[r]; !ok {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
}
// Update streamState
for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-time.After(2 * time.Second):
t.Errorf("failed to receive snapshot response")
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
updatesByType[typ]++

for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-end:
cancel()
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestSnapshotCacheWatch(t *testing.T) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {
// Request additional resource with name=clusterName2 for same version
go func() {
state := stream.NewSubscriptionState(false, map[string]string{})
state.SetKnownResources(map[string]string{clusterName: fixture.version})
state.SetACKedResources(map[string]string{clusterName: fixture.version})
_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
ResourceNames: []string{clusterName, clusterName2}}, &state, watch)
require.NoError(t, err)
Expand All @@ -521,7 +521,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {

// Repeat request for with same version and make sure a watch is created
state := stream.NewSubscriptionState(false, map[string]string{})
state.SetKnownResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
state.SetACKedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil {
t.Fatal("Should create a watch")
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
watch := watches.deltaWatches[typ]
watch.nonce = nonce

watch.state.SetKnownResources(resp.GetNextVersionMap())
watch.state.SetACKedResources(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
return nil
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.Subscriptio
// To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either:
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetKnownResources()[resource] = ""
streamState.GetACKedResources()[resource] = ""
}
delete(sv, resource)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
}

typeURL := req.GetTypeUrl()
streamState, ok := sw.streamStates[typeURL]
streamState, ok := sw.streamState[typeURL]
if !ok {
// Supports legacy wildcard mode
// Wildcard will be set to true if no resource is set
Expand All @@ -114,7 +114,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
if lastResponse, ok := sw.lastDiscoveryResponses[typeURL]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResources(lastResponse.resources)
streamState.SetACKedResources(lastResponse.resources)
}
}

Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
})
}

sw.streamStates[req.TypeUrl] = streamState
sw.streamState[req.TypeUrl] = streamState
}
}
}
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type streamWrapper struct {

// The below fields are used for tracking resource
// cache state and should be maintained per stream.
streamStates map[string]stream.SubscriptionState
streamState map[string]stream.SubscriptionState
lastDiscoveryResponses map[string]lastDiscoveryResponse
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque

// a collection of stack allocated watches per request type.
watches: newWatches(),
streamStates: make(map[string]stream.SubscriptionState),
streamState: make(map[string]stream.SubscriptionState),
lastDiscoveryResponses: make(map[string]lastDiscoveryResponse),
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
req.TypeUrl = defaultTypeURL
}

streamState := sw.streamStates[req.TypeUrl]
streamState := sw.streamState[req.TypeUrl]

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil {
Expand All @@ -121,7 +121,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResources(lastResponse.resources)
streamState.SetACKedResources(lastResponse.resources)
}
}

Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
})
}

sw.streamStates[req.TypeUrl] = streamState
sw.streamState[req.TypeUrl] = streamState

// Recompute the dynamic select cases for this stream.
sw.watches.recompute(s.ctx, reqCh)
Expand Down
27 changes: 13 additions & 14 deletions pkg/server/stream/v3/subscription.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
package stream

// SubscriptionState will keep track of a resource subscription on a stream.
// SubscriptionState stores the server view of a given type subscription in a stream.
type SubscriptionState struct {
// wildcard is set if the subscription currently has a wildcard watch
// wildcard indicates if the subscription currently has a wildcard watch.
wildcard bool

// subscribedResourceNames provides the resources explicitly requested by the client
// This list might be non-empty even when set as wildcard
// This list might be non-empty even when set as wildcard.
subscribedResourceNames map[string]struct{}

// resourceVersions contains the resources acknowledged by the client and the versions
// associated to them
resourceVersions map[string]string
// ackedResources contains the resources acknowledged by the client and the acknowledged versions.
ackedResources map[string]string
}

// NewSubscriptionState initializes a stream state.
func NewSubscriptionState(wildcard bool, initialResourceVersions map[string]string) SubscriptionState {
state := SubscriptionState{
wildcard: wildcard,
subscribedResourceNames: map[string]struct{}{},
resourceVersions: initialResourceVersions,
ackedResources: initialResourceVersions,
}

if initialResourceVersions == nil {
state.resourceVersions = make(map[string]string)
state.ackedResources = make(map[string]string)
}

return state
Expand All @@ -43,16 +42,16 @@ func (s *SubscriptionState) SetSubscribedResources(subscribedResourceNames map[s
s.subscribedResourceNames = subscribedResourceNames
}

// GetKnownResources returns the list of resources acknowledged by the client
// GetACKedResources returns the list of resources acknowledged by the client
// and their acknowledged version
func (s SubscriptionState) GetKnownResources() map[string]string {
return s.resourceVersions
func (s SubscriptionState) GetACKedResources() map[string]string {
return s.ackedResources
}

// SetKnownResources sets a list of resource versions currently known by the client
// SetACKedResources sets a list of resource versions currently known by the client
// The cache can use this state to compute resources added/updated/deleted
func (s *SubscriptionState) SetKnownResources(resourceVersions map[string]string) {
s.resourceVersions = resourceVersions
func (s *SubscriptionState) SetACKedResources(resourceVersions map[string]string) {
s.ackedResources = resourceVersions
}

// SetWildcard will set the subscription to return all known resources
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetKnownResources()) == 0 {
if len(state.GetACKedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resourceMap))
}
nextVersionMap = make(map[string]string, len(resourceMap))
Expand All @@ -46,14 +46,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// we can just set it here to be used for comparison later
version := versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
for name := range state.GetKnownResources() {
for name := range state.GetACKedResources() {
if _, ok := resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
Expand All @@ -63,7 +63,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if r, ok := resourceMap[name]; ok {
nextVersion := versionMap[name]
if prevVersion != nextVersion {
Expand Down

0 comments on commit cefeb71

Please sign in to comment.