From 2da3d0f89825a7541a2eef266908e79000ff7479 Mon Sep 17 00:00:00 2001 From: Ivan Ivanov <54508530+divakaivan@users.noreply.github.com> Date: Mon, 16 Dec 2024 09:08:46 +0900 Subject: [PATCH] fix aggregation_job 1. Change the timestamp pattern to `"yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"`. 2. Rename the event_timestamp column to event_time to match the schema. The above changes make the aggregation job work. --- .../4-apache-flink-training/src/job/aggregation_job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py b/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py index 7d11266f..a10d89a7 100644 --- a/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py +++ b/bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py @@ -49,16 +49,16 @@ def create_processed_events_source_kafka(t_env): kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "") kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "") table_name = "process_events_kafka" - pattern = "yyyy-MM-dd HH:mm:ss" + pattern = "yyyy-MM-dd''T''HH:mm:ss.SSS''Z''" sink_ddl = f""" CREATE TABLE {table_name} ( ip VARCHAR, - event_timestamp VARCHAR, + event_time VARCHAR, referrer VARCHAR, host VARCHAR, url VARCHAR, geodata VARCHAR, - window_timestamp AS TO_TIMESTAMP(event_timestamp, '{pattern}'), + window_timestamp AS TO_TIMESTAMP(event_time, '{pattern}'), WATERMARK FOR window_timestamp AS window_timestamp - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka',