diff --git a/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py b/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py index 7d11266f..a10d89a7 100644 --- a/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py +++ b/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py @@ -49,16 +49,16 @@ def create_processed_events_source_kafka(t_env): kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "") kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "") table_name = "process_events_kafka" - pattern = "yyyy-MM-dd HH:mm:ss" + pattern = "yyyy-MM-dd''T''HH:mm:ss.SSS''Z''" sink_ddl = f""" CREATE TABLE {table_name} ( ip VARCHAR, - event_timestamp VARCHAR, + event_time VARCHAR, referrer VARCHAR, host VARCHAR, url VARCHAR, geodata VARCHAR, - window_timestamp AS TO_TIMESTAMP(event_timestamp, '{pattern}'), + window_timestamp AS TO_TIMESTAMP(event_time, '{pattern}'), WATERMARK FOR window_timestamp AS window_timestamp - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka',