Skip to content

Commit

Permalink
feat(xds): auto reachable services based on MeshTrafficPermission (#8125
Browse files Browse the repository at this point in the history
)

Signed-off-by: Jakub Dyszkiewicz <[email protected]>
Signed-off-by: slonka <[email protected]>
Co-authored-by: slonka <[email protected]>
  • Loading branch information
jakubdyszkiewicz and slonka authored Oct 27, 2023
1 parent d4fb929 commit ed1be22
Show file tree
Hide file tree
Showing 30 changed files with 808 additions and 32 deletions.
3 changes: 3 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,9 @@ experimental:
fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL
# If true, then initial full resync is going to be delayed by 0 to FullResyncInterval.
delayFullResync: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC
# If true then control plane computes reachable services automatically based on MeshTrafficPermission.
# Lack of MeshTrafficPermission is treated as Deny the traffic.
autoReachableServices: false # ENV: KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES

proxy:
gateway:
Expand Down
3 changes: 3 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ experimental:
fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL
# If true, then initial full resync is going to be delayed by 0 to FullResyncInterval.
delayFullResync: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC
# If true then control plane computes reachable services automatically based on MeshTrafficPermission.
# Lack of MeshTrafficPermission is treated as Deny the traffic.
autoReachableServices: false # ENV: KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES

proxy:
gateway:
Expand Down
1 change: 1 addition & 0 deletions pkg/api-server/api_server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func tryStartApiServer(t *testApiServerConfigurer) (*api_server.ApiServer, kuma_
vips.NewPersistence(resManager, config_manager.NewConfigManager(t.store), false),
cfg.DNSServer.Domain,
80,
xds_context.AnyToAnyReachableServicesGraphBuilder,
),
customization.NewAPIList(),
registry.Global().ObjectDescriptors(model.HasWsEnabled()),
Expand Down
1 change: 1 addition & 0 deletions pkg/api-server/customization/customization_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func createTestApiServer(store store.ResourceStore, config *config_api_server.Ap
vips.NewPersistence(resManager, config_manager.NewConfigManager(store), false),
cfg.DNSServer.Domain,
cfg.DNSServer.ServiceVipPort,
xds_context.AnyToAnyReachableServicesGraphBuilder,
),
wsManager,
registry.Global().ObjectDescriptors(core_model.HasWsEnabled()),
Expand Down
11 changes: 6 additions & 5 deletions pkg/api-server/inspect_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
_ "github.com/kumahq/kuma/pkg/plugins/policies"
"github.com/kumahq/kuma/pkg/plugins/policies/meshtrafficpermission/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/kds/samples"
"github.com/kumahq/kuma/pkg/test/matchers"
Expand Down Expand Up @@ -180,7 +181,7 @@ var _ = Describe("Inspect WS", func() {
},
builders.MeshTrafficPermission().
WithTargetRef(builders.TargetRefMesh()).
AddFrom(builders.TargetRefMesh(), "ALLOW").
AddFrom(builders.TargetRefMesh(), v1alpha1.Allow).
Build(),
},
contentType: restful.MIME_JSON,
Expand Down Expand Up @@ -799,7 +800,7 @@ var _ = Describe("Inspect WS", func() {
builders.MeshTrafficPermission().
WithMesh("mesh-1").
WithTargetRef(builders.TargetRefService("backend")).
AddFrom(builders.TargetRefMesh(), "ALLOW").
AddFrom(builders.TargetRefMesh(), v1alpha1.Allow).
Build(),
},
contentType: restful.MIME_JSON,
Expand Down Expand Up @@ -915,7 +916,7 @@ var _ = Describe("Inspect WS", func() {
samples2.DataplaneWeb(),
builders.MeshTrafficPermission().
WithTargetRef(builders.TargetRefService("web")).
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east"), "DENY").
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east"), v1alpha1.Deny).
Build(),
builders.MeshAccessLog().
WithTargetRef(builders.TargetRefService("web")).
Expand Down Expand Up @@ -943,12 +944,12 @@ var _ = Describe("Inspect WS", func() {
Build(),
builders.MeshTrafficPermission().
WithTargetRef(builders.TargetRefService("web")).
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east"), "DENY").
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east"), v1alpha1.Deny).
Build(),
builders.MeshTrafficPermission().
WithName("mtp-2").
WithTargetRef(builders.TargetRefService("web")).
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east", "version", "2"), "ALLOW").
AddFrom(builders.TargetRefServiceSubset("client", "kuma.io/zone", "east", "version", "2"), v1alpha1.Allow).
Build(),
builders.MeshAccessLog().
WithTargetRef(builders.TargetRefService("web")).
Expand Down
1 change: 1 addition & 0 deletions pkg/api-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ func SetupServer(rt runtime.Runtime) error {
vips.NewPersistence(rt.ResourceManager(), rt.ConfigManager(), cfg.Experimental.UseTagFirstVirtualOutboundModel),
cfg.DNSServer.Domain,
cfg.DNSServer.ServiceVipPort,
xds_context.AnyToAnyReachableServicesGraphBuilder,
),
rt.APIInstaller(),
registry.Global().ObjectDescriptors(model.HasWsEnabled()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
"kuma.io/zone": "east"
},
"conf": {
"action": "DENY"
"action": "Deny"
},
"origins": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
"version": "2"
},
"conf": {
"action": "ALLOW"
"action": "Allow"
},
"origins": [
{
Expand Down Expand Up @@ -161,7 +161,7 @@
"version": "!2"
},
"conf": {
"action": "DENY"
"action": "Deny"
},
"origins": [
{
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ type ExperimentalConfig struct {
IngressTagFilters []string `json:"ingressTagFilters" envconfig:"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS"`
// KDS event based watchdog settings. It is a more optimal way to generate KDS snapshot config.
KDSEventBasedWatchdog ExperimentalKDSEventBasedWatchdog `json:"kdsEventBasedWatchdog"`
// If true then control plane computes reachable services automatically based on MeshTrafficPermission.
// Lack of MeshTrafficPermission is treated as Deny the traffic.
AutoReachableServices bool `json:"autoReachableServices" envconfig:"KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES"`
}

type ExperimentalKDSEventBasedWatchdog struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ experimental:
fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL
# If true, then initial full resync is going to be delayed by 0 to FullResyncInterval.
delayFullResync: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC
# If true then control plane computes reachable services automatically based on MeshTrafficPermission.
# Lack of MeshTrafficPermission is treated as Deny the traffic.
autoReachableServices: false # ENV: KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES

proxy:
gateway:
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.KDSEventBasedWatchdog.FlushInterval.Duration).To(Equal(10 * time.Second))
Expect(cfg.Experimental.KDSEventBasedWatchdog.FullResyncInterval.Duration).To(Equal(15 * time.Second))
Expect(cfg.Experimental.KDSEventBasedWatchdog.DelayFullResync).To(BeTrue())
Expect(cfg.Experimental.AutoReachableServices).To(BeTrue())

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
Expect(cfg.EventBus.BufferSize).To(Equal(uint(30)))
Expand Down Expand Up @@ -699,6 +700,7 @@ experimental:
flushInterval: 10s
fullResyncInterval: 15s
delayFullResync: true
autoReachableServices: true
proxy:
gateway:
globalDownstreamMaxConnections: 1
Expand Down Expand Up @@ -962,6 +964,7 @@ tracing:
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL": "10s",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL": "15s",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC": "true",
"KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES": "true",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
"KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317",
"KUMA_TRACING_OPENTELEMETRY_ENABLED": "true",
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/kumahq/kuma/pkg/metrics"
metrics_store "github.com/kumahq/kuma/pkg/metrics/store"
"github.com/kumahq/kuma/pkg/multitenant"
"github.com/kumahq/kuma/pkg/plugins/policies/meshtrafficpermission/graph"
"github.com/kumahq/kuma/pkg/plugins/resources/postgres/config"
"github.com/kumahq/kuma/pkg/tokens/builtin"
tokens_access "github.com/kumahq/kuma/pkg/tokens/builtin/access"
Expand Down Expand Up @@ -500,6 +501,10 @@ func initializeConfigManager(builder *core_runtime.Builder) {
}

func initializeMeshCache(builder *core_runtime.Builder) error {
rsGraphBuilder := xds_context.AnyToAnyReachableServicesGraphBuilder
if builder.Config().Experimental.AutoReachableServices {
rsGraphBuilder = graph.Builder
}
meshContextBuilder := xds_context.NewMeshContextBuilder(
builder.ReadOnlyResourceManager(),
xds_server.MeshResourceTypes(xds_server.HashMeshExcludedResources),
Expand All @@ -508,6 +513,7 @@ func initializeMeshCache(builder *core_runtime.Builder) error {
vips.NewPersistence(builder.ReadOnlyResourceManager(), builder.ConfigManager(), builder.Config().Experimental.UseTagFirstVirtualOutboundModel),
builder.Config().DNSServer.Domain,
builder.Config().DNSServer.ServiceVipPort,
rsGraphBuilder,
)

meshSnapshotCache, err := mesh_cache.NewCache(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package graph_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestGraph(t *testing.T) {
test.RunSpecs(t, "Graph Suite")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package graph

import (
"golang.org/x/exp/maps"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/plugins/policies/core/matchers"
core_rules "github.com/kumahq/kuma/pkg/plugins/policies/core/rules"
mtp_api "github.com/kumahq/kuma/pkg/plugins/policies/meshtrafficpermission/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/controllers"
"github.com/kumahq/kuma/pkg/xds/context"
)

var log = core.Log.WithName("rs-graph")

var SupportedTags = map[string]struct{}{
controllers.KubeNamespaceTag: {},
controllers.KubeServiceTag: {},
controllers.KubePortTag: {},
}

type Graph struct {
rules map[string]core_rules.Rules
}

func NewGraph() *Graph {
return &Graph{
rules: map[string]core_rules.Rules{},
}
}

func (r *Graph) CanReach(fromTags map[string]string, toTags map[string]string) bool {
if _, crossMeshTagExist := toTags[mesh_proto.MeshTag]; crossMeshTagExist {
// we cannot compute graph for cross mesh, so it's better to allow the traffic
return true
}
rule := r.rules[toTags[mesh_proto.ServiceTag]].Compute(core_rules.SubsetFromTags(fromTags))
if rule == nil {
return false
}
action := rule.Conf.(mtp_api.Conf).Action
return action == mtp_api.Allow || action == mtp_api.AllowWithShadowDeny
}

func Builder(meshName string, resources context.Resources) context.ReachableServicesGraph {
services := BuildServices(
meshName,
resources.Dataplanes().Items,
resources.ExternalServices().Items,
resources.ZoneIngresses().Items,
)
mtps := resources.ListOrEmpty(mtp_api.MeshTrafficPermissionType).(*mtp_api.MeshTrafficPermissionResourceList)
return BuildGraph(services, mtps.Items)
}

// BuildServices we could just take result of xds_topology.VIPOutbounds, however it does not have a context of additional tags
func BuildServices(
meshName string,
dataplanes []*mesh.DataplaneResource,
externalServices []*mesh.ExternalServiceResource,
zoneIngresses []*mesh.ZoneIngressResource,
) map[string]mesh_proto.SingleValueTagSet {
services := map[string]mesh_proto.SingleValueTagSet{}
addSvc := func(tags map[string]string) {
svc := tags[mesh_proto.ServiceTag]
if _, ok := services[svc]; ok {
return
}
services[svc] = map[string]string{}
for tag := range SupportedTags {
if value := tags[tag]; value != "" {
services[svc][tag] = value
}
}
}

for _, dp := range dataplanes {
for _, tagSet := range dp.Spec.SingleValueTagSets() {
addSvc(tagSet)
}
}
for _, zi := range zoneIngresses {
for _, availableSvc := range zi.Spec.GetAvailableServices() {
if meshName != availableSvc.Mesh {
continue
}
addSvc(availableSvc.Tags)
}
}
for _, es := range externalServices {
addSvc(es.Spec.Tags)
}
return services
}

func BuildGraph(services map[string]mesh_proto.SingleValueTagSet, mtps []*mtp_api.MeshTrafficPermissionResource) *Graph {
resources := context.Resources{
MeshLocalResources: map[core_model.ResourceType]core_model.ResourceList{
mtp_api.MeshTrafficPermissionType: &mtp_api.MeshTrafficPermissionResourceList{
Items: trimNotSupportedTags(mtps),
},
},
}

graph := NewGraph()

for service, tags := range services {
// build artificial dpp for matching
dp := mesh.NewDataplaneResource()
dpTags := maps.Clone(tags)
dpTags[mesh_proto.ServiceTag] = service
dp.Spec = &mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "1.1.1.1",
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Tags: dpTags,
Port: 1234,
},
},
},
}

matched, err := matchers.MatchedPolicies(mtp_api.MeshTrafficPermissionType, dp, resources)
if err != nil {
log.Error(err, "service could not be matched. It won't be reached by any other service", "service", service)
continue // it's better to ignore one service that to break the whole graph
}

rl, ok := matched.FromRules.Rules[core_rules.InboundListener{
Address: "1.1.1.1",
Port: 1234,
}]
if !ok {
continue
}

graph.rules[service] = rl
}

return graph
}

// trimNotSupportedTags replaces tags present in subsets of top-level target ref.
// Because we need to do policy matching on services instead of individual proxies, we have to handle subsets in a special way.
// What we do is we only support subsets with predefined tags listed in SupportedTags.
// This assumes that tags listed in SupportedTags have the same value between all instances of a given service.
// Otherwise, we trim the tags making the target ref subset wider.
//
// Alternatively, we could have computed all common tags between instances of a given service and then allow subsets with those common tags.
// However, this would require calling this function for every service.
func trimNotSupportedTags(mtps []*mtp_api.MeshTrafficPermissionResource) []*mtp_api.MeshTrafficPermissionResource {
newMtps := make([]*mtp_api.MeshTrafficPermissionResource, len(mtps))
for i, mtp := range mtps {
if len(mtp.Spec.TargetRef.Tags) > 0 {
filteredTags := map[string]string{}
for tag, val := range mtp.Spec.TargetRef.Tags {
if _, ok := SupportedTags[tag]; ok {
filteredTags[tag] = val
}
}
if len(filteredTags) != len(mtp.Spec.TargetRef.Tags) {
mtp = &mtp_api.MeshTrafficPermissionResource{
Meta: mtp.Meta,
Spec: mtp.Spec.DeepCopy(),
}
mtp.Spec.TargetRef.Tags = filteredTags
}
}
newMtps[i] = mtp
}
return newMtps
}
Loading

0 comments on commit ed1be22

Please sign in to comment.