From 2ad7cbb8eca1a98f6cc2a5a585cba2961a898407 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 11:32:45 -0800
Subject: [PATCH 01/11] Feature: Add standard logging for the target's
 configuration and for what the batching logic is doing periodically. Helps
 with diagnosing issues.

---
 target_postgres/singer_stream.py | 40 ++++++++++++++++++++++++++++
 target_postgres/target_tools.py  | 45 ++++++++++++++++++++++++++------
 2 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index 5b23a868..dcbce303 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -4,11 +4,14 @@
 import arrow
 from jsonschema import Draft4Validator, FormatChecker
 from jsonschema.exceptions import ValidationError
+import singer
 
 from target_postgres import json_schema
 from target_postgres.exceptions import SingerStreamError
 from target_postgres.pysize import get_size
 
+LOGGER = singer.get_LOGGER()
+
 SINGER_RECEIVED_AT = '_sdc_received_at'
 SINGER_BATCHED_AT = '_sdc_batched_at'
@@ -58,6 +61,14 @@ def __init__(self,
         self.__size = 0
         self.__lifetime_max_version = None
 
+        self.__debug_reporting_interval = int(self.max_rows / 10)
+
+        LOGGER.debug('Stream `{}` created. `max_rows`: {} `max_buffer_size`: {}'.format(
+            self.stream,
+            self.max_rows,
+            self.max_buffer_size
+        ))
+
     def update_schema(self, schema, key_properties):
         # In order to determine whether a value _is in_ properties _or not_ we need to flatten `$ref`s etc.
         self.schema = json_schema.simplify(schema)
@@ -104,10 +115,22 @@ def count(self):
     @property
     def buffer_full(self):
         if self.__count >= self.max_rows:
+            LOGGER.debug('Stream `{}` cutting batch due to row count being {:.2%} {}/{}'.format(
+                self.stream,
+                self.__count / self.max_rows,
+                self.__count,
+                self.max_rows
+            ))
             return True
 
         if self.__count > 0:
             if self.__size >= self.max_buffer_size:
+                LOGGER.debug('Stream `{}` cutting batch due to bytes being {:.2%} {}/{}'.format(
+                    self.stream,
+                    self.__size / self.max_buffer_size,
+                    self.__size,
+                    self.max_buffer_size
+                ))
                 return True
 
         return False
@@ -125,6 +148,21 @@ def __update_version(self, version):
         self.flush_buffer()
         self.__lifetime_max_version = version
 
+    def _debug_report_on_buffer_sizes(self):
+        if self.__count % self.__debug_reporting_interval == 0:
+            LOGGER.debug('Stream `{}` has {:.2%} {}/{} rows filled'.format(
+                self.stream,
+                self.__count / self.max_rows,
+                self.__count,
+                self.max_rows
+            ))
+            LOGGER.debug('Stream `{}` has {:.2%} {}/{} bytes filled'.format(
+                self.stream,
+                self.__size / self.max_buffer_size,
+                self.__size,
+                self.max_buffer_size
+            ))
+
     def add_record_message(self, record_message):
         add_record = True
 
@@ -150,6 +188,8 @@ def add_record_message(self, record_message):
                                             self.invalid_records_threshold),
                                     self.invalid_records)
 
+        self._debug_report_on_buffer_sizes()
+
     def peek_buffer(self):
         return self.__buffer
 
diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index 32ddb843..5032a90c 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -42,6 +42,8 @@ def stream_to_target(stream, target, config={}):
     state_tracker = StreamTracker(target, state_support)
     _run_sql_hook('before_run_sql', config, target)
 
+    line_stats = {}  # dict of message type -> count
+
     try:
         if not config.get('disable_collection', False):
             _async_send_usage_stats()
@@ -52,6 +54,16 @@ def stream_to_target(stream, target, config={}):
         max_batch_size = config.get('max_batch_size', 104857600)  # 100MB
         batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50))
 
+        LOGGER.info('Streaming to target with the following configuration: {}'.format(
+            {
+                'state_support': state_support,
+                'invalid_records_detect': invalid_records_detect,
+                'invalid_records_threshold': invalid_records_threshold,
+                'max_batch_rows': max_batch_rows,
+                'max_batch_size': max_batch_size,
+                'batch_detection_threshold': batch_detection_threshold
+            }))
+
         line_count = 0
         for line in stream:
             _line_handler(state_tracker,
                           target,
                           invalid_records_detect,
                           invalid_records_threshold,
                           max_batch_rows,
                           max_batch_size,
                           line
                           )
-            if line_count > 0 and line_count % batch_detection_threshold == 0:
-                state_tracker.flush_streams()
+            line_count += 1
+            if line_count % batch_detection_threshold == 0:
+                LOGGER.debug('Attempting to flush streams at `line_count` {}, with records distribution of: {}'.format(
+                    line_count,
+                    _records_distribution(line_count, line_stats)
+                ))
+                state_tracker.flush_streams()
+
         state_tracker.flush_streams(force=True)
 
         _run_sql_hook('after_run_sql', config, target)
@@ -78,6 +96,13 @@ def stream_to_target(stream, target, config={}):
         _report_invalid_records(state_tracker.streams)
 
 
+def _records_distribution(count, stats):
+    return { (k, '{:.2%} ({})'.format(
+        v / count,
+        v
+    )) for k, v in stats }
+
+
 def _report_invalid_records(streams):
     for stream_buffer in streams.values():
         if stream_buffer.peek_invalid_records():
@@ -87,7 +112,7 @@ def _report_invalid_records(streams):
         ))
 
 
-def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
+def _line_handler(line_stats, state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
                   max_batch_size, line):
     try:
         line_data = json.loads(line)
@@ -98,7 +123,9 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
     if 'type' not in line_data:
         raise TargetError('`type` is a required key: {}'.format(line))
 
-    if line_data['type'] == 'SCHEMA':
+    line_type = line_data['type']
+
+    if line_type == 'SCHEMA':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))
 
@@ -132,12 +159,12 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
             state_tracker.register_stream(stream, buffered_stream)
         else:
             state_tracker.streams[stream].update_schema(schema, key_properties)
-    elif line_data['type'] == 'RECORD':
+    elif line_type == 'RECORD':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))
 
         state_tracker.handle_record_message(line_data['stream'], line_data)
-    elif line_data['type'] == 'ACTIVATE_VERSION':
+    elif line_type == 'ACTIVATE_VERSION':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))
         if 'version' not in line_data:
@@ -149,13 +176,15 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
         stream_buffer = state_tracker.streams[line_data['stream']]
         state_tracker.flush_stream(line_data['stream'])
         target.activate_version(stream_buffer, line_data['version'])
-    elif line_data['type'] == 'STATE':
+    elif line_type == 'STATE':
         state_tracker.handle_state_message(line_data)
     else:
         raise TargetError('Unknown message type {} in message {}'.format(
-            line_data['type'],
+            line_type,
             line))
 
+    line_stats[line_type] = line_stats.get(line_type, 0) + 1
+
 
 def _send_usage_stats():
     try:
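
Note: with the defaults above (`max_rows` = 200000, `max_buffer_size` = 104857600 bytes), the periodic report added in this patch fires every int(200000 / 10) = 20000 buffered records. For a hypothetical stream named `users` whose buffer holds 20000 rows totalling 16384 bytes, the DEBUG output would read:

    Stream `users` has 10.00% 20000/200000 rows filled
    Stream `users` has 0.02% 16384/104857600 bytes filled
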
From 8e7e2a080ca1fb137343454a7709b8f027ea04c3 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 11:33:18 -0800
Subject: [PATCH 02/11] Feature: When debug is enabled, warn when we drop
 records due to the version being mismatched with the current buffer's max
 version

---
 target_postgres/singer_stream.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index dcbce303..f4e83224 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -143,7 +143,13 @@ def __update_version(self, version):
         if version is None or (self.__lifetime_max_version is not None and self.__lifetime_max_version >= version):
             return None
 
-        ## TODO: log warning about earlier records detected
+        if self.__count:
+            LOGGER.debug('WARNING: Stream `{}` dropping {} records due to version being updated from: `{}` to: `{}`'.format(
+                self.stream,
+                self.__count,
+                self.__lifetime_max_version,
+                version
+            ))
 
         self.flush_buffer()
         self.__lifetime_max_version = version
@@ -169,6 +175,11 @@ def add_record_message(self, record_message):
             self.__update_version(record_message.get('version'))
 
             if self.__lifetime_max_version != record_message.get('version'):
+                LOGGER.debug('WARNING: Stream `{}` dropping record due to version mismatch. Expected: `{}`, Got: `{}`'.format(
+                    self.stream,
+                    self.__lifetime_max_version,
+                    record_message.get('version')
+                ))
                 return None
 
         try:
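
Note: the two messages above cover both directions of a version change. The first fires once per batch, when a newer version arrives and the buffered rows from the older version must be dropped; the second fires per record, when a record arrives carrying a version older than the stream's lifetime maximum. For a hypothetical stream `users` at version 2 receiving a straggler record from version 1, the second message would read:

    WARNING: Stream `users` dropping record due to version mismatch. Expected: `2`, Got: `1`
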
From 04b5dfa5e2737a2af8cdec9f289f6018d27e2c88 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 11:34:12 -0800
Subject: [PATCH 03/11] Refactor: BufferedSingerStream never uses the return
 value of flush_buffer

---
 target_postgres/singer_stream.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index f4e83224..37b599b6 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -232,11 +232,9 @@ def get_batch(self):
         return records
 
     def flush_buffer(self):
-        _buffer = self.__buffer
         self.__buffer = []
         self.__size = 0
         self.__count = 0
-        return _buffer
 
     def peek_invalid_records(self):
         return self.invalid_records

From 3d1b6ce8480dc4cac260f57f6f401f5cc2b6816a Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 11:35:03 -0800
Subject: [PATCH 04/11] Feature: Debug log whenever a stream's buffer is being
 flushed

---
 target_postgres/singer_stream.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index 37b599b6..b04a71fa 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -232,6 +232,10 @@ def get_batch(self):
         return records
 
     def flush_buffer(self):
+        LOGGER.debug('Stream `{}` flushing buffer...'.format(
+            self.stream
+        ))
+
         self.__buffer = []
         self.__size = 0
         self.__count = 0

From 507a9c798d586b51be998387faa1b947741ddb66 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 11:40:21 -0800
Subject: [PATCH 05/11] Refactor: BufferedSingerStream defaults Falsey buffer
 size values to make initialization logic and reporting cleaner

---
 target_postgres/singer_stream.py | 14 ++++++++++----
 target_postgres/target_tools.py  |  8 +++-----
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index b04a71fa..bdb3bc77 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -22,6 +22,9 @@
 SINGER_LEVEL = '_sdc_level_{}_id'
 SINGER_VALUE = '_sdc_value'
 
+DEFAULT__MAX_ROWS = 200000
+DEFAULT__MAX_BUFFER_SIZE = 104857600  # 100MB
+
 
 class BufferedSingerStream():
     def __init__(self,
@@ -31,13 +34,16 @@ def __init__(self,
                  *args,
                  invalid_records_detect=None,
                  invalid_records_threshold=None,
-                 max_rows=200000,
-                 max_buffer_size=104857600,  # 100MB
+                 max_rows=DEFAULT__MAX_ROWS,
+                 max_buffer_size=DEFAULT__MAX_BUFFER_SIZE,
                  **kwargs):
         """
         :param invalid_records_detect: Defaults to True when value is None
         :param invalid_records_threshold: Defaults to 0 when value is None
+        :param max_rows: Defaults to 200000 when value is Falsey
+        :param max_buffer_size: Defaults to 100MB when value is Falsey
         """
+
         self.schema = None
         self.key_properties = None
         self.validator = None
@@ -45,8 +51,8 @@ def __init__(self,
         self.stream = stream
         self.invalid_records = []
 
-        self.max_rows = max_rows
-        self.max_buffer_size = max_buffer_size
+        self.max_rows = max_rows or DEFAULT__MAX_ROWS
+        self.max_buffer_size = max_buffer_size or DEFAULT__MAX_BUFFER_SIZE
 
         self.invalid_records_detect = invalid_records_detect
         self.invalid_records_threshold = invalid_records_threshold

diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index 5032a90c..57101d34 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -150,11 +150,9 @@ def _line_handler(line_stats, state_tracker, target, invalid_records_detect, inv
                                                     schema,
                                                     key_properties,
                                                     invalid_records_detect=invalid_records_detect,
-                                                    invalid_records_threshold=invalid_records_threshold)
-            if max_batch_rows:
-                buffered_stream.max_rows = max_batch_rows
-            if max_batch_size:
-                buffered_stream.max_buffer_size = max_batch_size
+                                                    invalid_records_threshold=invalid_records_threshold,
+                                                    max_rows=max_batch_rows,
+                                                    max_buffer_size=max_batch_size)
 
             state_tracker.register_stream(stream, buffered_stream)
         else:
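
Note: the net effect of PATCH 05 is that call sites can pass possibly-unset config values straight through. A minimal sketch, assuming a hypothetical stream name, schema, and key properties:

    from target_postgres.singer_stream import BufferedSingerStream

    # None (e.g. a missing `max_batch_rows`/`max_batch_size` config value)
    # now falls back to the module-level defaults, replacing the if-guards
    # removed from target_tools above.
    stream = BufferedSingerStream('users',
                                  {'type': 'object', 'properties': {}},
                                  [],
                                  max_rows=None,
                                  max_buffer_size=None)
    assert stream.max_rows == 200000            # DEFAULT__MAX_ROWS
    assert stream.max_buffer_size == 104857600  # DEFAULT__MAX_BUFFER_SIZE
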
From 562e3162629abc954e2541b2a3c2849c79887c4c Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 17:38:28 -0600
Subject: [PATCH 06/11] Fix: use singer.get_logger(); singer.get_LOGGER() does
 not exist

---
 target_postgres/singer_stream.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index bdb3bc77..5723168d 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -10,7 +10,7 @@
 from target_postgres.exceptions import SingerStreamError
 from target_postgres.pysize import get_size
 
-LOGGER = singer.get_LOGGER()
+LOGGER = singer.get_logger()
 
 SINGER_RECEIVED_AT = '_sdc_received_at'

From 64e44cb7f99d12f9812321a82302e684d9d556e0 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 17:41:05 -0600
Subject: [PATCH 07/11] Fix: pass line_stats through to _line_handler

---
 target_postgres/target_tools.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index 57101d34..aa7b0760 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -66,7 +66,8 @@ def stream_to_target(stream, target, config={}):
 
         line_count = 0
         for line in stream:
-            _line_handler(state_tracker,
+            _line_handler(line_stats,
+                          state_tracker,
                           target,
                           invalid_records_detect,
                           invalid_records_threshold,

From 805f0f947384ea0545cb58d1d6adaa7021b5d3f1 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 17:42:55 -0600
Subject: [PATCH 08/11] Fix: iterate over stats.items() in
 _records_distribution

---
 target_postgres/target_tools.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index aa7b0760..3c19b462 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -101,7 +101,7 @@ def _records_distribution(count, stats):
     return { (k, '{:.2%} ({})'.format(
         v / count,
         v
-    )) for k, v in stats }
+    )) for k, v in stats.items() }
 
 
 def _report_invalid_records(streams):
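
Note: with the `.items()` fix, `_records_distribution` evaluates to a set of (message type, share) tuples rather than a dict. For a hypothetical run of 10000 handled lines, 9990 RECORD and 10 STATE messages, the flush log would contain something like:

    {('RECORD', '99.90% (9990)'), ('STATE', '0.10% (10)')}
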
From dbfcde600ee7d2227087b02b34782aaf35ca83cf Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 17:55:32 -0600
Subject: [PATCH 09/11] Fix: compute the debug reporting interval with
 math.ceil so small max_rows values cannot yield an interval of zero

---
 target_postgres/singer_stream.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index 5723168d..c594b6b5 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -1,4 +1,5 @@
 from copy import deepcopy
+import math
 import uuid
 
 import arrow
@@ -67,7 +68,7 @@ def __init__(self,
         self.__size = 0
         self.__lifetime_max_version = None
 
-        self.__debug_reporting_interval = int(self.max_rows / 10)
+        self.__debug_reporting_interval = math.ceil(self.max_rows / 10.0)
 
         LOGGER.debug('Stream `{}` created. `max_rows`: {} `max_buffer_size`: {}'.format(
             self.stream,
             self.max_rows,

From e2830bddc3c4dabffeb95b34f1c5fe93615ecb15 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 18:03:30 -0600
Subject: [PATCH 10/11] Housekeeping: More logging for integration tests

---
 .circleci/integration/target-config.json | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.circleci/integration/target-config.json b/.circleci/integration/target-config.json
index db40257f..09a682da 100644
--- a/.circleci/integration/target-config.json
+++ b/.circleci/integration/target-config.json
@@ -1,4 +1,5 @@
 {
   "postgres_database": "target_postgres_test",
-  "postgres_username": "postgres"
+  "postgres_username": "postgres",
+  "logging_level": "DEBUG"
 }

From d57c7bd570f851059dc288ac4f8aaa01983cfe93 Mon Sep 17 00:00:00 2001
From: AlexanderMann
Date: Sun, 10 Nov 2019 18:14:08 -0600
Subject: [PATCH 11/11] Feature: Note when stream is complete and process is
 ending

---
 target_postgres/target_tools.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index 3c19b462..7a136e9f 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -85,6 +85,8 @@ def stream_to_target(stream, target, config={}):
                 ))
                 state_tracker.flush_streams()
 
+        LOGGER.debug('Forcing flush of streams. Input depleted.')
+
         state_tracker.flush_streams(force=True)
 
         _run_sql_hook('after_run_sql', config, target)
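
Note: outside of CI, the `logging_level` key used in PATCH 10's integration config can be added to any target config to surface the DEBUG output introduced by this series. A sketch, with placeholder connection values:

    {
      "postgres_host": "localhost",
      "postgres_database": "my_database",
      "postgres_username": "postgres",
      "logging_level": "DEBUG"
    }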