Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(MeshHTTPRoute): basic MeshGateway support #8402

Merged
109 changes: 109 additions & 0 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package v1alpha1

import (
"context"

"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
plugin_gateway "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_listeners "github.com/kumahq/kuma/pkg/xds/envoy/listeners"
envoy_routes "github.com/kumahq/kuma/pkg/xds/envoy/routes"
)

var FilterChainGenerators = map[mesh_proto.MeshGateway_Listener_Protocol]plugin_gateway.FilterChainGenerator{
mesh_proto.MeshGateway_Listener_HTTP: &plugin_gateway.HTTPFilterChainGenerator{},
mesh_proto.MeshGateway_Listener_HTTPS: &plugin_gateway.HTTPSFilterChainGenerator{},
mesh_proto.MeshGateway_Listener_TCP: &plugin_gateway.TCPFilterChainGenerator{},
}

func generateGatewayListeners(
ctx xds_context.Context,
info plugin_gateway.GatewayListenerInfo,
hostInfos []plugin_gateway.GatewayHostInfo,
) (*core_xds.ResourceSet, *plugin_gateway.RuntimeResoureLimitListener, error) {
resources := core_xds.NewResourceSet()

listenerBuilder, limit := plugin_gateway.GenerateListener(info)

var gatewayHosts []plugin_gateway.GatewayHost
for _, hostInfo := range hostInfos {
gatewayHosts = append(gatewayHosts, hostInfo.Host)
}

protocol := info.Listener.Protocol
if info.Listener.CrossMesh {
protocol = mesh_proto.MeshGateway_Listener_HTTPS
}
res, filterChainBuilders, err := FilterChainGenerators[protocol].Generate(ctx, info, gatewayHosts)
if err != nil {
return nil, limit, err
}
resources.AddSet(res)

for _, filterChainBuilder := range filterChainBuilders {
listenerBuilder.Configure(envoy_listeners.FilterChain(filterChainBuilder))
}

res, err = plugin_gateway.BuildResourceSet(listenerBuilder)
if err != nil {
return nil, limit, errors.Wrapf(err, "failed to build listener resource")
}
resources.AddSet(res)

return resources, limit, nil
}

func generateGatewayClusters(
ctx context.Context,
xdsCtx xds_context.Context,
info plugin_gateway.GatewayListenerInfo,
hostInfos []plugin_gateway.GatewayHostInfo,
) (*core_xds.ResourceSet, error) {
resources := core_xds.NewResourceSet()

gen := plugin_gateway.ClusterGenerator{Zone: xdsCtx.ControlPlane.Zone}
for _, hostInfo := range hostInfos {
clusterRes, err := gen.GenerateClusters(ctx, xdsCtx, info, hostInfo.Entries, hostInfo.Host.Tags)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate clusters for dataplane %q", info.Proxy.Id)
}
resources.AddSet(clusterRes)
}

return resources, nil
}

func generateGatewayRoutes(
ctx xds_context.Context, info plugin_gateway.GatewayListenerInfo, hostInfos []plugin_gateway.GatewayHostInfo,
) (*core_xds.ResourceSet, error) {
switch info.Listener.Protocol {
case mesh_proto.MeshGateway_Listener_HTTPS,
mesh_proto.MeshGateway_Listener_HTTP:
default:
return nil, nil
}

resources := core_xds.NewResourceSet()
routeConfig := plugin_gateway.GenerateRouteConfig(info)

// Make a pass over the generators for each virtual host.
for _, hostInfo := range hostInfos {
vh, err := plugin_gateway.GenerateVirtualHost(ctx, info, hostInfo.Host, hostInfo.Entries)
if err != nil {
return nil, err
}

routeConfig.Configure(envoy_routes.VirtualHost(vh))
}

res, err := plugin_gateway.BuildResourceSet(routeConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to build route configuration resource")
}
resources.AddSet(res)

return resources, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package v1alpha1

import (
"slices"
"strings"

core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
plugin_gateway "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
"github.com/kumahq/kuma/pkg/xds/envoy/tags"
)

func GenerateEnvoyRouteEntries(host plugin_gateway.GatewayHost, toRules []ToRouteRule) []route.Entry {
var entries []route.Entry

// Index the routes by their path. There are typically multiple
// routes per path with additional matching criteria.
exactEntries := map[string][]route.Entry{}
prefixEntries := map[string][]route.Entry{}

for _, rule := range toRules[0].Rules {
var names []string
for _, orig := range toRules[0].Origin {
names = append(names, orig.GetName())
}
slices.Sort(names)
entry := makeHttpRouteEntry(strings.Join(names, "_"), rule)

// The rule matches if any of the matches is successful (it has OR
// semantics). That means that we have to duplicate the route table
// entry for each repeated match so that the rule can match any of
// the criteria.
for _, m := range rule.Matches {
routeEntry := entry // Shallow copy.
routeEntry.Match = makeRouteMatch(m)

switch {
case routeEntry.Match.ExactPath != "":
exactEntries[routeEntry.Match.ExactPath] = append(exactEntries[routeEntry.Match.ExactPath], routeEntry)
case routeEntry.Match.PrefixPath != "":
prefixEntries[routeEntry.Match.PrefixPath] = append(prefixEntries[routeEntry.Match.PrefixPath], routeEntry)
default:
entries = append(entries, routeEntry)
}
}
}

return plugin_gateway.HandlePrefixMatchesAndPopulatePolicies(host, exactEntries, prefixEntries, entries)
}

func makeHttpRouteEntry(name string, rule api.Rule) route.Entry {
entry := route.Entry{
Route: name,
}

for _, b := range *rule.Default.BackendRefs {
dest, ok := tags.TagsFromTargetRef(b.TargetRef)
if !ok {
// This should be caught by validation
continue
}
target := route.Destination{
Destination: dest,
Weight: uint32(*b.Weight),
Policies: nil,
RouteProtocol: core_mesh.ProtocolHTTP,
}

entry.Action.Forward = append(entry.Action.Forward, target)
}

return entry
}

func makeRouteMatch(ruleMatch api.Match) route.Match {
match := route.Match{}

if p := ruleMatch.Path; p != nil {
switch p.Type {
case api.Exact:
match.ExactPath = p.Value
case api.PathPrefix:
match.PrefixPath = p.Value
case api.RegularExpression:
match.RegexPath = p.Value
}
} else {
// Envoy routes require a path match, so if the route
// didn't specify, we match any path so that the additional
// match criteria will be applied.
match.PrefixPath = "/"
}

return match
}
67 changes: 61 additions & 6 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1alpha1

import (
"context"

"github.com/pkg/errors"

common_api "github.com/kumahq/kuma/api/common/v1alpha1"
Expand All @@ -12,6 +14,7 @@ import (
"github.com/kumahq/kuma/pkg/plugins/policies/core/rules"
"github.com/kumahq/kuma/pkg/plugins/policies/core/xds/meshroute"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
plugin_gateway "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
)
Expand Down Expand Up @@ -45,7 +48,7 @@ func (p plugin) MatchedPolicies(dataplane *core_mesh.DataplaneResource, resource
return matchers.MatchedPolicies(api.MeshHTTPRouteType, dataplane, resources)
}

func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *core_xds.Proxy) error {
func (p plugin) Apply(rs *core_xds.ResourceSet, xdsCtx xds_context.Context, proxy *core_xds.Proxy) error {
// These policies have already been merged using the custom `GetDefault`
// method and therefore are of the
// `ToRouteRule` type, where rules have been appended together.
Expand All @@ -64,19 +67,25 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *
})
}

if err := ApplyToOutbounds(proxy, rs, ctx, toRules); err != nil {
if err := ApplyToOutbounds(proxy, rs, xdsCtx, toRules); err != nil {
return err
}

ctx := context.TODO()
if err := ApplyToGateway(ctx, proxy, rs, xdsCtx, toRules); err != nil {
return err
}

return nil
}

func ApplyToOutbounds(
proxy *core_xds.Proxy,
rs *core_xds.ResourceSet,
ctx xds_context.Context,
xdsCtx xds_context.Context,
rules []ToRouteRule,
) error {
servicesAcc := envoy_common.NewServicesAccumulator(ctx.Mesh.ServiceTLSReadiness)
servicesAcc := envoy_common.NewServicesAccumulator(xdsCtx.Mesh.ServiceTLSReadiness)

listeners, err := generateListeners(proxy, rules, servicesAcc)
if err != nil {
Expand All @@ -86,17 +95,63 @@ func ApplyToOutbounds(

services := servicesAcc.Services()

clusters, err := meshroute.GenerateClusters(proxy, ctx.Mesh, services)
clusters, err := meshroute.GenerateClusters(proxy, xdsCtx.Mesh, services)
if err != nil {
return errors.Wrap(err, "couldn't generate cluster resources")
}
rs.AddSet(clusters)

endpoints, err := meshroute.GenerateEndpoints(proxy, ctx, services)
endpoints, err := meshroute.GenerateEndpoints(proxy, xdsCtx, services)
if err != nil {
return errors.Wrap(err, "couldn't generate endpoint resources")
}
rs.AddSet(endpoints)

return nil
}

func ApplyToGateway(
ctx context.Context,
proxy *core_xds.Proxy,
resources *core_xds.ResourceSet,
xdsCtx xds_context.Context,
rules []ToRouteRule,
) error {
var limits []plugin_gateway.RuntimeResoureLimitListener

for _, info := range plugin_gateway.ExtractGatewayListeners(proxy) {
var hostInfos []plugin_gateway.GatewayHostInfo
for _, info := range info.HostInfos {
hostInfos = append(hostInfos, plugin_gateway.GatewayHostInfo{
Host: info.Host,
Entries: GenerateEnvoyRouteEntries(info.Host, rules),
})
}

cdsResources, err := generateGatewayClusters(ctx, xdsCtx, info, hostInfos)
if err != nil {
return err
}
resources.AddSet(cdsResources)

ldsResources, limit, err := generateGatewayListeners(xdsCtx, info, hostInfos)
if err != nil {
return err
}
resources.AddSet(ldsResources)

if limit != nil {
limits = append(limits, *limit)
}

rdsResources, err := generateGatewayRoutes(xdsCtx, info, hostInfos)
if err != nil {
return err
}
resources.AddSet(rdsResources)
}

resources.Add(plugin_gateway.GenerateRTDS(limits))

return nil
}
Loading