diff --git a/.circleci/integration/target-config.json b/.circleci/integration/target-config.json
index db40257f..09a682da 100644
--- a/.circleci/integration/target-config.json
+++ b/.circleci/integration/target-config.json
@@ -1,4 +1,5 @@
 {
   "postgres_database": "target_postgres_test",
-  "postgres_username": "postgres"
+  "postgres_username": "postgres",
+  "logging_level": "DEBUG"
 }
diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py
index 5b23a868..c594b6b5 100644
--- a/target_postgres/singer_stream.py
+++ b/target_postgres/singer_stream.py
@@ -1,14 +1,18 @@
 from copy import deepcopy
+import math
 import uuid

 import arrow
 from jsonschema import Draft4Validator, FormatChecker
 from jsonschema.exceptions import ValidationError
+import singer

 from target_postgres import json_schema
 from target_postgres.exceptions import SingerStreamError
 from target_postgres.pysize import get_size

+LOGGER = singer.get_logger()
+

 SINGER_RECEIVED_AT = '_sdc_received_at'
 SINGER_BATCHED_AT = '_sdc_batched_at'
@@ -19,6 +23,9 @@
 SINGER_LEVEL = '_sdc_level_{}_id'
 SINGER_VALUE = '_sdc_value'

+DEFAULT__MAX_ROWS = 200000
+DEFAULT__MAX_BUFFER_SIZE = 104857600  # 100MB
+

 class BufferedSingerStream():
     def __init__(self,
@@ -28,13 +35,16 @@ def __init__(self,
                  *args,
                  invalid_records_detect=None,
                  invalid_records_threshold=None,
-                 max_rows=200000,
-                 max_buffer_size=104857600,  # 100MB
+                 max_rows=DEFAULT__MAX_ROWS,
+                 max_buffer_size=DEFAULT__MAX_BUFFER_SIZE,
                  **kwargs):
         """
         :param invalid_records_detect: Defaults to True when value is None
         :param invalid_records_threshold: Defaults to 0 when value is None
+        :param max_rows: Defaults to 200000 when value is Falsey
+        :param max_buffer_size: Defaults to 100MB when value is Falsey
         """
+
         self.schema = None
         self.key_properties = None
         self.validator = None
@@ -42,8 +52,8 @@ def __init__(self,
         self.stream = stream
         self.invalid_records = []

-        self.max_rows = max_rows
-        self.max_buffer_size = max_buffer_size
+        self.max_rows = max_rows or DEFAULT__MAX_ROWS
+        self.max_buffer_size = max_buffer_size or DEFAULT__MAX_BUFFER_SIZE

         self.invalid_records_detect = invalid_records_detect
         self.invalid_records_threshold = invalid_records_threshold
@@ -58,6 +68,14 @@ def __init__(self,
         self.__size = 0
         self.__lifetime_max_version = None

+        self.__debug_reporting_interval = math.ceil(self.max_rows / 10.0)
+
+        LOGGER.debug('Stream `{}` created. `max_rows`: {} `max_buffer_size`: {}'.format(
+            self.stream,
+            self.max_rows,
+            self.max_buffer_size
+        ))
+
     def update_schema(self, schema, key_properties):
         # In order to determine whether a value _is in_ properties _or not_ we need to flatten `$ref`s etc.
         self.schema = json_schema.simplify(schema)
@@ -104,10 +122,22 @@ def count(self):
     @property
     def buffer_full(self):
         if self.__count >= self.max_rows:
+            LOGGER.debug('Stream `{}` cutting batch due to row count being {:.2%} {}/{}'.format(
+                self.stream,
+                self.__count / self.max_rows,
+                self.__count,
+                self.max_rows
+            ))
             return True

         if self.__count > 0:
             if self.__size >= self.max_buffer_size:
+                LOGGER.debug('Stream `{}` cutting batch due to bytes being {:.2%} {}/{}'.format(
+                    self.stream,
+                    self.__size / self.max_buffer_size,
+                    self.__size,
+                    self.max_buffer_size
+                ))
                 return True

         return False
@@ -120,17 +150,43 @@ def __update_version(self, version):
         if version is None or (self.__lifetime_max_version is not None and self.__lifetime_max_version >= version):
             return None

-        ## TODO: log warning about earlier records detected
+        if self.__count:
+            LOGGER.debug('WARNING: Stream `{}` dropping {} records due to version being updated from: `{}` to: `{}`'.format(
+                self.stream,
+                self.__count,
+                self.__lifetime_max_version,
+                version
+            ))

         self.flush_buffer()
         self.__lifetime_max_version = version

+    def _debug_report_on_buffer_sizes(self):
+        if self.__count % self.__debug_reporting_interval == 0:
+            LOGGER.debug('Stream `{}` has {:.2%} {}/{} rows filled'.format(
+                self.stream,
+                self.__count / self.max_rows,
+                self.__count,
+                self.max_rows
+            ))
+            LOGGER.debug('Stream `{}` has {:.2%} {}/{} bytes filled'.format(
+                self.stream,
+                self.__size / self.max_buffer_size,
+                self.__size,
+                self.max_buffer_size
+            ))
+
     def add_record_message(self, record_message):
         add_record = True

         self.__update_version(record_message.get('version'))

         if self.__lifetime_max_version != record_message.get('version'):
+            LOGGER.debug('WARNING: Stream `{}` dropping record due to version mismatch. Expected: `{}`, Got: `{}`'.format(
+                self.stream,
+                self.__lifetime_max_version,
+                record_message.get('version')
+            ))
             return None

         try:
@@ -150,6 +206,8 @@ def add_record_message(self, record_message):
                         self.invalid_records_threshold),
                     self.invalid_records)

+        self._debug_report_on_buffer_sizes()
+
     def peek_buffer(self):
         return self.__buffer

@@ -181,11 +239,13 @@ def get_batch(self):
         return records

     def flush_buffer(self):
-        _buffer = self.__buffer
+        LOGGER.debug('Stream `{}` flushing buffer...'.format(
+            self.stream
+        ))
+
         self.__buffer = []
         self.__size = 0
         self.__count = 0
-        return _buffer

     def peek_invalid_records(self):
         return self.invalid_records
diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index 32ddb843..7a136e9f 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -42,6 +42,8 @@ def stream_to_target(stream, target, config={}):
     state_tracker = StreamTracker(target, state_support)
     _run_sql_hook('before_run_sql', config, target)

+    line_stats = {}  # dict of message type to count of messages seen
+
     try:
         if not config.get('disable_collection', False):
             _async_send_usage_stats()
@@ -52,9 +54,20 @@
         max_batch_size = config.get('max_batch_size', 104857600)  # 100MB
         batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50))

+        LOGGER.info('Streaming to target with the following configuration: {}'.format(
+            {
+                'state_support': state_support,
+                'invalid_records_detect': invalid_records_detect,
+                'invalid_records_threshold': invalid_records_threshold,
+                'max_batch_rows': max_batch_rows,
+                'max_batch_size': max_batch_size,
+                'batch_detection_threshold': batch_detection_threshold
+            }))
+
         line_count = 0
         for line in stream:
-            _line_handler(state_tracker,
+            _line_handler(line_stats,
+                          state_tracker,
                           target,
                           invalid_records_detect,
                           invalid_records_threshold,
@@ -62,10 +75,18 @@
                           max_batch_size,
                           line
                           )
-            if line_count > 0 and line_count % batch_detection_threshold == 0:
-                state_tracker.flush_streams()
+            line_count += 1
+            if line_count % batch_detection_threshold == 0:
+                LOGGER.debug('Attempting to flush streams at `line_count` {}, with records distribution of: {}'.format(
+                    line_count,
+                    _records_distribution(line_count, line_stats)
+                ))
+                state_tracker.flush_streams()
+
+        LOGGER.debug('Forcing flush of streams. Input depleted.')
+        state_tracker.flush_streams(force=True)

         _run_sql_hook('after_run_sql', config, target)
@@ -78,6 +99,13 @@ def stream_to_target(stream, target, config={}):
         _report_invalid_records(state_tracker.streams)


+def _records_distribution(count, stats):
+    return {k: '{:.2%} ({})'.format(
+        v / count,
+        v
+    ) for k, v in stats.items()}
+
+
 def _report_invalid_records(streams):
     for stream_buffer in streams.values():
         if stream_buffer.peek_invalid_records():
@@ -87,7 +115,7 @@
             ))


-def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
+def _line_handler(line_stats, state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
                   max_batch_size, line):
     try:
         line_data = json.loads(line)
@@ -98,7 +126,9 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
     if 'type' not in line_data:
         raise TargetError('`type` is a required key: {}'.format(line))

-    if line_data['type'] == 'SCHEMA':
+    line_type = line_data['type']
+
+    if line_type == 'SCHEMA':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))

@@ -123,21 +153,19 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
                 schema,
                 key_properties,
                 invalid_records_detect=invalid_records_detect,
-                invalid_records_threshold=invalid_records_threshold)
-            if max_batch_rows:
-                buffered_stream.max_rows = max_batch_rows
-            if max_batch_size:
-                buffered_stream.max_buffer_size = max_batch_size
+                invalid_records_threshold=invalid_records_threshold,
+                max_rows=max_batch_rows,
+                max_buffer_size=max_batch_size)
             state_tracker.register_stream(stream, buffered_stream)
         else:
             state_tracker.streams[stream].update_schema(schema, key_properties)
-    elif line_data['type'] == 'RECORD':
+    elif line_type == 'RECORD':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))

         state_tracker.handle_record_message(line_data['stream'], line_data)
-    elif line_data['type'] == 'ACTIVATE_VERSION':
+    elif line_type == 'ACTIVATE_VERSION':
         if 'stream' not in line_data:
             raise TargetError('`stream` is a required key: {}'.format(line))
         if 'version' not in line_data:
@@ -149,13 +177,15 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
         stream_buffer = state_tracker.streams[line_data['stream']]
         state_tracker.flush_stream(line_data['stream'])
         target.activate_version(stream_buffer, line_data['version'])
-    elif line_data['type'] == 'STATE':
+    elif line_type == 'STATE':
         state_tracker.handle_state_message(line_data)
     else:
         raise TargetError('Unknown message type {} in message {}'.format(
-            line_data['type'],
+            line_type,
             line))

+    line_stats[line_type] = line_stats.get(line_type, 0) + 1
+

 def _send_usage_stats():
     try: