Skip to content

Commit

Permalink
fix aggregation_job
Browse files Browse the repository at this point in the history
1. change the pattern to `"yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"`
2. change event_timestamp col name to event_time to match schema

The above changes make the aggregation job work.
  • Loading branch information
divakaivan authored Dec 16, 2024
1 parent a3a98a7 commit 2da3d0f
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ def create_processed_events_source_kafka(t_env):
kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
table_name = "process_events_kafka"
pattern = "yyyy-MM-dd HH:mm:ss"
pattern = "yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"
sink_ddl = f"""
CREATE TABLE {table_name} (
ip VARCHAR,
event_timestamp VARCHAR,
event_time VARCHAR,
referrer VARCHAR,
host VARCHAR,
url VARCHAR,
geodata VARCHAR,
window_timestamp AS TO_TIMESTAMP(event_timestamp, '{pattern}'),
window_timestamp AS TO_TIMESTAMP(event_time, '{pattern}'),
WATERMARK FOR window_timestamp AS window_timestamp - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
Expand Down

0 comments on commit 2da3d0f

Please sign in to comment.