From 17961595eebd78d0f21efbbea75c31579a219a3a Mon Sep 17 00:00:00 2001 From: Vincent Gromakowski Date: Mon, 28 Oct 2024 15:30:53 +0100 Subject: [PATCH 01/12] improve example --- .../stacks/streaming_governance_stack.py | 309 ++++++++++++++++++ .../tests/test_example.py | 2 +- 2 files changed, 310 insertions(+), 1 deletion(-) create mode 100644 examples/datazone-msk-governance/stacks/streaming_governance_stack.py diff --git a/examples/datazone-msk-governance/stacks/streaming_governance_stack.py b/examples/datazone-msk-governance/stacks/streaming_governance_stack.py new file mode 100644 index 000000000..5c5739b2f --- /dev/null +++ b/examples/datazone-msk-governance/stacks/streaming_governance_stack.py @@ -0,0 +1,309 @@ +from aws_cdk import ( + BundlingOptions, + CfnParameterProps, + Duration, + RemovalPolicy, + Stack, + aws_lambda as ldba, + aws_ec2 as ec2, + aws_iam as iam, + aws_emrserverless as emrserverless, + aws_glue as glue, + aws_datazone as datazone, + aws_lambda_python_alpha as python, + aws_s3_assets as assets, + DockerImage, + BundlingOutput, + aws_kinesisanalyticsv2 as kda, +) +from constructs import Construct +from cdklabs import aws_data_solutions_framework as dsf + + +class StreamingGovernanceStack(Stack): + + def __init__(self, scope: Construct, construct_id: str, domain_id: str, datazone_portal_role_name: str, environment_id: str ='', **kwargs) -> None: + super().__init__(scope, construct_id, **kwargs) + + stack = Stack.of(self) + producer_topic = 'producer-data-product' + consumer_topic = 'consumer-data-product' + + # Set the flag to remove all resources on delete + self.node.set_context("@data-solutions-framework-on-aws/removeDataOnDestroy", True) + + ### Central components for streaming governance + + msk_asset_type = dsf.governance.DataZoneMskAssetType(self, + "DataZoneMskAssetType", + domain_id=domain_id, + removal_policy=RemovalPolicy.DESTROY) + + central_authorizer = dsf.governance.DataZoneMskCentralAuthorizer(self, + 'CentralAuthorizer', + 
domain_id=domain_id, + removal_policy=RemovalPolicy.DESTROY) + + dsf.governance.DataZoneMskEnvironmentAuthorizer(self, + 'EnvironmentAuthorizer', + domain_id=domain_id, + grant_msk_managed_vpc=True) + + ### Components for producer and consumer environments + + vpc = dsf.utils.DataVpc(self, + 'EnvironmentsVpc', + vpc_cidr='10.0.0.0/16') + + default_security_group = ec2.SecurityGroup.from_security_group_id(self, 'DefaultSecurityGroup', vpc.vpc.vpc_default_security_group) + + msk_cluster = dsf.streaming.MskServerless(self, "MskServerless", + cluster_name='serverless-cluster', + vpc=vpc.vpc, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS), + removal_policy=RemovalPolicy.DESTROY) + + datazone_portal_role = iam.Role.from_role_name(self, 'DataZonePortalRole', datazone_portal_role_name) + + ### Producer environment + + msk_cluster.add_topic('ProducerTopic', + topic_definition=dsf.streaming.MskTopic( + num_partitions=1, + topic=producer_topic)) + + producer_schema_registry = glue.CfnRegistry(self, 'ProducerRegistry', + name='producer-registry') + + producer_role = iam.Role(self, 'ProducerRole', + assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'), + managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSLambdaBasicExecutionRole')], + inline_policies={ + 'network': iam.PolicyDocument( + statements=[ + iam.PolicyStatement(actions=[ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DeleteNetworkInterface'], + resources=['*'])]), + 'datazone': iam.PolicyDocument( + statements=[ + iam.PolicyStatement(actions=['datazone:PostLineageEvent'], + resources=[f'arn:{stack.partition}:datazone:{stack.region}:{stack.account}:domain/{domain_id}'])]), + 'gsr': iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + 'glue:GetRegistry', + 'glue:CreateRegistry', + 'glue:UpdateRegistry', + 'glue:ListRegistries'], + resources=[producer_schema_registry.attr_arn]), + iam.PolicyStatement( + 
actions=[ + 'glue:CreateSchema', + 'glue:UpdateSchema', + 'glue:GetSchema', + 'glue:ListSchemas', + 'glue:RegisterSchemaVersion', + 'glue:DeleteSchemaVersions', + 'glue:GetSchemaVersion', + 'glue:GetSchemaByDefinition', + 'glue:GetSchemaVersionsDiff', + 'glue:ListSchemaVersions', + 'glue:CheckSchemaVersionValidity', + 'glue:PutSchemaVersionMetadata', + 'glue:RemoveSchemaVersionMetadata', + 'glue:QuerySchemaVersionMetadata',], + resources=['*'])])}) + + msk_cluster.grant_produce(producer_topic, producer_role) + + producer_dz_project = datazone.CfnProject(self, 'ProducerProject', domain_identifier=domain_id, name='producer') + + datazone.CfnProjectMembership(self, 'AdminProducerMembership', + designation='PROJECT_OWNER', + domain_identifier=domain_id, + member=datazone.CfnProjectMembership.MemberProperty(user_identifier=datazone_portal_role.role_arn), + project_identifier=producer_dz_project.attr_id) + + dsf.governance.DataZoneGsrMskDataSource(self, + 'ProducerGsrDataSource', + cluster_name=msk_cluster.cluster_name, + domain_id=domain_id, + project_id=producer_dz_project.attr_id, + registry_name=producer_schema_registry.name, + enable_schema_registry_event=True, + removal_policy=RemovalPolicy.DESTROY) + + producer_lambda = python.PythonFunction(self, 'ProducerLambda', + entry='./resources/lambda', + runtime=ldba.Runtime.PYTHON_3_9, + index='producer/index.py', + handler='lambda_handler', + # bundling=python.BundlingOptions(asset_excludes=["consumer"]), + vpc=vpc.vpc, + vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS), + security_groups=[default_security_group], + role=producer_role, + memory_size=512, + timeout=Duration.minutes(15), + environment={ + 'KAFKA_CLUSTER_NAME': msk_cluster.cluster_name, + 'KAFKA_AUTH': 'iam', + 'KAFKA_BOOTSTRAP': msk_cluster.cluster_boostrap_brokers, + 'KAFKA_TOPIC': producer_topic, + 'GLUE_REGISTRY_NAME': producer_schema_registry.name, + 'DZ_DOMAIN_ID': domain_id, + }) + + 
msk_cluster.broker_security_group.add_ingress_rule(peer=default_security_group, connection=ec2.Port.tcp(9098)) + + ### Consumer environment + + consumer_dz_project = datazone.CfnProject(self, 'ConsumerProject', domain_identifier=domain_id, name='consumer') + + datazone.CfnProjectMembership(self, 'AdminConsumerMembership', + designation='PROJECT_OWNER', + domain_identifier=domain_id, + member=datazone.CfnProjectMembership.MemberProperty(user_identifier=datazone_portal_role.role_arn), + project_identifier=consumer_dz_project.attr_id) + + # msk_cluster.add_topic('ConsumerTopic', + # topic_definition=dsf.streaming.MskTopic( + # num_partitions=1, + # topic=consumer_topic)) + + # consumer_schema_registry = glue.CfnRegistry(self, 'ConsumerRegistry', + # name='consumer-registry') + + # dsf.governance.DataZoneGsrMskDataSource(self, + # 'ConsumerGsrDataSource', + # cluster_name=msk_cluster.cluster_name, + # domain_id=domain_id, + # project_id=consumer_dz_project.attr_id, + # registry_name=consumer_schema_registry.name, + # enable_schema_registry_event=True) + + flink_app_asset = assets.Asset(self, 'FlinkAppAsset', + path="resources/flink", # Path to the Flink application folder + bundling=BundlingOptions( + image=DockerImage.from_registry('maven:3.8.6-openjdk-11'), # Maven Docker image + output_type=BundlingOutput.SINGLE_FILE, # This ensures the jar is copied as-is to the S3 bucket + command=[ + 'sh', + '-c', + 'mvn clean package && cp target/[^original-]*.jar /asset-output/'])) + + # consumer_role = dsf.processing.SparkEmrServerlessRuntime.create_execution_role(self, 'ConsumerRole') + consumer_role = iam.Role(self, 'ConsumerRole', + assumed_by=iam.CompositePrincipal( + iam.ServicePrincipal('kinesisanalytics.amazonaws.com'), + # iam.ServicePrincipal('emr-serverless.amazonaws.com'), + iam.ServicePrincipal('datazone.amazonaws.com')), + inline_policies={ + 'consumerPolicy': iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=['datazone:PostLineageEvent'], + 
resources=[f'arn:{stack.partition}:datazone:{stack.region}:{stack.account}:domain/{domain_id}']), + # iam.PolicyStatement( + # actions=[ + # 'glue:GetRegistry', + # 'glue:CreateRegistry', + # 'glue:UpdateRegistry', + # 'glue:ListRegistries' + # ], + # resources=[consumer_schema_registry.attr_arn]), + iam.PolicyStatement( + actions=[ + 'glue:CreateSchema', + 'glue:UpdateSchema', + 'glue:GetSchema', + 'glue:ListSchemas', + 'glue:RegisterSchemaVersion', + 'glue:DeleteSchemaVersions', + 'glue:GetSchemaVersion', + 'glue:GetSchemaByDefinition', + 'glue:GetSchemaVersionsDiff', + 'glue:ListSchemaVersions', + 'glue:CheckSchemaVersionValidity', + 'glue:PutSchemaVersionMetadata', + 'glue:RemoveSchemaVersionMetadata', + 'glue:QuerySchemaVersionMetadata',], + resources=['*']), + iam.PolicyStatement( + actions=[ + 'ec2:DescribeVpcs', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + 'ec2:DescribeDhcpOptions', + 'ec2:CreateNetworkInterface', + 'ec2:CreateNetworkInterfacePermission', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DeleteNetworkInterface', + ], + resources=['*']), + iam.PolicyStatement( + actions=[ + 's3:GetObject', + 's3:GetObjectVersion' + ], + resources=[ + flink_app_asset.bucket.bucket_arn, + flink_app_asset.bucket.arn_for_objects(flink_app_asset.s3_object_key) + ] + )])}) + + msk_cluster.grant_produce(consumer_topic, consumer_role) + + asset_grant = flink_app_asset.bucket.grant_read(identity=consumer_role, objects_key_pattern=flink_app_asset.s3_object_key) + + managed_flink_application = kda.CfnApplication(self, 'Managed Flink Application', + application_name='flink-consumer', + runtime_environment='FLINK-1_18', + service_execution_role=consumer_role.role_arn, + application_configuration=kda.CfnApplication.ApplicationConfigurationProperty( + application_code_configuration=kda.CfnApplication.ApplicationCodeConfigurationProperty( + code_content=kda.CfnApplication.CodeContentProperty( + s3_content_location=kda.CfnApplication.S3ContentLocationProperty( + 
bucket_arn=f'arn:aws:s3:::{flink_app_asset.s3_bucket_name}', + file_key=flink_app_asset.s3_object_key + ), + ), + code_content_type="ZIPFILE"), + application_snapshot_configuration=kda.CfnApplication.ApplicationSnapshotConfigurationProperty( + snapshots_enabled=True), + vpc_configurations=[kda.CfnApplication.VpcConfigurationProperty( + subnet_ids=vpc.vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS).subnet_ids, + security_group_ids=[vpc.vpc.vpc_default_security_group])], + environment_properties=kda.CfnApplication.EnvironmentPropertiesProperty( + property_groups=[kda.CfnApplication.PropertyGroupProperty( + property_group_id="FlinkApplicationProperties", + property_map={ + 'bootstrap.servers': msk_cluster.cluster_boostrap_brokers, + 'source.topic': producer_topic, + 'sourceClusterName': msk_cluster.cluster_name, + 'datazoneDomainID': domain_id, + 'lineageTransport': 'datazone', + 'region': self.region, + 'sourceRegistry': producer_schema_registry.name + })]), + flink_application_configuration=kda.CfnApplication.FlinkApplicationConfigurationProperty( + parallelism_configuration=kda.CfnApplication.ParallelismConfigurationProperty( + parallelism=1, + configuration_type="CUSTOM", + parallelism_per_kpu=1, + auto_scaling_enabled=False), + monitoring_configuration=kda.CfnApplication.MonitoringConfigurationProperty( + configuration_type="CUSTOM", + metrics_level="APPLICATION", + log_level="INFO")))) + + # dsf.governance.DataZoneHelpers.create_subscription_target(self, 'ConsumerSubscriptionTarget', + # custom_asset_type=msk_asset_type.msk_custom_asset_type, + # name='MskTopicsTarget', + # provider='dsf', + # environment_id=environment_id, + # authorized_principals=[consumer_role], + # manage_access_role=datazone_portal_role) diff --git a/examples/datazone-msk-governance/tests/test_example.py b/examples/datazone-msk-governance/tests/test_example.py index e0a17ae17..536a485a9 100644 --- a/examples/datazone-msk-governance/tests/test_example.py +++ 
b/examples/datazone-msk-governance/tests/test_example.py @@ -78,4 +78,4 @@ def test_nag_errors(results): errors = Annotations.from_stack(results[0]).find_error('*', Match.string_like_regexp('AwsSolutions-.*')) for error in errors: print(error) - assert(len(errors) == 0) \ No newline at end of file + assert(len(errors) == 0) From 93e49d5f2289cc1ffaaf658ef80198f2b9d23aaf Mon Sep 17 00:00:00 2001 From: Vincent Gromakowski Date: Tue, 29 Oct 2024 23:30:20 +0100 Subject: [PATCH 02/12] add example tests --- LICENSE | 2 +- .../stacks/streaming_governance_stack.py | 309 ------------------ framework/LICENSE | 2 +- 3 files changed, 2 insertions(+), 311 deletions(-) delete mode 100644 examples/datazone-msk-governance/stacks/streaming_governance_stack.py diff --git a/LICENSE b/LICENSE index 5046ef26a..d64569567 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2021-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/examples/datazone-msk-governance/stacks/streaming_governance_stack.py b/examples/datazone-msk-governance/stacks/streaming_governance_stack.py deleted file mode 100644 index 5c5739b2f..000000000 --- a/examples/datazone-msk-governance/stacks/streaming_governance_stack.py +++ /dev/null @@ -1,309 +0,0 @@ -from aws_cdk import ( - BundlingOptions, - CfnParameterProps, - Duration, - RemovalPolicy, - Stack, - aws_lambda as ldba, - aws_ec2 as ec2, - aws_iam as iam, - aws_emrserverless as emrserverless, - aws_glue as glue, - aws_datazone as datazone, - aws_lambda_python_alpha as python, - aws_s3_assets as assets, - DockerImage, - BundlingOutput, - aws_kinesisanalyticsv2 as kda, -) -from constructs import Construct -from cdklabs import aws_data_solutions_framework as dsf - - -class StreamingGovernanceStack(Stack): - - def __init__(self, scope: Construct, construct_id: str, domain_id: str, datazone_portal_role_name: str, environment_id: str ='', **kwargs) -> None: - super().__init__(scope, construct_id, **kwargs) - - stack = Stack.of(self) - producer_topic = 'producer-data-product' - consumer_topic = 'consumer-data-product' - - # Set the flag to remove all resources on delete - self.node.set_context("@data-solutions-framework-on-aws/removeDataOnDestroy", True) - - ### Central components for streaming governance - - msk_asset_type = dsf.governance.DataZoneMskAssetType(self, - "DataZoneMskAssetType", - domain_id=domain_id, - removal_policy=RemovalPolicy.DESTROY) - - central_authorizer = dsf.governance.DataZoneMskCentralAuthorizer(self, - 'CentralAuthorizer', - domain_id=domain_id, - removal_policy=RemovalPolicy.DESTROY) - - dsf.governance.DataZoneMskEnvironmentAuthorizer(self, - 'EnvironmentAuthorizer', - domain_id=domain_id, - grant_msk_managed_vpc=True) - - ### Components for producer and consumer environments - - vpc = dsf.utils.DataVpc(self, - 'EnvironmentsVpc', - vpc_cidr='10.0.0.0/16') - - default_security_group = ec2.SecurityGroup.from_security_group_id(self, 
'DefaultSecurityGroup', vpc.vpc.vpc_default_security_group) - - msk_cluster = dsf.streaming.MskServerless(self, "MskServerless", - cluster_name='serverless-cluster', - vpc=vpc.vpc, - subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS), - removal_policy=RemovalPolicy.DESTROY) - - datazone_portal_role = iam.Role.from_role_name(self, 'DataZonePortalRole', datazone_portal_role_name) - - ### Producer environment - - msk_cluster.add_topic('ProducerTopic', - topic_definition=dsf.streaming.MskTopic( - num_partitions=1, - topic=producer_topic)) - - producer_schema_registry = glue.CfnRegistry(self, 'ProducerRegistry', - name='producer-registry') - - producer_role = iam.Role(self, 'ProducerRole', - assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'), - managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSLambdaBasicExecutionRole')], - inline_policies={ - 'network': iam.PolicyDocument( - statements=[ - iam.PolicyStatement(actions=[ - 'ec2:CreateNetworkInterface', - 'ec2:DescribeNetworkInterfaces', - 'ec2:DeleteNetworkInterface'], - resources=['*'])]), - 'datazone': iam.PolicyDocument( - statements=[ - iam.PolicyStatement(actions=['datazone:PostLineageEvent'], - resources=[f'arn:{stack.partition}:datazone:{stack.region}:{stack.account}:domain/{domain_id}'])]), - 'gsr': iam.PolicyDocument( - statements=[ - iam.PolicyStatement( - actions=[ - 'glue:GetRegistry', - 'glue:CreateRegistry', - 'glue:UpdateRegistry', - 'glue:ListRegistries'], - resources=[producer_schema_registry.attr_arn]), - iam.PolicyStatement( - actions=[ - 'glue:CreateSchema', - 'glue:UpdateSchema', - 'glue:GetSchema', - 'glue:ListSchemas', - 'glue:RegisterSchemaVersion', - 'glue:DeleteSchemaVersions', - 'glue:GetSchemaVersion', - 'glue:GetSchemaByDefinition', - 'glue:GetSchemaVersionsDiff', - 'glue:ListSchemaVersions', - 'glue:CheckSchemaVersionValidity', - 'glue:PutSchemaVersionMetadata', - 'glue:RemoveSchemaVersionMetadata', - 
'glue:QuerySchemaVersionMetadata',], - resources=['*'])])}) - - msk_cluster.grant_produce(producer_topic, producer_role) - - producer_dz_project = datazone.CfnProject(self, 'ProducerProject', domain_identifier=domain_id, name='producer') - - datazone.CfnProjectMembership(self, 'AdminProducerMembership', - designation='PROJECT_OWNER', - domain_identifier=domain_id, - member=datazone.CfnProjectMembership.MemberProperty(user_identifier=datazone_portal_role.role_arn), - project_identifier=producer_dz_project.attr_id) - - dsf.governance.DataZoneGsrMskDataSource(self, - 'ProducerGsrDataSource', - cluster_name=msk_cluster.cluster_name, - domain_id=domain_id, - project_id=producer_dz_project.attr_id, - registry_name=producer_schema_registry.name, - enable_schema_registry_event=True, - removal_policy=RemovalPolicy.DESTROY) - - producer_lambda = python.PythonFunction(self, 'ProducerLambda', - entry='./resources/lambda', - runtime=ldba.Runtime.PYTHON_3_9, - index='producer/index.py', - handler='lambda_handler', - # bundling=python.BundlingOptions(asset_excludes=["consumer"]), - vpc=vpc.vpc, - vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS), - security_groups=[default_security_group], - role=producer_role, - memory_size=512, - timeout=Duration.minutes(15), - environment={ - 'KAFKA_CLUSTER_NAME': msk_cluster.cluster_name, - 'KAFKA_AUTH': 'iam', - 'KAFKA_BOOTSTRAP': msk_cluster.cluster_boostrap_brokers, - 'KAFKA_TOPIC': producer_topic, - 'GLUE_REGISTRY_NAME': producer_schema_registry.name, - 'DZ_DOMAIN_ID': domain_id, - }) - - msk_cluster.broker_security_group.add_ingress_rule(peer=default_security_group, connection=ec2.Port.tcp(9098)) - - ### Consumer environment - - consumer_dz_project = datazone.CfnProject(self, 'ConsumerProject', domain_identifier=domain_id, name='consumer') - - datazone.CfnProjectMembership(self, 'AdminConsumerMembership', - designation='PROJECT_OWNER', - domain_identifier=domain_id, - 
member=datazone.CfnProjectMembership.MemberProperty(user_identifier=datazone_portal_role.role_arn), - project_identifier=consumer_dz_project.attr_id) - - # msk_cluster.add_topic('ConsumerTopic', - # topic_definition=dsf.streaming.MskTopic( - # num_partitions=1, - # topic=consumer_topic)) - - # consumer_schema_registry = glue.CfnRegistry(self, 'ConsumerRegistry', - # name='consumer-registry') - - # dsf.governance.DataZoneGsrMskDataSource(self, - # 'ConsumerGsrDataSource', - # cluster_name=msk_cluster.cluster_name, - # domain_id=domain_id, - # project_id=consumer_dz_project.attr_id, - # registry_name=consumer_schema_registry.name, - # enable_schema_registry_event=True) - - flink_app_asset = assets.Asset(self, 'FlinkAppAsset', - path="resources/flink", # Path to the Flink application folder - bundling=BundlingOptions( - image=DockerImage.from_registry('maven:3.8.6-openjdk-11'), # Maven Docker image - output_type=BundlingOutput.SINGLE_FILE, # This ensures the jar is copied as-is to the S3 bucket - command=[ - 'sh', - '-c', - 'mvn clean package && cp target/[^original-]*.jar /asset-output/'])) - - # consumer_role = dsf.processing.SparkEmrServerlessRuntime.create_execution_role(self, 'ConsumerRole') - consumer_role = iam.Role(self, 'ConsumerRole', - assumed_by=iam.CompositePrincipal( - iam.ServicePrincipal('kinesisanalytics.amazonaws.com'), - # iam.ServicePrincipal('emr-serverless.amazonaws.com'), - iam.ServicePrincipal('datazone.amazonaws.com')), - inline_policies={ - 'consumerPolicy': iam.PolicyDocument( - statements=[ - iam.PolicyStatement( - actions=['datazone:PostLineageEvent'], - resources=[f'arn:{stack.partition}:datazone:{stack.region}:{stack.account}:domain/{domain_id}']), - # iam.PolicyStatement( - # actions=[ - # 'glue:GetRegistry', - # 'glue:CreateRegistry', - # 'glue:UpdateRegistry', - # 'glue:ListRegistries' - # ], - # resources=[consumer_schema_registry.attr_arn]), - iam.PolicyStatement( - actions=[ - 'glue:CreateSchema', - 'glue:UpdateSchema', - 
'glue:GetSchema', - 'glue:ListSchemas', - 'glue:RegisterSchemaVersion', - 'glue:DeleteSchemaVersions', - 'glue:GetSchemaVersion', - 'glue:GetSchemaByDefinition', - 'glue:GetSchemaVersionsDiff', - 'glue:ListSchemaVersions', - 'glue:CheckSchemaVersionValidity', - 'glue:PutSchemaVersionMetadata', - 'glue:RemoveSchemaVersionMetadata', - 'glue:QuerySchemaVersionMetadata',], - resources=['*']), - iam.PolicyStatement( - actions=[ - 'ec2:DescribeVpcs', - 'ec2:DescribeSubnets', - 'ec2:DescribeSecurityGroups', - 'ec2:DescribeDhcpOptions', - 'ec2:CreateNetworkInterface', - 'ec2:CreateNetworkInterfacePermission', - 'ec2:DescribeNetworkInterfaces', - 'ec2:DeleteNetworkInterface', - ], - resources=['*']), - iam.PolicyStatement( - actions=[ - 's3:GetObject', - 's3:GetObjectVersion' - ], - resources=[ - flink_app_asset.bucket.bucket_arn, - flink_app_asset.bucket.arn_for_objects(flink_app_asset.s3_object_key) - ] - )])}) - - msk_cluster.grant_produce(consumer_topic, consumer_role) - - asset_grant = flink_app_asset.bucket.grant_read(identity=consumer_role, objects_key_pattern=flink_app_asset.s3_object_key) - - managed_flink_application = kda.CfnApplication(self, 'Managed Flink Application', - application_name='flink-consumer', - runtime_environment='FLINK-1_18', - service_execution_role=consumer_role.role_arn, - application_configuration=kda.CfnApplication.ApplicationConfigurationProperty( - application_code_configuration=kda.CfnApplication.ApplicationCodeConfigurationProperty( - code_content=kda.CfnApplication.CodeContentProperty( - s3_content_location=kda.CfnApplication.S3ContentLocationProperty( - bucket_arn=f'arn:aws:s3:::{flink_app_asset.s3_bucket_name}', - file_key=flink_app_asset.s3_object_key - ), - ), - code_content_type="ZIPFILE"), - application_snapshot_configuration=kda.CfnApplication.ApplicationSnapshotConfigurationProperty( - snapshots_enabled=True), - vpc_configurations=[kda.CfnApplication.VpcConfigurationProperty( - 
subnet_ids=vpc.vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS).subnet_ids, - security_group_ids=[vpc.vpc.vpc_default_security_group])], - environment_properties=kda.CfnApplication.EnvironmentPropertiesProperty( - property_groups=[kda.CfnApplication.PropertyGroupProperty( - property_group_id="FlinkApplicationProperties", - property_map={ - 'bootstrap.servers': msk_cluster.cluster_boostrap_brokers, - 'source.topic': producer_topic, - 'sourceClusterName': msk_cluster.cluster_name, - 'datazoneDomainID': domain_id, - 'lineageTransport': 'datazone', - 'region': self.region, - 'sourceRegistry': producer_schema_registry.name - })]), - flink_application_configuration=kda.CfnApplication.FlinkApplicationConfigurationProperty( - parallelism_configuration=kda.CfnApplication.ParallelismConfigurationProperty( - parallelism=1, - configuration_type="CUSTOM", - parallelism_per_kpu=1, - auto_scaling_enabled=False), - monitoring_configuration=kda.CfnApplication.MonitoringConfigurationProperty( - configuration_type="CUSTOM", - metrics_level="APPLICATION", - log_level="INFO")))) - - # dsf.governance.DataZoneHelpers.create_subscription_target(self, 'ConsumerSubscriptionTarget', - # custom_asset_type=msk_asset_type.msk_custom_asset_type, - # name='MskTopicsTarget', - # provider='dsf', - # environment_id=environment_id, - # authorized_principals=[consumer_role], - # manage_access_role=datazone_portal_role) diff --git a/framework/LICENSE b/framework/LICENSE index 5046ef26a..d64569567 100644 --- a/framework/LICENSE +++ b/framework/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2021-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
From 2832206a61c0c4b39565c7727b3447e995c6c039 Mon Sep 17 00:00:00 2001 From: Vincent Gromakowski Date: Wed, 30 Oct 2024 10:24:40 +0100 Subject: [PATCH 03/12] add example tests --- LICENSE | 2 +- framework/LICENSE | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/LICENSE b/LICENSE index d64569567..5046ef26a 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2021-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/framework/LICENSE b/framework/LICENSE index d64569567..5046ef26a 100644 --- a/framework/LICENSE +++ b/framework/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2021-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
From d53805c1a528ac904310d440f2034443bf2853eb Mon Sep 17 00:00:00 2001 From: lmouhib Date: Wed, 30 Oct 2024 08:48:16 +0000 Subject: [PATCH 04/12] exclude path from pytest --- examples/datazone-msk-governance/pytest.ini | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 examples/datazone-msk-governance/pytest.ini diff --git a/examples/datazone-msk-governance/pytest.ini b/examples/datazone-msk-governance/pytest.ini new file mode 100644 index 000000000..3e7be7f60 --- /dev/null +++ b/examples/datazone-msk-governance/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +norecursedirs = datazone-msk-governance/resources \ No newline at end of file From 0513e3d058b555b7ab854c7f84dd0933a23f354c Mon Sep 17 00:00:00 2001 From: lmouhib Date: Wed, 30 Oct 2024 08:59:58 +0000 Subject: [PATCH 05/12] fix pytest.ini --- examples/datazone-msk-governance/pytest.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/datazone-msk-governance/pytest.ini b/examples/datazone-msk-governance/pytest.ini index 3e7be7f60..f8ef78ff4 100644 --- a/examples/datazone-msk-governance/pytest.ini +++ b/examples/datazone-msk-governance/pytest.ini @@ -1,2 +1,2 @@ [pytest] -norecursedirs = datazone-msk-governance/resources \ No newline at end of file +norecursedirs = resources \ No newline at end of file From ab22e02f7d9beb3f3182b0dc43cc1b849e1d91b9 Mon Sep 17 00:00:00 2001 From: lmouhib Date: Wed, 30 Oct 2024 09:20:07 +0000 Subject: [PATCH 06/12] fix stack --- examples/datazone-msk-governance/pytest.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/datazone-msk-governance/pytest.ini b/examples/datazone-msk-governance/pytest.ini index f8ef78ff4..e057b8286 100644 --- a/examples/datazone-msk-governance/pytest.ini +++ b/examples/datazone-msk-governance/pytest.ini @@ -1,2 +1,2 @@ [pytest] -norecursedirs = resources \ No newline at end of file +norecursedirs = ./resources \ No newline at end of file From ae35ac74614e1473c6714979dc212a7f91dee5e2 Mon Sep 17 00:00:00 
2001 From: Vincent Gromakowski Date: Wed, 30 Oct 2024 15:24:52 +0100 Subject: [PATCH 07/12] adding tests --- examples/datazone-msk-governance/pytest.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/datazone-msk-governance/pytest.ini b/examples/datazone-msk-governance/pytest.ini index e057b8286..755b2ef84 100644 --- a/examples/datazone-msk-governance/pytest.ini +++ b/examples/datazone-msk-governance/pytest.ini @@ -1,2 +1,2 @@ [pytest] -norecursedirs = ./resources \ No newline at end of file +norecursedirs = ./resources ./cdk.out From a60d0cc2eb8fb6dbeb8b66c086c71e9dd676fe39 Mon Sep 17 00:00:00 2001 From: Vincent Gromakowski Date: Wed, 30 Oct 2024 17:28:33 +0100 Subject: [PATCH 08/12] fix example tests --- examples/datazone-msk-governance/pytest.ini | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 examples/datazone-msk-governance/pytest.ini diff --git a/examples/datazone-msk-governance/pytest.ini b/examples/datazone-msk-governance/pytest.ini deleted file mode 100644 index 755b2ef84..000000000 --- a/examples/datazone-msk-governance/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -norecursedirs = ./resources ./cdk.out From d33cbbbed83d6af251e88f31092edeff39fb0b00 Mon Sep 17 00:00:00 2001 From: vgkowski Date: Mon, 4 Nov 2024 12:21:06 +0100 Subject: [PATCH 09/12] add MSK bootstrap brokers to DataZone asset type --- .../lib/datazone/datazone-gsr-msk-datasource.ts | 1 + .../lib/datazone/datazone-msk-asset-type.ts | 5 +++++ .../resources/datazone-gsr-msk-datasource/index.mjs | 12 +++++++++--- .../governance/datazone-gsr-msk-datasource.test.ts | 5 ++++- .../unit/governance/datazone-msk-asset-type.test.ts | 2 +- 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/framework/src/governance/lib/datazone/datazone-gsr-msk-datasource.ts b/framework/src/governance/lib/datazone/datazone-gsr-msk-datasource.ts index 1d10e5a25..30e010d6d 100644 --- a/framework/src/governance/lib/datazone/datazone-gsr-msk-datasource.ts +++ 
b/framework/src/governance/lib/datazone/datazone-gsr-msk-datasource.ts @@ -154,6 +154,7 @@ export class DataZoneGsrMskDataSource extends TrackedConstruct { effect: Effect.ALLOW, actions: [ 'kafka:ListClustersV2', + 'kafka:GetBootstrapBrokers', ], resources: ['*'], }), diff --git a/framework/src/governance/lib/datazone/datazone-msk-asset-type.ts b/framework/src/governance/lib/datazone/datazone-msk-asset-type.ts index fe7b2772c..897f46562 100644 --- a/framework/src/governance/lib/datazone/datazone-msk-asset-type.ts +++ b/framework/src/governance/lib/datazone/datazone-msk-asset-type.ts @@ -79,6 +79,11 @@ export class DataZoneMskAssetType extends TrackedConstruct { type: 'String', required: true, }, + { + name: 'bootstrap_brokers', + type: 'String', + required: true, + }, ], required: true, }, diff --git a/framework/src/governance/lib/datazone/resources/datazone-gsr-msk-datasource/index.mjs b/framework/src/governance/lib/datazone/resources/datazone-gsr-msk-datasource/index.mjs index 69d9b5866..accdc0f70 100644 --- a/framework/src/governance/lib/datazone/resources/datazone-gsr-msk-datasource/index.mjs +++ b/framework/src/governance/lib/datazone/resources/datazone-gsr-msk-datasource/index.mjs @@ -3,7 +3,7 @@ import { DataZoneClient, GetAssetCommand, CreateAssetCommand, CreateAssetRevisionCommand, DeleteAssetCommand } from "@aws-sdk/client-datazone"; import { GlueClient, ListSchemasCommand, GetSchemaVersionCommand, GetSchemaCommand } from "@aws-sdk/client-glue"; -import { KafkaClient, ListClustersV2Command, DescribeClusterV2Command } from "@aws-sdk/client-kafka"; +import { KafkaClient, ListClustersV2Command, DescribeClusterV2Command, GetBootstrapBrokersCommand } from "@aws-sdk/client-kafka"; import { SSMClient, GetParametersByPathCommand, DeleteParameterCommand, PutParameterCommand } from "@aws-sdk/client-ssm"; @@ -34,6 +34,7 @@ export const handler = async () => { let clusterArn; let clusterUuid; let clusterType; + let bootstrapBrokers; try { // Step 1: Retrieve existing 
parameters @@ -53,7 +54,7 @@ export const handler = async () => { } console.log(assetMap); - // Step 2: List all Kafka clusters and find the ARN for the specified cluster + // Step 2: List all Kafka clusters, find the ARN and the bootstrap brokers for the specified cluster try { const listClustersCommand = new ListClustersV2Command({}); const listClustersResponse = await kafkaClient.send(listClustersCommand); @@ -77,6 +78,10 @@ export const handler = async () => { } console.log(`Cluster type for ${clusterName} is ${clusterType}`); + + const getBootstrapBrokersCommand = new GetBootstrapBrokersCommand({ ClusterArn: clusterArn }); + const getBootstrapBrokersResponse = await kafkaClient.send(getBootstrapBrokersCommand); + bootstrapBrokers = getBootstrapBrokersResponse.BootstrapBrokerStringSaslIam; } catch (err) { console.error('Error handling Kafka cluster:', err); @@ -139,7 +144,8 @@ export const handler = async () => { typeIdentifier: 'MskSourceReferenceFormType', content: JSON.stringify({ cluster_arn: clusterArn, - cluster_type: clusterType // Ensure clusterType is correctly included + cluster_type: clusterType, + bootstrap_brokers: bootstrapBrokers }), }, { diff --git a/framework/test/unit/governance/datazone-gsr-msk-datasource.test.ts b/framework/test/unit/governance/datazone-gsr-msk-datasource.test.ts index edcba348c..87c6bc762 100644 --- a/framework/test/unit/governance/datazone-gsr-msk-datasource.test.ts +++ b/framework/test/unit/governance/datazone-gsr-msk-datasource.test.ts @@ -151,7 +151,10 @@ describe('Creating a DataZone-GSR-MSK-Datasource with default configuration', () }, }), Match.objectLike({ - Action: 'kafka:ListClustersV2', + Action: [ + 'kafka:ListClustersV2', + 'kafka:GetBootstrapBrokers', + ], Effect: 'Allow', Resource: '*', }), diff --git a/framework/test/unit/governance/datazone-msk-asset-type.test.ts b/framework/test/unit/governance/datazone-msk-asset-type.test.ts index fceaa000f..2f3eed51e 100644 --- 
a/framework/test/unit/governance/datazone-msk-asset-type.test.ts +++ b/framework/test/unit/governance/datazone-msk-asset-type.test.ts @@ -89,7 +89,7 @@ describe ('Creating a DataZoneMskAssetType with default configuration', () => { }, { name: 'MskSourceReferenceFormType', - model: '\n structure MskSourceReferenceFormType {\n @required\ncluster_arn: String\n@required\ncluster_type: String\n }\n ', + model: '\n structure MskSourceReferenceFormType {\n @required\ncluster_arn: String\n@required\ncluster_type: String\n@required\nbootstrap_brokers: String\n }\n ', required: true, }, { From ea7df8c0563737fe60b6d5c473d0c50647a89c3e Mon Sep 17 00:00:00 2001 From: vgkowski Date: Mon, 4 Nov 2024 15:32:32 +0100 Subject: [PATCH 10/12] add metadata version --- .../datazone-msk-authorizer-metadata-collector/index.mjs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/framework/src/governance/lib/datazone/resources/datazone-msk-authorizer-metadata-collector/index.mjs b/framework/src/governance/lib/datazone/resources/datazone-msk-authorizer-metadata-collector/index.mjs index 2c3fda50d..7d232a35c 100644 --- a/framework/src/governance/lib/datazone/resources/datazone-msk-authorizer-metadata-collector/index.mjs +++ b/framework/src/governance/lib/datazone/resources/datazone-msk-authorizer-metadata-collector/index.mjs @@ -117,7 +117,8 @@ export const handler = async(event) => { Region: consumerRegion, Account: consumerAccountId, RolesArn: consumerRolesArn, - } + }, + MetadataVersion: 1 }; console.log(`Metadata collection results: ${JSON.stringify({ results }, null, 2)}`); From 3d4680bab88fc6d3a7e4b7c5bdc7ed86fadaf840 Mon Sep 17 00:00:00 2001 From: vgkowski Date: Mon, 4 Nov 2024 15:42:03 +0100 Subject: [PATCH 11/12] remove config from lambda --- .../resources/lambda/config.yaml | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 examples/datazone-msk-governance/resources/lambda/config.yaml diff --git 
a/examples/datazone-msk-governance/resources/lambda/config.yaml b/examples/datazone-msk-governance/resources/lambda/config.yaml deleted file mode 100644 index 782b26045..000000000 --- a/examples/datazone-msk-governance/resources/lambda/config.yaml +++ /dev/null @@ -1,18 +0,0 @@ -kafka: - bootstrap_servers: "xxxxxx.kafka-serverless.us-east-1.amazonaws.com:9098" - topic: "user" # Kafka topic name - cluster_name: "StreamingGovernanceStack" # Kafka cluster name - client_id: "example-client" # Kafka client ID - authentication: "iam" # Options: "unauthenticated", "sasl/scram", "iam" - group_id: "openlineage-consumer-group" # Kafka consumer group ID (this will also be the job name in OpenLineage) - auto_offset_reset: "earliest" # Offset reset: 'earliest' or 'latest' - -gsr: - registry_name: "governance-msk-registry" # Glue Schema Registry name - schema_file: "user.avsc" # Schema file (if applicable) - region: "us-east-1" # AWS region for Glue Schema Registry - -openlineage: - transport_type: "datazone" # Transport type for OpenLineage: e.g., 'datazone', 'print' - domain_id: "dzd_xxxxxx" # DataZone domain ID - region: "us-east-1" \ No newline at end of file From 5e5e167651aeb648ca3b4e64704a8135329f0a5e Mon Sep 17 00:00:00 2001 From: vgkowski Date: Mon, 4 Nov 2024 15:59:53 +0100 Subject: [PATCH 12/12] add instructions in README example --- examples/datazone-msk-governance/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/datazone-msk-governance/README.md b/examples/datazone-msk-governance/README.md index b39d0c9d0..687298a51 100644 --- a/examples/datazone-msk-governance/README.md +++ b/examples/datazone-msk-governance/README.md @@ -76,7 +76,8 @@ cdk deploy ## Create a custom environment in the consumer project 1. Enable the [custom AWS service blueprint](https://docs.aws.amazon.com/datazone/latest/userguide/enable-custom-blueprint.html) -2. 
Create a [custom environment](https://docs.aws.amazon.com/datazone/latest/userguide/create-custom-environment.html) in the `consumer` project +2. Create a [custom environment](https://docs.aws.amazon.com/datazone/latest/userguide/create-custom-environment.html) in the `consumer` project. +You can use any IAM Role that can be assumed by DataZone, but for simplicity, the CDK Stack provides the `StreamingGovernanceStack-ConsumerRole`. ## Create a subscription target on the custom environment