From 1569468deae640ff242be96183c5099e788fc9f4 Mon Sep 17 00:00:00 2001 From: "radoslaw.chrzanowski" Date: Mon, 7 Nov 2022 08:29:31 +0100 Subject: [PATCH] ingress gateways --- .../servicemesh/envoycontrol/ControlPlane.kt | 14 ++ .../servicemesh/envoycontrol/groups/Groups.kt | 23 +- .../envoycontrol/groups/MetadataNodeGroup.kt | 56 +++-- .../envoycontrol/groups/NodeMetadata.kt | 5 + .../snapshot/EnvoySnapshotFactory.kt | 58 ++++- .../snapshot/SnapshotProperties.kt | 1 + .../IngressGatewayPortMappingsCache.kt | 29 +++ .../resource/clusters/EnvoyClustersFactory.kt | 15 +- .../EnvoyIngressGatewayListenersFactory.kt | 73 +++++++ .../EnvoyIngresGatewayIngressRoutesFactory.kt | 170 +++++++++++++++ .../SyncableLocalServiceStateCreator.kt | 69 ++++++ .../envoycontrol/EnvoySnapshotFactoryTest.kt | 91 +++++++- .../snapshot/SnapshotUpdaterTest.kt | 3 + .../infrastructure/ControlPlaneConfig.kt | 17 +- .../synchronization/StateController.kt | 7 +- .../ConsulLocalClusterStateChanges.kt | 1 + .../envoycontrol/IngressGatewayTest.kt | 198 ++++++++++++++++++ .../config/EnvoyControlTestConfiguration.kt | 5 + .../config/envoy/EnvoyContainer.kt | 13 +- .../config/envoy/EnvoyExtension.kt | 7 +- .../main/resources/envoy/config_gateway.yaml | 104 +++++++++ 21 files changed, 921 insertions(+), 38 deletions(-) create mode 100644 envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/IngressGatewayPortMappingsCache.kt create mode 100644 envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/EnvoyIngressGatewayListenersFactory.kt create mode 100644 envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngresGatewayIngressRoutesFactory.kt create mode 100644 envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/SyncableLocalServiceStateCreator.kt create mode 100644 envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/IngressGatewayTest.kt create mode 100644 envoy-control-tests/src/main/resources/envoy/config_gateway.yaml diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index ac431e297..076feb1a5 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -29,8 +29,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.NoopSnapshotChangeAudit import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotChangeAuditor import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotUpdater import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotsVersions +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyIngressGatewayListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEgressRoutesFactory @@ -97,6 +99,7 @@ class ControlPlane private constructor( var groupSnapshotParallelExecutorSupplier: () -> Executor? = { null } var metrics: EnvoyControlMetrics = DefaultEnvoyControlMetrics(meterRegistry = meterRegistry) var envoyHttpFilters: EnvoyHttpFilters = EnvoyHttpFilters.emptyFilters + var ingressGatewayPortMappingsCache: IngressGatewayPortMappingsCache? = null var snapshotChangeAuditor: SnapshotChangeAuditor = NoopSnapshotChangeAuditor var nodeGroup: NodeGroup = MetadataNodeGroup( @@ -138,6 +141,10 @@ class ControlPlane private constructor( ) } + if (ingressGatewayPortMappingsCache == null) { + ingressGatewayPortMappingsCache = IngressGatewayPortMappingsCache() + } + val groupSnapshotProperties = properties.server.groupSnapshotUpdateScheduler val groupSnapshotScheduler = buildGroupSnapshotScheduler(groupSnapshotProperties) @@ -176,6 +183,9 @@ class ControlPlane private constructor( snapshotProperties, envoyHttpFilters ), + ingressGatewayListenersFactory = EnvoyIngressGatewayListenersFactory( + mappingsCache = ingressGatewayPortMappingsCache!! + ), // Remember when LDS change we have to send RDS again snapshotsVersions = snapshotsVersions, properties = snapshotProperties, @@ -352,6 +362,10 @@ class ControlPlane private constructor( this.envoyHttpFilters = envoyHttpFilters return this } + fun withIngressGatewayPortMappingsCache(mappingsCache: IngressGatewayPortMappingsCache): ControlPlaneBuilder { + this.ingressGatewayPortMappingsCache = mappingsCache + return this + } private fun NettyServerBuilder.withEnvoyServices(discoveryServer: V3DiscoveryServer): NettyServerBuilder { return this.addService(discoveryServer.aggregatedDiscoveryServiceImpl) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/Groups.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/Groups.kt index ef6eff51b..9db31a170 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/Groups.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/Groups.kt @@ -8,13 +8,16 @@ sealed class Group { abstract val listenersConfig: ListenersConfig? } +sealed class IngressGatewayGroup: Group() +sealed class SidecarGroup: Group() + data class ServicesGroup( override val communicationMode: CommunicationMode, override val serviceName: String = "", override val discoveryServiceName: String? = null, override val proxySettings: ProxySettings = ProxySettings(), override val listenersConfig: ListenersConfig? = null -) : Group() +) : SidecarGroup() data class AllServicesGroup( override val communicationMode: CommunicationMode, @@ -22,7 +25,23 @@ data class AllServicesGroup( override val discoveryServiceName: String? = null, override val proxySettings: ProxySettings = ProxySettings(), override val listenersConfig: ListenersConfig? = null -) : Group() +) : SidecarGroup() + +data class ServicesIngressGatewayGroup( + override val communicationMode: CommunicationMode, + override val serviceName: String = "", + override val discoveryServiceName: String? = null, + override val proxySettings: ProxySettings = ProxySettings(), + override val listenersConfig: ListenersConfig? = null +) : IngressGatewayGroup() + +data class AllServicesIngressGatewayGroup( + override val communicationMode: CommunicationMode, + override val serviceName: String = "", + override val discoveryServiceName: String? = null, + override val proxySettings: ProxySettings = ProxySettings(), + override val listenersConfig: ListenersConfig? = null +) : IngressGatewayGroup() data class ListenersConfig( val ingressHost: String, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/MetadataNodeGroup.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/MetadataNodeGroup.kt index f25228335..b8340453c 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/MetadataNodeGroup.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/MetadataNodeGroup.kt @@ -131,27 +131,47 @@ class MetadataNodeGroup( private fun createV3Group(node: NodeV3): Group { val nodeMetadata = NodeMetadata(node.metadata, properties) val serviceName = serviceName(nodeMetadata) - val discoveryServiceName = nodeMetadata.discoveryServiceName + val discoveryServiceName = nodeMetadata.discoveryServiceName ?: serviceName val proxySettings = proxySettings(nodeMetadata) val listenersConfig = createListenersConfig(node.id, node.metadata) - return when { - hasAllServicesDependencies(nodeMetadata) -> - AllServicesGroup( - nodeMetadata.communicationMode, - serviceName, - discoveryServiceName, - proxySettings, - listenersConfig - ) - else -> - ServicesGroup( - nodeMetadata.communicationMode, - serviceName, - discoveryServiceName, - proxySettings, - listenersConfig - ) + return when (nodeMetadata.mode) { + Mode.SERVICE -> when { + hasAllServicesDependencies(nodeMetadata) -> + AllServicesGroup( + nodeMetadata.communicationMode, + serviceName, + discoveryServiceName, + proxySettings, + listenersConfig + ) + else -> + ServicesGroup( + nodeMetadata.communicationMode, + serviceName, + discoveryServiceName, + proxySettings, + listenersConfig + ) + } + Mode.INGRESS_GATEWAY -> when { + hasAllServicesDependencies(nodeMetadata) -> + AllServicesIngressGatewayGroup( + nodeMetadata.communicationMode, + serviceName, + discoveryServiceName, + proxySettings, + listenersConfig + ) + else -> + ServicesIngressGatewayGroup( + nodeMetadata.communicationMode, + serviceName, + discoveryServiceName, + proxySettings, + listenersConfig + ) + } } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadata.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadata.kt index 369dd3475..5f1070873 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadata.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadata.kt @@ -28,6 +28,7 @@ class NodeMetadata(metadata: Struct, properties: SnapshotProperties) { .fieldsMap["discovery_service_name"] ?.stringValue + val mode = metadata.fieldsMap["mode"]?.stringValue?.let { Mode.valueOf(it) } ?: Mode.SERVICE val communicationMode = getCommunicationMode(metadata.fieldsMap["ads"]) val proxySettings: ProxySettings = ProxySettings(metadata.fieldsMap["proxy_settings"], properties) @@ -677,6 +678,10 @@ enum class CommunicationMode { ADS, XDS } +enum class Mode { + SERVICE, INGRESS_GATEWAY +} + data class OAuth( val provider: String = "", val verification: Verification = Verification.OFFLINE, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt index 1357b6eac..9456fd148 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt @@ -9,27 +9,34 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesIngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesIngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.IngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.SidecarGroup import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyIngressGatewayListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEgressRoutesFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIngressRoutesFactory import java.util.SortedMap +@Suppress("LongParameterList") class EnvoySnapshotFactory( private val ingressRoutesFactory: EnvoyIngressRoutesFactory, private val egressRoutesFactory: EnvoyEgressRoutesFactory, private val clustersFactory: EnvoyClustersFactory, private val endpointsFactory: EnvoyEndpointsFactory, private val listenersFactory: EnvoyListenersFactory, + private val ingressGatewayListenersFactory: EnvoyIngressGatewayListenersFactory, private val snapshotsVersions: SnapshotsVersions, private val properties: SnapshotProperties, private val meterRegistry: MeterRegistry @@ -150,7 +157,10 @@ class EnvoySnapshotFactory( fun getSnapshotForGroup(group: Group, globalSnapshot: GlobalSnapshot): Snapshot { val groupSample = Timer.start(meterRegistry) - val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot) + val newSnapshotForGroup = when (group) { + is IngressGatewayGroup -> newSnapshotForIngressGroup(group, globalSnapshot) + is SidecarGroup -> newSnapshotForGroup(group, globalSnapshot) + } groupSample.stop(meterRegistry.timer("snapshot-factory.get-snapshot-for-group.time")) return newSnapshotForGroup } @@ -188,10 +198,10 @@ class EnvoySnapshotFactory( ) } return when (group) { - is ServicesGroup -> { + is ServicesGroup, is ServicesIngressGatewayGroup -> { definedServicesRoutes } - is AllServicesGroup -> { + is AllServicesGroup, is AllServicesIngressGatewayGroup -> { val servicesNames = group.proxySettings.outgoing.getServiceDependencies().map { it.service }.toSet() val allServicesRoutes = globalSnapshot.allServicesNames.subtract(servicesNames).map { RouteSpecification( @@ -227,7 +237,7 @@ class EnvoySnapshotFactory( } private fun newSnapshotForGroup( - group: Group, + group: SidecarGroup, globalSnapshot: GlobalSnapshot ): Snapshot { @@ -268,8 +278,44 @@ class EnvoySnapshotFactory( // TODO(dj): endpoints depends on prerequisite of routes -> but only to extract clusterName, // which is present only in services (not domains) so it could be implemented differently. - val endpoints = getServicesEndpointsForGroup(group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, - egressRouteSpecification) + val endpoints = getServicesEndpointsForGroup( + group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, + egressRouteSpecification + ) + + val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes) + + return createSnapshot( + clusters = clusters, + clustersVersion = version.clusters, + endpoints = endpoints, + endpointsVersions = version.endpoints, + listeners = listeners, + // TODO: java-control-plane: https://github.com/envoyproxy/java-control-plane/issues/134 + listenersVersion = version.listeners, + routes = routes, + routesVersion = version.routes + ) + } + + private fun newSnapshotForIngressGroup( + group: IngressGatewayGroup, + globalSnapshot: GlobalSnapshot + ): Snapshot { + + val clusters: List = + clustersFactory.getClustersForGroup(group, globalSnapshot) + + val serviceRouteSpecification = getServiceRouteSpecifications(group, globalSnapshot) + + val routes = emptyList() + + val listeners = ingressGatewayListenersFactory.createListeners(group, globalSnapshot) + + val endpoints = getServicesEndpointsForGroup( + group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, + serviceRouteSpecification + ) val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt index a5221c042..82f0cbf90 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt @@ -36,6 +36,7 @@ class SnapshotProperties { var deltaXdsEnabled = false var retryPolicy = RetryPolicyProperties() var tcpDumpsEnabled: Boolean = true + var dcIngressGatewayService = "envoy-ingress-gateway" } class MetricsProperties { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/IngressGatewayPortMappingsCache.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/IngressGatewayPortMappingsCache.kt new file mode 100644 index 000000000..ecc0ac849 --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/IngressGatewayPortMappingsCache.kt @@ -0,0 +1,29 @@ +package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource + +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesIngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.IngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesIngressGatewayGroup +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +typealias GatewayName = String +typealias Port = Int +typealias Cluster = String + +class IngressGatewayPortMappingsCache { + private val serviceIngressGatewaysMappings: ConcurrentMap> = ConcurrentHashMap() + private val allServicesIngressGatewayMappings: ConcurrentMap> = ConcurrentHashMap() + + fun addMapping(group: IngressGatewayGroup, mapping: Map) { + when (group) { + is AllServicesIngressGatewayGroup -> allServicesIngressGatewayMappings[group.discoveryServiceName] = mapping + is ServicesIngressGatewayGroup -> serviceIngressGatewaysMappings[group.discoveryServiceName] = mapping + } + } + + fun ingressGatewayMapping(name: GatewayName): Map = serviceIngressGatewaysMappings[name] ?: mapOf() + fun dcIngressGatewayMapping(name: GatewayName): Map = + allServicesIngressGatewayMappings[name] ?: mapOf() + + fun dcIngressGatewayNames() = allServicesIngressGatewayMappings.keys +} diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt index 1a25e5a7f..20a64fdfd 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt @@ -36,12 +36,15 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsParameters import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesIngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.DomainDependency import pl.allegro.tech.servicemesh.envoycontrol.groups.Group +import pl.allegro.tech.servicemesh.envoycontrol.groups.IngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesIngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.containsGlobalRateLimits import pl.allegro.tech.servicemesh.envoycontrol.logger @@ -184,7 +187,7 @@ class EnvoyClustersFactory( } private fun getEdsClustersForGroup(group: Group, globalSnapshot: GlobalSnapshot): List { - val clusters: Map = if (enableTlsForGroup(group)) { + val clusters: Map = if (enableTlsForGroup(group) && group !is IngressGatewayGroup) { globalSnapshot.securedClusters } else { globalSnapshot.clusters @@ -193,10 +196,10 @@ class EnvoyClustersFactory( val serviceDependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service } val clustersForGroup = when (group) { - is ServicesGroup -> serviceDependencies.mapNotNull { + is ServicesGroup, is ServicesIngressGatewayGroup -> serviceDependencies.mapNotNull { createClusterForGroup(it.value.settings, clusters[it.key]) } - is AllServicesGroup -> { + is AllServicesGroup, is AllServicesIngressGatewayGroup -> { globalSnapshot.allServicesNames.mapNotNull { val dependency = serviceDependencies[it] if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) { @@ -242,10 +245,12 @@ class EnvoyClustersFactory( .setName(tlsProperties.validationContextSecretName).build() private val tlsCertificateSecretConfig = SdsSecretConfig.newBuilder() - .setName(tlsProperties.tlsCertificateSecretName).build() + .setName(tlsProperties.tlsCertificateSecretName) + .build() private fun createTlsContextWithSdsSecretConfig(serviceName: String): UpstreamTlsContext { val sanMatch = sanUriMatcher.createSanUriMatcher(serviceName) + val gatewaySanMatch = sanUriMatcher.createSanUriMatcher(properties.dcIngressGatewayService) return UpstreamTlsContext.newBuilder() .setCommonTlsContext( CommonTlsContext.newBuilder() @@ -254,7 +259,7 @@ class EnvoyClustersFactory( CommonTlsContext.CombinedCertificateValidationContext.newBuilder() .setDefaultValidationContext( CertificateValidationContext.newBuilder() - .addAllMatchSubjectAltNames(listOf(sanMatch)) + .addAllMatchSubjectAltNames(listOf(sanMatch, gatewaySanMatch)) .build() ) .setValidationContextSdsSecretConfig(validationContextSecretConfig) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/EnvoyIngressGatewayListenersFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/EnvoyIngressGatewayListenersFactory.kt new file mode 100644 index 000000000..4af2bad05 --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/EnvoyIngressGatewayListenersFactory.kt @@ -0,0 +1,73 @@ +package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners + +import io.envoyproxy.envoy.config.core.v3.Address +import io.envoyproxy.envoy.config.core.v3.SocketAddress +import io.envoyproxy.envoy.config.listener.v3.Filter +import io.envoyproxy.envoy.config.listener.v3.FilterChain +import io.envoyproxy.envoy.config.listener.v3.Listener +import io.envoyproxy.envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesIngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.IngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.ListenersConfig +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesIngressGatewayGroup +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache +import com.google.protobuf.Any as ProtobufAny + +@Suppress("MagicNumber") +class EnvoyIngressGatewayListenersFactory( + private val mappingsCache: IngressGatewayPortMappingsCache +) { + + fun createListeners(group: IngressGatewayGroup, globalSnapshot: GlobalSnapshot): List { + if (group.listenersConfig == null) { + return listOf() + } + val listenersConfig: ListenersConfig = group.listenersConfig!! + + return createIngressListener(group, listenersConfig, globalSnapshot) + } + + private fun createIngressListener( + group: IngressGatewayGroup, + listenersConfig: ListenersConfig, + globalSnapshot: GlobalSnapshot + ): List { + + val clusters = when (group) { + is AllServicesIngressGatewayGroup -> globalSnapshot.allServicesNames.sorted() + is ServicesIngressGatewayGroup -> group.proxySettings.outgoing.getServiceDependencies().map { it.service } + .sorted() + } + val mappings = mutableMapOf() + val listeners = clusters.mapIndexed { index, cluster -> + val port = listenersConfig.ingressPort + index + val listener = Listener.newBuilder() + .setName("ingress_listener_for_$cluster") + .setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder() + .setPortValue(port) + .setAddress(listenersConfig.ingressHost) + ) + ) + listener.addFilterChains( + FilterChain.newBuilder() + .addFilters( + Filter.newBuilder().setName("envoy.filters.network.tcp_proxy") + .setTypedConfig( + ProtobufAny.pack( + TcpProxy.newBuilder().setCluster(cluster) + .setStatPrefix("${cluster}_tcp") + .build() + ) + ) + ) + ) + mappings[cluster] = port + listener.build() + } + mappingsCache.addMapping(group, mappings) + return listeners + } +} diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngresGatewayIngressRoutesFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngresGatewayIngressRoutesFactory.kt new file mode 100644 index 000000000..e647029ed --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngresGatewayIngressRoutesFactory.kt @@ -0,0 +1,170 @@ +package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes + +import com.google.protobuf.BoolValue +import io.envoyproxy.controlplane.cache.TestResources +import io.envoyproxy.envoy.config.core.v3.HeaderValue +import io.envoyproxy.envoy.config.core.v3.HeaderValueOption +import io.envoyproxy.envoy.config.route.v3.DirectResponseAction +import io.envoyproxy.envoy.config.route.v3.HeaderMatcher +import io.envoyproxy.envoy.config.route.v3.InternalRedirectPolicy +import io.envoyproxy.envoy.config.route.v3.Route +import io.envoyproxy.envoy.config.route.v3.RouteAction +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration +import io.envoyproxy.envoy.config.route.v3.RouteMatch +import io.envoyproxy.envoy.config.route.v3.VirtualHost +import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties + +class EnvoyIngresGatewayIngressRoutesFactory( + private val properties: SnapshotProperties +) { + private val wildcardRoute = VirtualHost.newBuilder() + .setName("wildcard-route") + .addDomains("*") + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder() + .setPrefix("/") + ) + .setDirectResponse( + DirectResponseAction.newBuilder() + .setStatus(properties.egress.clusterNotFoundStatusCode) + ) + ) + .build() + + private val upstreamAddressHeader = HeaderValueOption.newBuilder().setHeader( + HeaderValue.newBuilder().setKey("x-envoy-upstream-remote-address") + .setValue("%UPSTREAM_REMOTE_ADDRESS%").build() + ) + + /** + * @see TestResources.createRoute + */ + fun createIngressRouteConfig( + serviceName: String, + routes: Collection, + addUpstreamAddressHeader: Boolean, + routeName: String = "default_routes" + ): RouteConfiguration { + val virtualHosts = routes + .filter { it.routeDomains.isNotEmpty() } + .map { routeSpecification -> + addMultipleRoutes( + VirtualHost.newBuilder() + .setName(routeSpecification.clusterName) + .addAllDomains(routeSpecification.routeDomains), + routeSpecification + ).build() + } + + var routeConfiguration = RouteConfiguration.newBuilder() + .setName(routeName) + .addAllVirtualHosts( + virtualHosts + wildcardRoute + ).also { + if (properties.incomingPermissions.enabled) { + it.addRequestHeadersToAdd( + HeaderValueOption.newBuilder() + .setHeader( + HeaderValue.newBuilder() + .setKey(properties.incomingPermissions.serviceNameHeader) + .setValue(serviceName) + ).setAppend(BoolValue.of(true)) + ) + } + } + + if (addUpstreamAddressHeader) { + routeConfiguration = routeConfiguration.addResponseHeadersToAdd(upstreamAddressHeader) + } + + return routeConfiguration.build() + } + + private fun addMultipleRoutes( + addAllDomains: VirtualHost.Builder, + routeSpecification: RouteSpecification + ): VirtualHost.Builder { + routeSpecification.settings.retryPolicy.let { + buildRouteForRetryPolicy(addAllDomains, routeSpecification) + } + buildDefaultRoute(addAllDomains, routeSpecification) + return addAllDomains + } + + private fun buildRouteForRetryPolicy( + addAllDomains: VirtualHost.Builder, + routeSpecification: RouteSpecification + ): VirtualHost.Builder? { + val regexAsAString = routeSpecification.settings.retryPolicy.methods?.joinToString(separator = "|") + val routeMatchBuilder = RouteMatch + .newBuilder() + .setPrefix("/") + .also { routeMatcher -> + regexAsAString?.let { + routeMatcher.addHeaders(buildMethodHeaderMatcher(it)) + } + } + + return addAllDomains.addRoutes( + Route.newBuilder() + .setMatch(routeMatchBuilder.build()) + .setRoute(createRouteAction(routeSpecification, shouldAddRetryPolicy = true)) + .build() + ) + } + + private fun buildMethodHeaderMatcher(regexAsAString: String) = HeaderMatcher.newBuilder() + .setName(":method") + .setSafeRegexMatch( + RegexMatcher.newBuilder() + .setRegex(regexAsAString) + .setGoogleRe2(RegexMatcher.GoogleRE2.getDefaultInstance()) + .build() + ) + + private fun buildDefaultRoute( + addAllDomains: VirtualHost.Builder, + routeSpecification: RouteSpecification + ) { + addAllDomains.addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder() + .setPrefix("/") + .build() + ) + .setRoute( + createRouteAction(routeSpecification) + ).build() + ) + } + + private fun createRouteAction( + routeSpecification: RouteSpecification, + shouldAddRetryPolicy: Boolean = false + ): RouteAction.Builder { + val routeAction = RouteAction.newBuilder() + .setCluster(routeSpecification.clusterName) + + routeSpecification.settings.timeoutPolicy.let { timeoutPolicy -> + timeoutPolicy.idleTimeout?.let { routeAction.setIdleTimeout(it) } + timeoutPolicy.requestTimeout?.let { routeAction.setTimeout(it) } + } + + if (shouldAddRetryPolicy) { + routeSpecification.settings.retryPolicy.let { policy -> + routeAction.setRetryPolicy(RequestPolicyMapper.mapToEnvoyRetryPolicyBuilder(policy)) + } + } + + if (routeSpecification.settings.handleInternalRedirect) { + routeAction.internalRedirectPolicy = InternalRedirectPolicy.newBuilder().build() + } + + return routeAction + } +} diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/SyncableLocalServiceStateCreator.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/SyncableLocalServiceStateCreator.kt new file mode 100644 index 000000000..f43de23b6 --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/SyncableLocalServiceStateCreator.kt @@ -0,0 +1,69 @@ +package pl.allegro.tech.servicemesh.envoycontrol.synchronization + +import pl.allegro.tech.servicemesh.envoycontrol.services.LocalClusterStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance +import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances +import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.Port +import java.util.concurrent.ConcurrentHashMap + +class SyncableLocalServiceStateCreator( + private val localClusterStateChanges: LocalClusterStateChanges, + private val mappingsCache: IngressGatewayPortMappingsCache +) { + + fun createSyncableLocalState(): ServicesState { + val localState = localClusterStateChanges.latestServiceState.get() + val dcIngressGatewaysNames = mappingsCache.dcIngressGatewayNames() + val (dcIngressGateways, services) = localState.allInstances() + .partition { dcIngressGatewaysNames.contains(it.serviceName) } + val servicesMap = services.map { + val instances = dcIngressGateways.mapNotNull { gatewayServiceInstances -> + val port = mappingsCache.dcIngressGatewayMapping(gatewayServiceInstances.serviceName)[it.serviceName] + if (port != null) { + mergeDcIngressGatewayWithServiceInstances(gatewayServiceInstances.instances, it.instances, port) + } else { + null + } + }.flatten().toSet().ifEmpty { it.instances } + ServiceInstances(serviceName = it.serviceName, instances = instances) + }.associateBy { it.serviceName } + + return ServicesState(ConcurrentHashMap(servicesMap)) + } + + private fun mergeDcIngressGatewayWithServiceInstances( + ingressGatewayInstances: Set, + serviceInstances: Set, + port: Port + ): Set { + var isCanary = false + var isReqular = false + val tags = mutableSetOf() + var maxWeight = 0 + for (instance in serviceInstances) { + tags.addAll(instance.tags) + if (!isCanary) { + isCanary = instance.canary + } + if (!isReqular) { + isReqular = instance.regular + } + if (instance.weight > maxWeight) { + maxWeight = instance.weight + } + } + return ingressGatewayInstances.map { + ServiceInstance( + id = it.id, + tags = tags, + address = it.address, + port = port, + regular = isReqular, + canary = isCanary, + weight = maxWeight + ) + }.toSet() + } +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt index 41fe92da5..065fa53a3 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt @@ -16,6 +16,7 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import pl.allegro.tech.servicemesh.envoycontrol.groups.AccessLogFilterSettings import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesIngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.Group @@ -24,14 +25,17 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.ListenersConfig import pl.allegro.tech.servicemesh.envoycontrol.groups.Outgoing import pl.allegro.tech.servicemesh.envoycontrol.groups.ProxySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesIngressGatewayGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.with import pl.allegro.tech.servicemesh.envoycontrol.snapshot.EnvoySnapshotFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotsVersions import pl.allegro.tech.servicemesh.envoycontrol.snapshot.outgoingTimeoutPolicy +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyIngressGatewayListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEgressRoutesFactory @@ -272,6 +276,50 @@ class EnvoySnapshotFactoryTest { assertThat(snapshot.endpoints().resources()).doesNotContainKey("rl_service") } + @Test + fun `should create ingress snapshot`() { + // given + val properties = SnapshotProperties() + val envoySnapshotFactory = createSnapshotFactory(properties) + + val cluster1 = createCluster(properties, clusterName = "cluster-1") + val cluster2 = createCluster(properties, clusterName = "cluster-2") + val servicesGroup: Group = createServicesIngressGatewayGroup( + dependencies = arrayOf( + cluster1.name to null, + cluster2.name to null, + ), snapshotProperties = properties + ) + + val allServicesGroup: Group = createAllServicesIngressGatewayGroup( + dependencies = arrayOf( + "*" to null, + ), snapshotProperties = properties + ) + + val globalSnapshot = createGlobalSnapshot(cluster1, cluster2) + + listOf(servicesGroup, allServicesGroup).forEach { group -> + + // when + val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot) + + // then + val ingressListenersCluster1 = snapshot.listeners().resources()["ingress_listener_for_cluster-1"] + val ingressListenersCluster2 = snapshot.listeners().resources()["ingress_listener_for_cluster-2"] + val socketAddressCluster1 = ingressListenersCluster1!!.address.socketAddress + val socketAddressCluster2 = ingressListenersCluster2!!.address.socketAddress + assertThat(socketAddressCluster1.address).isEqualTo(INGRESS_HOST) + assertThat(socketAddressCluster1.portValue).isEqualTo(INGRESS_PORT + 0) + assertThat(socketAddressCluster2.address).isEqualTo(INGRESS_HOST) + assertThat(socketAddressCluster2.portValue).isEqualTo(INGRESS_PORT + 1) + listOf(ingressListenersCluster1, ingressListenersCluster2).map { it.filterChainsList.firstOrNull()!!.filtersList } + .forEach { filterList -> + assertThat(filterList?.get(0)?.name).isEqualTo("envoy.filters.network.tcp_proxy") + } + } + } + private fun GlobalSnapshot.withEndpoint(clusterName: String): GlobalSnapshot = copy( endpoints = SnapshotResources.create(listOf(ClusterLoadAssignment.newBuilder() .setClusterName(clusterName) @@ -328,6 +376,45 @@ class EnvoySnapshotFactoryTest { ) } + private fun createServicesIngressGatewayGroup( + mode: CommunicationMode = CommunicationMode.XDS, + serviceName: String = DEFAULT_SERVICE_NAME, + discoveryServiceName: String = DEFAULT_DISCOVERY_SERVICE_NAME, + dependencies: Array> = emptyArray(), + rateLimitEndpoints: List = emptyList(), + snapshotProperties: SnapshotProperties + ): ServicesIngressGatewayGroup { + val listenersConfig = createListenersConfig(snapshotProperties) + return ServicesIngressGatewayGroup( + mode, + serviceName, + discoveryServiceName, + ProxySettings().with( + serviceDependencies = serviceDependencies(*dependencies), + rateLimitEndpoints = rateLimitEndpoints + ), + listenersConfig + ) + } + + private fun createAllServicesIngressGatewayGroup( + mode: CommunicationMode = CommunicationMode.XDS, + serviceName: String = DEFAULT_SERVICE_NAME, + discoveryServiceName: String = DEFAULT_DISCOVERY_SERVICE_NAME, + dependencies: Array> = emptyArray(), + snapshotProperties: SnapshotProperties + ): AllServicesIngressGatewayGroup { + val listenersConfig = createListenersConfig(snapshotProperties) + return AllServicesIngressGatewayGroup( + mode, + serviceName, + discoveryServiceName, + ProxySettings().with( + serviceDependencies = serviceDependencies(*dependencies)), + listenersConfig + ) + } + private fun createListenersConfig(snapshotProperties: SnapshotProperties): ListenersConfig { return ListenersConfig( ingressHost = INGRESS_HOST, @@ -353,6 +440,7 @@ class EnvoySnapshotFactoryTest { val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator()) val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties) val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters) + val ingressListenersFactory = EnvoyIngressGatewayListenersFactory(IngressGatewayPortMappingsCache()) val snapshotsVersions = SnapshotsVersions() val meterRegistry: MeterRegistry = SimpleMeterRegistry() @@ -362,6 +450,7 @@ class EnvoySnapshotFactoryTest { clustersFactory, endpointsFactory, listenersFactory, + ingressListenersFactory, snapshotsVersions, properties, meterRegistry @@ -379,7 +468,7 @@ class EnvoySnapshotFactoryTest { } private fun createCluster( - defaultProperties: SnapshotProperties, + defaultProperties: SnapshotProperties = SnapshotProperties(), clusterName: String = CLUSTER_NAME, serviceName: String = DEFAULT_SERVICE_NAME, idleTimeout: Long = DEFAULT_IDLE_TIMEOUT diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 85f5105a2..e235e0d19 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -47,8 +47,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceName import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyIngressGatewayListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEgressRoutesFactory @@ -1319,6 +1321,7 @@ class SnapshotUpdaterTest { snapshotProperties, EnvoyHttpFilters.emptyFilters ), + ingressGatewayListenersFactory = EnvoyIngressGatewayListenersFactory(IngressGatewayPortMappingsCache()), // Remember when LDS change we have to send RDS again snapshotsVersions = SnapshotsVersions(), properties = snapshotProperties, diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index 356311372..711767753 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -38,12 +38,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.InvalidPor import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.IpAddressFilter import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServiceInstancesFilter import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.IngressGatewayPortMappingsCache import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.synchronization.SyncableLocalServiceStateCreator import reactor.core.scheduler.Schedulers import java.net.URI @Configuration +@Suppress("LongParameterList") class ControlPlaneConfig { init { Schedulers.enableMetrics() @@ -64,11 +67,13 @@ class ControlPlaneConfig { meterRegistry: MeterRegistry, globalStateChanges: GlobalStateChanges, metrics: EnvoyControlMetrics, - envoyHttpFilters: EnvoyHttpFilters + envoyHttpFilters: EnvoyHttpFilters, + ingressGatewayPortMappingsCache: IngressGatewayPortMappingsCache, ): ControlPlane = ControlPlane.builder(properties, meterRegistry) .withMetrics(metrics) .withEnvoyHttpFilters(envoyHttpFilters) + .withIngressGatewayPortMappingsCache(ingressGatewayPortMappingsCache) .build(globalStateChanges.combined()) @Bean @@ -115,6 +120,16 @@ class ControlPlaneConfig { transformers ) + @Bean + fun syncableLocalServiceStateCreator( + localStateChanges: LocalClusterStateChanges, + mappingsCache: IngressGatewayPortMappingsCache + ): SyncableLocalServiceStateCreator = + SyncableLocalServiceStateCreator(localStateChanges, mappingsCache) + + @Bean + fun ingressGatewayPortMappingsCache() = IngressGatewayPortMappingsCache() + @Bean fun consulDatacenterReader(consulProperties: ConsulProperties, objectMapper: ObjectMapper): ConsulDatacenterReader = ConsulRecipes.consulRecipes() diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/StateController.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/StateController.kt index 44bde79d9..7094d647b 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/StateController.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/StateController.kt @@ -8,10 +8,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState @RestController -class StateController(val localClusterStateChanges: LocalClusterStateChanges) { +class StateController( + val localClusterStateChanges: LocalClusterStateChanges, + val syncableLocalServiceStateCreator: SyncableLocalServiceStateCreator +) { @GetMapping("/state") - fun getState(): ServicesState = localClusterStateChanges.latestServiceState.get() + fun getState(): ServicesState = syncableLocalServiceStateCreator.createSyncableLocalState() @GetMapping("/state/{serviceName}") fun getStateByServiceName(@PathVariable("serviceName") serviceName: String): ServiceInstances? = diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulLocalClusterStateChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulLocalClusterStateChanges.kt index 6a47c5492..22d75afe7 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulLocalClusterStateChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulLocalClusterStateChanges.kt @@ -19,6 +19,7 @@ class ConsulLocalClusterStateChanges( private val transformers: List = emptyList(), override val latestServiceState: AtomicReference = AtomicReference(ServicesState()) ) : LocalClusterStateChanges { + override fun stream(): Flux = consulChanges .watchState() diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/IngressGatewayTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/IngressGatewayTest.kt new file mode 100644 index 000000000..04f6f3933 --- /dev/null +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/IngressGatewayTest.kt @@ -0,0 +1,198 @@ +package pl.allegro.tech.servicemesh.envoycontrol + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import pl.allegro.tech.servicemesh.envoycontrol.assertions.isForbidden +import pl.allegro.tech.servicemesh.envoycontrol.assertions.isOk +import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted +import pl.allegro.tech.servicemesh.envoycontrol.config.Echo1EnvoyAuthConfig +import pl.allegro.tech.servicemesh.envoycontrol.config.Echo2EnvoyAuthConfig +import pl.allegro.tech.servicemesh.envoycontrol.config.GatewayEnvoyAuthConfig +import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulMultiClusterExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension +import pl.allegro.tech.servicemesh.envoycontrol.groups.Mode +import java.time.Duration + +class IngressGatewayTest { + companion object { + + private val pollingInterval = Duration.ofSeconds(1) + private val stateSampleDuration = Duration.ofSeconds(1) + + @JvmField + @RegisterExtension + val consulClusters = ConsulMultiClusterExtension() + + val properties = mapOf( + "envoy-control.envoy.snapshot.stateSampleDuration" to stateSampleDuration, + "envoy-control.envoy.snapshot.incoming-permissions.overlapping-paths-fix" to true, + "envoy-control.envoy.snapshot.incoming-permissions.tls-authentication.services-allowed-to-use-wildcard" to listOf( + "echo2" + ), + "envoy-control.envoy.snapshot.incoming-permissions.enabled" to true, + "envoy-control.sync.enabled" to true, + "envoy-control.sync.polling-interval" to pollingInterval.seconds, + "envoy-control.envoy.snapshot.outgoing-permissions.services-allowed-to-use-wildcard" to setOf( + "echo2", + "gateway" + ) + ) + + @JvmField + @RegisterExtension + val envoyControlDc1 = EnvoyControlClusteredExtension( + consulClusters.serverFirst, { properties }, listOf( + consulClusters + ) + ) + + @JvmField + @RegisterExtension + val envoyControlDc2 = EnvoyControlClusteredExtension( + consulClusters.serverSecond, { properties }, listOf( + consulClusters + ) + ) + + @JvmField + @RegisterExtension + val service1 = EchoServiceExtension() + + // language=yaml + private val echo1Yaml = """ + node: + metadata: + proxy_settings: + outgoing: + dependencies: + - service: "echo2" + - service: "gateway" + incoming: + unlistedEndpointsPolicy: log + endpoints: + - path: "/secured_endpoint" + clients: ["echo2"] + """.trimIndent() + + @JvmField + @RegisterExtension + val envoy1 = EnvoyExtension( + envoyControlDc1, + service1, + config = Echo1EnvoyAuthConfig.copy(configOverride = echo1Yaml, serviceName = "echo1") + ) + + @JvmField + @RegisterExtension + val service2 = EchoServiceExtension() + + // language=yaml + private val echo2Yaml = """ + node: + metadata: + proxy_settings: + outgoing: + dependencies: + - service: "echo1" + incoming: + unlistedEndpointsPolicy: log + endpoints: + - path: "/secured_endpoint" + clients: ["*"] + - path: "/secured_endpoint_not_echo1" + clients: ["echo4"] + """.trimIndent() + + @JvmField + @RegisterExtension + val envoy2 = EnvoyExtension( + envoyControlDc2, + service2, + config = Echo2EnvoyAuthConfig.copy(configOverride = echo2Yaml, serviceName = "echo2") + ) + + // language=yaml + private val gatewayYaml = """ + node: + metadata: + proxy_settings: + outgoing: + dependencies: + - service: "*" + """.trimIndent() + + @JvmField + @RegisterExtension + val gatewayEnvoy = EnvoyExtension( + envoyControlDc2, + config = GatewayEnvoyAuthConfig.copy(configOverride = gatewayYaml, serviceName = "gateway"), + mode = Mode.INGRESS_GATEWAY + ) + } + + private val adminEnvoy1 = envoy1.container.admin() + private val adminEnvoy2 = envoy2.container.admin() + private val adminGateway = gatewayEnvoy.container.admin() + + fun init() { + consulClusters.serverFirst.operations.registerServiceWithEnvoyOnIngress( + envoy1, + name = "echo1", + tags = listOf("mtls:enabled", "envoy") + ) + consulClusters.serverSecond.operations.registerServiceWithEnvoyOnIngress( + envoy2, + name = "echo2", + tags = listOf("mtls:enabled", "envoy") + ) + consulClusters.serverSecond.operations.registerServiceWithEnvoyOnIngress(gatewayEnvoy, name = "gateway") + envoy1.waitForAvailableEndpoints("echo2") + gatewayEnvoy.waitForAvailableEndpoints("echo2") + } + + @Test + fun `dc ingress gateway should pass traffic to correct upstream and make tls termination on final destination`() { + init() + val response = envoy1.egressOperations.callService("echo2", pathAndQuery = "/secured_endpoint") + assertThat(response).isOk() + + untilAsserted(wait = Duration.ofSeconds(15)) { + assertThat(adminEnvoy1.statValue("cluster.echo2.upstream_cx_http2_total")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy1.statValue("cluster.echo2.ssl.handshake")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy1.statValue("cluster.echo2.ssl.fail_verify_error")?.toInt()).isEqualTo(0) + assertThat(adminEnvoy1.statValue("cluster.echo2.upstream_rq_2xx")?.toInt()).isEqualTo(1) + + assertThat(adminEnvoy2.statValue("http.ingress_https.downstream_cx_ssl_total")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy2.statValue("http.ingress_https.downstream_rq_2xx")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy2.statValue("http.ingress_https.rbac.allowed")?.toInt()).isEqualTo(1) + + assertThat(adminGateway.statValue("tcp.echo2_tcp.downstream_cx_total")?.toInt()).isEqualTo(1) + assertThat(adminGateway.statValue("cluster.echo2.upstream_cx_active")?.toInt()).isEqualTo(1) + } + } + + @Test + fun `dc ingress gateway should pass traffic to correct upstream and deny on rbac for unlisted client`() { + init() + + val response = envoy1.egressOperations.callService("echo2", pathAndQuery = "/secured_endpoint_not_echo1") + assertThat(response).isForbidden() + + untilAsserted(wait = Duration.ofSeconds(15)) { + assertThat(adminEnvoy1.statValue("cluster.echo2.upstream_cx_http2_total")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy1.statValue("cluster.echo2.ssl.handshake")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy1.statValue("cluster.echo2.ssl.fail_verify_error")?.toInt()).isEqualTo(0) + assertThat(adminEnvoy1.statValue("cluster.echo2.upstream_rq_403")?.toInt()).isEqualTo(1) + + assertThat(adminEnvoy2.statValue("http.ingress_https.downstream_cx_ssl_total")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy2.statValue("http.ingress_https.downstream_rq_4xx")?.toInt()).isEqualTo(1) + assertThat(adminEnvoy2.statValue("http.ingress_https.rbac.denied")?.toInt()).isEqualTo(1) + + assertThat(adminGateway.statValue("tcp.echo2_tcp.downstream_cx_total")?.toInt()).isEqualTo(1) + assertThat(adminGateway.statValue("cluster.echo2.upstream_cx_active")?.toInt()).isEqualTo(1) + } + } +} diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt index 9a968b2be..edd959825 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt @@ -65,6 +65,11 @@ val Echo3EnvoyAuthConfig = Echo1EnvoyAuthConfig.copy( certificateChain = "/app/fullchain_echo3.pem", privateKey = "/app/privkey_echo3.pem" ) +val GatewayEnvoyAuthConfig = EnvoyConfig(filePath = "envoy/config_gateway.yaml", + serviceName = "gateway", + certificateChain = "/app/fullchain_echo3.pem", + privateKey = "/app/privkey_echo3.pem" +) val AdsWithDisabledEndpointPermissions = EnvoyConfig("envoy/config_ads_disabled_endpoint_permissions.yaml") val AdsWithStaticListeners = EnvoyConfig("envoy/config_ads_static_listeners.yaml") val AdsWithNoDependencies = EnvoyConfig("envoy/config_ads_no_dependencies.yaml") diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyContainer.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyContainer.kt index 99fe91166..553368149 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyContainer.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyContainer.kt @@ -5,9 +5,11 @@ import org.springframework.core.io.ClassPathResource import org.testcontainers.containers.BindMode import org.testcontainers.containers.Container import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.images.builder.dockerfile.DockerfileBuilder import pl.allegro.tech.servicemesh.envoycontrol.config.EnvoyConfig import pl.allegro.tech.servicemesh.envoycontrol.config.containers.SSLGenericContainer +import pl.allegro.tech.servicemesh.envoycontrol.groups.Mode import pl.allegro.tech.servicemesh.envoycontrol.logger as loggerDelegate class EnvoyContainer( @@ -16,6 +18,7 @@ class EnvoyContainer( private val envoyControl1XdsPort: Int, private val envoyControl2XdsPort: Int = envoyControl1XdsPort, private val logLevel: String = "info", + private val mode: Mode = Mode.SERVICE, image: String = DEFAULT_IMAGE ) : SSLGenericContainer( dockerfileBuilder = DockerfileBuilder() @@ -54,7 +57,14 @@ class EnvoyContainer( withClasspathResourceMapping(EXTRA_DIR, EXTRA_DIR_DEST, BindMode.READ_ONLY) } withEnv(ENVOY_UID_ENV_NAME, "0") - withExposedPorts(EGRESS_LISTENER_CONTAINER_PORT, INGRESS_LISTENER_CONTAINER_PORT, ADMIN_PORT) + when (mode) { + Mode.SERVICE -> withExposedPorts( + EGRESS_LISTENER_CONTAINER_PORT, + INGRESS_LISTENER_CONTAINER_PORT, + ADMIN_PORT + ) + Mode.INGRESS_GATEWAY -> withExposedPorts(INGRESS_LISTENER_CONTAINER_PORT, ADMIN_PORT) + } withPrivilegedMode(true) withCommand( @@ -70,6 +80,7 @@ class EnvoyContainer( "--config-yaml", config.configOverride, "-l", logLevel ) + waitingFor(Wait.forListeningPort()) } fun addIptablesRedirect(redirectToPort: Int, destinationPort: Int) { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyExtension.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyExtension.kt index cc5f22468..8c87c01a9 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyExtension.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyExtension.kt @@ -13,12 +13,14 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.EnvoyConfig import pl.allegro.tech.servicemesh.envoycontrol.config.RandomConfigFile import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlExtensionBase import pl.allegro.tech.servicemesh.envoycontrol.config.service.ServiceExtension +import pl.allegro.tech.servicemesh.envoycontrol.groups.Mode import pl.allegro.tech.servicemesh.envoycontrol.logger class EnvoyExtension( private val envoyControl: EnvoyControlExtensionBase, private val localService: ServiceExtension<*>? = null, - private val config: EnvoyConfig = RandomConfigFile + private val config: EnvoyConfig = RandomConfigFile, + mode: Mode = Mode.SERVICE ) : BeforeAllCallback, AfterAllCallback, AfterEachCallback { companion object { @@ -28,7 +30,8 @@ class EnvoyExtension( val container: EnvoyContainer = EnvoyContainer( config, { localService?.container()?.ipAddress() ?: "127.0.0.1" }, - envoyControl.app.grpcPort + envoyControl.app.grpcPort, + mode = mode ).withNetwork(Network.SHARED) val ingressOperations: IngressOperations = IngressOperations(container) diff --git a/envoy-control-tests/src/main/resources/envoy/config_gateway.yaml b/envoy-control-tests/src/main/resources/envoy/config_gateway.yaml new file mode 100644 index 000000000..698f88c6b --- /dev/null +++ b/envoy-control-tests/src/main/resources/envoy/config_gateway.yaml @@ -0,0 +1,104 @@ +admin: + access_log_path: /dev/null + address: + socket_address: { address: 0.0.0.0, port_value: 10000 } +dynamic_resources: + lds_config: + resource_api_version: V3 + ads: {} + cds_config: + resource_api_version: V3 + ads: {} + ads_config: + transport_api_version: V3 + api_type: GRPC + grpc_services: + envoy_grpc: + cluster_name: envoy-control-xds +node: + cluster: test-cluster + id: test-id + metadata: + service_name: "SERVICE_NAME" + ads: true + ingress_host: "0.0.0.0" + ingress_port: 5001 + egress_host: "0.0.0.0" + egress_port: 5000 + use_remote_address: true + generate_request_id: true + preserve_external_request_id: true + access_log_enabled: false + add_upstream_external_address_header: true + has_static_secrets_defined: true + mode: INGRESS_GATEWAY + resources_dir: "/etc/envoy/extra" + proxy_settings: + outgoing: + dependencies: + - service: "*" + +static_resources: + secrets: + - name: server_cert + tls_certificate: + certificate_chain: + filename: CERTIFICATE_CHAIN + private_key: + filename: PRIVATE_KEY + - name: validation_context + validation_context: + trusted_ca: + filename: TRUSTED_CA + clusters: + - connect_timeout: 1s + load_assignment: + cluster_name: envoy-control-xds + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: HOST_IP + port_value: HOST_PORT + - endpoint: + address: + socket_address: + address: HOST_IP + port_value: HOST2_PORT + http2_protocol_options: {} + name: envoy-control-xds + - name: envoy-original-destination + type: ORIGINAL_DST + lb_policy: CLUSTER_PROVIDED + original_dst_lb_config: + use_http_header: true + connect_timeout: + seconds: 1 + http_protocol_options: + allow_absolute_url: true + - name: local_service + type: STATIC + load_assignment: + cluster_name: local_service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: LOCAL_SERVICE_IP + port_value: 5678 + connect_timeout: 1s + - name: this_admin + type: STATIC + load_assignment: + cluster_name: this_admin + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 10000 + connect_timeout: + seconds: 1