Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <[email protected]>
  • Loading branch information
skriss committed Mar 1, 2024
1 parent 3db1043 commit 85e46e6
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func (x *xdsServer) Start(ctx context.Context) error {

switch x.config.Type {
case contour_v1alpha1.EnvoyServerType:
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, x.snapshotHandler.MuxCache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, x.snapshotHandler.GetCache(), contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
case contour_v1alpha1.ContourServerType:
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(x.resources)...), grpcServer)
default:
Expand Down
1 change: 0 additions & 1 deletion internal/envoy/v3/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func bootstrapConfig(c *envoy.BootstrapConfig) *envoy_config_bootstrap_v3.Bootst
AltStatName: strings.Join([]string{c.Namespace, "contour", strconv.Itoa(c.GetXdsGRPCPort())}, "_"),
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: ClusterDiscoveryTypeForAddress(c.GetXdsAddress(), envoy_config_cluster_v3.Cluster_STRICT_DNS),
DnsRefreshRate: durationpb.New(time.Hour),
LbPolicy: envoy_config_cluster_v3.Cluster_ROUND_ROBIN,
LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{
ClusterName: "contour",
Expand Down
11 changes: 10 additions & 1 deletion internal/xdscache/v3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ func (c *ClusterCache) Contents() []proto.Message {
}

func (c *ClusterCache) ContentsByName() map[string]proto.Message {
panic("not implemented")
c.mu.Lock()
defer c.mu.Unlock()

values := make(map[string]proto.Message, len(c.values))

for name, val := range c.values {
values[name] = val
}

return values
}

func (c *ClusterCache) Query(names []string) []proto.Message {
Expand Down
14 changes: 13 additions & 1 deletion internal/xdscache/v3/endpointslicetranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) {
if changed {
e.Debug("cluster load assignments changed, notifying waiters")
e.Notify()
if e.Observer != nil {
e.Observer.Refresh()
}
} else {
e.Debug("cluster load assignments did not change")
}
Expand Down Expand Up @@ -446,7 +449,16 @@ func (e *EndpointSliceTranslator) Contents() []proto.Message {
}

func (e *EndpointSliceTranslator) ContentsByName() map[string]proto.Message {
panic("not implemented")
e.mu.Lock()
defer e.mu.Unlock()

values := make(map[string]proto.Message, len(e.entries))

for name, val := range e.entries {
values[name] = val
}

return values
}

func (e *EndpointSliceTranslator) Query(names []string) []proto.Message {
Expand Down
11 changes: 10 additions & 1 deletion internal/xdscache/v3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,16 @@ func (c *ListenerCache) Contents() []proto.Message {
}

func (c *ListenerCache) ContentsByName() map[string]proto.Message {
panic("not implemented")
c.mu.Lock()
defer c.mu.Unlock()

values := make(map[string]proto.Message, len(c.values))

for name, val := range c.values {
values[name] = val
}

return values
}

// Query returns the proto.Messages in the ListenerCache that match
Expand Down
11 changes: 10 additions & 1 deletion internal/xdscache/v3/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,16 @@ func (c *RouteCache) Contents() []proto.Message {
}

func (c *RouteCache) ContentsByName() map[string]proto.Message {
panic("not implemented")
c.mu.Lock()
defer c.mu.Unlock()

values := make(map[string]proto.Message, len(c.values))

for name, val := range c.values {
values[name] = val
}

return values
}

// Query searches the RouteCache for the named RouteConfiguration entries.
Expand Down
11 changes: 10 additions & 1 deletion internal/xdscache/v3/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,16 @@ func (c *SecretCache) Contents() []proto.Message {
}

func (c *SecretCache) ContentsByName() map[string]proto.Message {
panic("not implemented")
c.mu.Lock()
defer c.mu.Unlock()

values := make(map[string]proto.Message, len(c.values))

for name, val := range c.values {
values[name] = val
}

return values
}

func (c *SecretCache) Query(names []string) []proto.Message {
Expand Down
32 changes: 15 additions & 17 deletions internal/xdscache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,18 @@ import (
// SnapshotHandler responds to DAG builds via the OnChange()
// event and generates and caches go-control-plane Snapshots.
type SnapshotHandler struct {
// SnapshotCache contains go-control-plane Snapshots
// and is used by the go-control-plane xDS server.
SnapshotCache envoy_cache_v3.SnapshotCache

LinearCache *envoy_cache_v3.LinearCache

MuxCache envoy_cache_v3.Cache

// resources contains the Contour xDS resource caches.
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
log logrus.FieldLogger
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
snapshotCache envoy_cache_v3.SnapshotCache
edsCache *envoy_cache_v3.LinearCache
muxCache *envoy_cache_v3.MuxCache
log logrus.FieldLogger
}

// NewSnapshotHandler returns an instance of SnapshotHandler.
func NewSnapshotHandler(resources []xdscache.ResourceCache, snapshotCache envoy_cache_v3.SnapshotCache, logger logrus.FieldLogger) *SnapshotHandler {
edsCache := envoy_cache_v3.NewLinearCache(envoy_resource_v3.EndpointType)

muxCache := envoy_cache_v3.MuxCache{
muxCache := &envoy_cache_v3.MuxCache{
Caches: map[string]envoy_cache_v3.Cache{
envoy_resource_v3.ListenerType: snapshotCache,
envoy_resource_v3.ClusterType: snapshotCache,
Expand All @@ -66,13 +60,17 @@ func NewSnapshotHandler(resources []xdscache.ResourceCache, snapshotCache envoy_

return &SnapshotHandler{
resources: parseResources(resources),
SnapshotCache: snapshotCache,
MuxCache: &muxCache,
LinearCache: edsCache,
snapshotCache: snapshotCache,
edsCache: edsCache,
muxCache: muxCache,
log: logger,
}
}

func (s *SnapshotHandler) GetCache() envoy_cache_v3.Cache {
return s.muxCache
}

// Refresh is called when the EndpointsTranslator updates values
// in its cache. It updates the ClusterLoadAssignments linear cache.
func (s *SnapshotHandler) Refresh() {
Expand All @@ -83,7 +81,7 @@ func (s *SnapshotHandler) Refresh() {
resources[name] = val
}

s.LinearCache.SetResources(resources)
s.edsCache.SetResources(resources)
}

// OnChange is called when the DAG is rebuilt and a new snapshot is needed.
Expand All @@ -108,7 +106,7 @@ func (s *SnapshotHandler) OnChange(*dag.DAG) {
return
}

if err := s.SnapshotCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
if err := s.snapshotCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
return
}
Expand Down

0 comments on commit 85e46e6

Please sign in to comment.