Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): move protocol information to mesh context #8479

Merged
merged 24 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions pkg/plugins/policies/core/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/metadata"
"github.com/kumahq/kuma/pkg/xds/envoy/tags"
Expand Down Expand Up @@ -75,18 +74,6 @@ func GatherTargetedClusters(
return targetedClusters
}

// InferProtocol infers protocol for the destination listener.
// It will only return HTTP when all endpoints are tagged with HTTP.
func InferProtocol(routing core_xds.Routing, serviceName string) core_mesh.Protocol {
var allEndpoints []core_xds.Endpoint
outboundEndpoints := core_xds.EndpointList(routing.OutboundTargets[serviceName])
allEndpoints = append(allEndpoints, outboundEndpoints...)
externalEndpoints := routing.ExternalServiceOutboundTargets[serviceName]
allEndpoints = append(allEndpoints, externalEndpoints...)

return generator.InferServiceProtocol(allEndpoints)
}

func HasExternalService(routing core_xds.Routing, serviceName string) bool {
// We assume that all the targets are either ExternalServices or not
// therefore we check only the first one
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/policies/core/xds/meshroute/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func GenerateClusters(

for _, serviceName := range services.Sorted() {
service := services[serviceName]
protocol := generator.InferProtocol(proxy, service.Clusters())
protocol := meshCtx.GetServiceProtocol(serviceName)
tlsReady := service.TLSReady()

for _, cluster := range service.Clusters() {
Expand Down
9 changes: 8 additions & 1 deletion pkg/plugins/policies/core/xds/meshroute/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
core_xds "github.com/kumahq/kuma/pkg/core/xds"
plugins_xds "github.com/kumahq/kuma/pkg/plugins/policies/core/xds"
"github.com/kumahq/kuma/pkg/util/pointer"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/envoy/tags"
envoy_tags "github.com/kumahq/kuma/pkg/xds/envoy/tags"
Expand All @@ -19,6 +20,7 @@ func MakeTCPSplit(
clusterCache map[common_api.TargetRefHash]string,
servicesAcc envoy_common.ServicesAccumulator,
refs []common_api.BackendRef,
meshCtx xds_context.MeshContext,
) []envoy_common.Split {
return makeSplit(
proxy,
Expand All @@ -31,6 +33,7 @@ func MakeTCPSplit(
clusterCache,
servicesAcc,
refs,
meshCtx,
)
}

Expand All @@ -39,6 +42,7 @@ func MakeHTTPSplit(
clusterCache map[common_api.TargetRefHash]string,
servicesAcc envoy_common.ServicesAccumulator,
refs []common_api.BackendRef,
meshCtx xds_context.MeshContext,
) []envoy_common.Split {
return makeSplit(
proxy,
Expand All @@ -49,6 +53,7 @@ func MakeHTTPSplit(
clusterCache,
servicesAcc,
refs,
meshCtx,
)
}

Expand All @@ -58,6 +63,7 @@ func makeSplit(
clusterCache map[common_api.TargetRefHash]string,
servicesAcc envoy_common.ServicesAccumulator,
refs []common_api.BackendRef,
meshCtx xds_context.MeshContext,
) []envoy_common.Split {
var split []envoy_common.Split

Expand All @@ -73,7 +79,8 @@ func makeSplit(
continue
}

if _, ok := protocols[plugins_xds.InferProtocol(proxy.Routing, service)]; !ok {
protocol := meshCtx.GetServiceProtocol(service)
if _, ok := protocols[protocol]; !ok {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/kumahq/kuma/pkg/test/matchers"
"github.com/kumahq/kuma/pkg/test/resources/builders"
test_model "github.com/kumahq/kuma/pkg/test/resources/model"
"github.com/kumahq/kuma/pkg/test/resources/samples"
xds_builders "github.com/kumahq/kuma/pkg/test/xds/builders"
xds_samples "github.com/kumahq/kuma/pkg/test/xds/samples"
"github.com/kumahq/kuma/pkg/util/pointer"
Expand Down Expand Up @@ -951,7 +952,12 @@ var _ = Describe("MeshAccessLog", func() {
Items: given.routes,
}

xdsCtx := xds_samples.SampleContextWith(resources)
xdsCtx := *xds_builders.Context().
WithMesh(samples.MeshDefaultBuilder()).
WithResources(resources).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
AddServiceProtocol("other-service", core_mesh.ProtocolHTTP).
Build()
proxy := xds_builders.Proxy().
WithMetadata(&core_xds.DataplaneMetadata{
AccessLogSocketPath: "/tmp/foo",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,11 @@ var _ = Describe("MeshCircuitBreaker", func() {
Items: []*core_mesh.MeshGatewayRouteResource{samples.BackendGatewayRoute()},
}

xdsCtx := xds_samples.SampleContextWith(resources)
xdsCtx := *xds_builders.Context().
WithMesh(samples.MeshDefaultBuilder()).
WithResources(resources).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build()
proxy := xds_builders.Proxy().
WithDataplane(samples.GatewayDataplaneBuilder()).
WithPolicies(xds_builders.MatchedPolicies().WithGatewayPolicy(api.MeshCircuitBreakerType, given.rules)).
Expand Down
14 changes: 9 additions & 5 deletions pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *

clusters := policies_xds.GatherClusters(rs)

if err := applyToOutbounds(policies.ToRules, clusters.Outbound, clusters.OutboundSplit, proxy.Dataplane, proxy.Routing); err != nil {
if err := applyToOutbounds(policies.ToRules, clusters.Outbound, clusters.OutboundSplit, proxy.Dataplane, ctx.Mesh); err != nil {
return err
}

Expand All @@ -47,13 +47,17 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *
return nil
}

func applyToOutbounds(rules core_rules.ToRules, outboundClusters map[string]*envoy_cluster.Cluster, outboundSplitClusters map[string][]*envoy_cluster.Cluster, dataplane *core_mesh.DataplaneResource, routing core_xds.Routing) error {
func applyToOutbounds(
rules core_rules.ToRules,
outboundClusters map[string]*envoy_cluster.Cluster,
outboundSplitClusters map[string][]*envoy_cluster.Cluster,
dataplane *core_mesh.DataplaneResource,
meshCtx xds_context.MeshContext,
) error {
targetedClusters := policies_xds.GatherTargetedClusters(dataplane.Spec.Networking.GetOutbound(), outboundSplitClusters, outboundClusters)

for cluster, serviceName := range targetedClusters {
protocol := policies_xds.InferProtocol(routing, serviceName)

if err := configure(dataplane, rules.Rules, core_rules.MeshService(serviceName), protocol, cluster); err != nil {
if err := configure(dataplane, rules.Rules, core_rules.MeshService(serviceName), meshCtx.GetServiceProtocol(serviceName), cluster); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ var _ = Describe("MeshHealthCheck", func() {
resources.Add(&r)
}

context := xds_samples.SampleContext()
context := *xds_builders.Context().
WithMesh(samples.MeshDefaultBuilder()).
WithResources(xds_context.NewResources()).
AddServiceProtocol(httpServiceTag, core_mesh.ProtocolHTTP).
AddServiceProtocol(tcpServiceTag, core_mesh.ProtocolTCP).
AddServiceProtocol(grpcServiceTag, core_mesh.ProtocolGRPC).
AddServiceProtocol(splitHttpServiceTag, core_mesh.ProtocolHTTP).
Build()
proxy := xds_builders.Proxy().
WithDataplane(
samples.DataplaneBackendBuilder().
Expand Down Expand Up @@ -246,7 +253,11 @@ var _ = Describe("MeshHealthCheck", func() {
Items: []*core_mesh.MeshGatewayRouteResource{samples.BackendGatewayRoute()},
}

xdsCtx := xds_samples.SampleContextWith(resources)
xdsCtx := *xds_builders.Context().
WithMesh(samples.MeshDefaultBuilder()).
WithResources(resources).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build()
proxy := xds_builders.Proxy().
WithDataplane(samples.GatewayDataplaneBuilder()).
WithPolicies(xds_builders.MatchedPolicies().WithGatewayPolicy(api.MeshHealthCheckType, given.rules)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
core_rules "github.com/kumahq/kuma/pkg/plugins/policies/core/rules"
plugins_xds "github.com/kumahq/kuma/pkg/plugins/policies/core/xds"
meshroute_xds "github.com/kumahq/kuma/pkg/plugins/policies/core/xds/meshroute"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/xds"
"github.com/kumahq/kuma/pkg/util/pointer"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
envoy_listeners "github.com/kumahq/kuma/pkg/xds/envoy/listeners"
envoy_listeners_v3 "github.com/kumahq/kuma/pkg/xds/envoy/listeners/v3"
Expand All @@ -26,6 +26,7 @@ func generateListeners(
proxy *core_xds.Proxy,
rules []ToRouteRule,
servicesAcc envoy_common.ServicesAccumulator,
meshCtx xds_context.MeshContext,
) (*core_xds.ResourceSet, error) {
resources := core_xds.NewResourceSet()
// ClusterCache (cluster hash -> cluster name) protects us from creating excessive amount of clusters.
Expand All @@ -48,10 +49,10 @@ func generateListeners(
NormalizePath: true,
}))

protocol := plugins_xds.InferProtocol(proxy.Routing, serviceName)
protocol := meshCtx.GetServiceProtocol(serviceName)
var routes []xds.OutboundRoute
for _, route := range prepareRoutes(rules, serviceName, protocol) {
split := meshroute_xds.MakeHTTPSplit(proxy, clusterCache, servicesAcc, route.BackendRefs)
split := meshroute_xds.MakeHTTPSplit(proxy, clusterCache, servicesAcc, route.BackendRefs, meshCtx)
lahabana marked this conversation as resolved.
Show resolved Hide resolved
if split == nil {
continue
}
Expand All @@ -63,7 +64,9 @@ func generateListeners(
[]common_api.BackendRef{{
TargetRef: filter.RequestMirror.BackendRef,
Weight: pointer.To[uint](1), // any non-zero value
}})
}},
meshCtx,
)
}
}
routes = append(routes, xds.OutboundRoute{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ func ApplyToOutbounds(
xdsCtx xds_context.Context,
rules []ToRouteRule,
) error {
servicesAcc := envoy_common.NewServicesAccumulator(xdsCtx.Mesh.ServiceTLSReadiness)
tlsReady := xdsCtx.Mesh.GetTLSReadiness()
servicesAcc := envoy_common.NewServicesAccumulator(tlsReady)

listeners, err := generateListeners(proxy, rules, servicesAcc)
listeners, err := generateListeners(proxy, rules, servicesAcc, xdsCtx.Mesh)
if err != nil {
return errors.Wrap(err, "couldn't generate listener resources")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "backend", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -241,7 +243,9 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "backend", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -322,7 +326,10 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "other-tcp", mesh_proto.ProtocolTag, core_mesh.ProtocolTCP, "region", "eu"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
AddServiceProtocol("other-tcp", core_mesh.ProtocolTCP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -360,7 +367,9 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "backend", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -436,7 +445,9 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "backend", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -481,7 +492,9 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "backend", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down Expand Up @@ -536,7 +549,10 @@ var _ = Describe("MeshHTTPRoute", func() {
WithWeight(1).
WithTags(mesh_proto.ServiceTag, "payments", mesh_proto.ProtocolTag, core_mesh.ProtocolHTTP, "region", "us", "version", "v1", "env", "dev"))
return outboundsTestCase{
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).Build(),
xdsContext: *xds_builders.Context().WithEndpointMap(outboundTargets).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
AddServiceProtocol("payments", core_mesh.ProtocolHTTP).
Build(),
proxy: xds_builders.Proxy().
WithDataplane(samples.DataplaneWebBuilder()).
WithRouting(xds_builders.Routing().WithOutboundTargets(outboundTargets)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ var _ = Describe("MeshLoadBalancingStrategy", func() {
xdsCtx := *xds_builders.Context().
WithResources(resources).
WithEndpointMap(given.endpointMap).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build()
proxy := xds_builders.Proxy().
WithZone("test-zone").
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *
listeners := xds.GatherListeners(rs)
routes := xds.GatherRoutes(rs)

if err := applyToOutbounds(policies.ToRules, listeners.Outbound, proxy.Dataplane, proxy.Routing); err != nil {
if err := applyToOutbounds(policies.ToRules, listeners.Outbound, proxy.Dataplane, ctx.Mesh); err != nil {
return err
}

Expand All @@ -53,15 +53,15 @@ func applyToOutbounds(
rules core_rules.ToRules,
outboundListeners map[mesh_proto.OutboundInterface]*envoy_listener.Listener,
dataplane *core_mesh.DataplaneResource,
routing core_xds.Routing,
meshCtx xds_context.MeshContext,
) error {
for _, outbound := range dataplane.Spec.Networking.GetOutbound() {
oface := dataplane.Spec.Networking.ToOutboundInterface(outbound)
serviceName := outbound.GetService()

configurer := plugin_xds.Configurer{
Retry: core_rules.ComputeConf[api.Conf](rules.Rules, core_rules.MeshService(serviceName)),
Protocol: xds.InferProtocol(routing, serviceName),
Protocol: meshCtx.GetServiceProtocol(serviceName),
}

listener, ok := outboundListeners[oface]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ var _ = Describe("MeshRetry", func() {
resourceSet.Add(&r)
}

context := xds_samples.SampleContext()
context := *xds_builders.Context().
WithMesh(samples.MeshDefaultBuilder()).
WithResources(xds_context.NewResources()).
AddServiceProtocol("http-service", core_mesh.ProtocolHTTP).
AddServiceProtocol("tcp-service", core_mesh.ProtocolTCP).
AddServiceProtocol("grpc-service", core_mesh.ProtocolGRPC).
AddServiceProtocol("backend", core_mesh.ProtocolHTTP).
Build()

proxy := xds_builders.Proxy().
WithDataplane(builders.Dataplane().
WithName("backend").
Expand Down
Loading
Loading