diff --git a/blockchainetl/jobs/exporters/indexer_item_exporter.py b/blockchainetl/jobs/exporters/indexer_item_exporter.py index 82765f02c..8defb74b5 100644 --- a/blockchainetl/jobs/exporters/indexer_item_exporter.py +++ b/blockchainetl/jobs/exporters/indexer_item_exporter.py @@ -32,26 +32,24 @@ def export_items(self, items): for item_type, file_name in self.item_type_to_file_mapping.items(): items = items_grouped_by_type.get(item_type) if items: - # file_name = f'{item_type}.csv' if item_type not in self.files: - self.files[item_type] = open(file_name, 'w', newline='', - encoding='UTF-8') + table = self.item_type_to_table_mapping[item_type] + converted_items = list(self.convert_items(items, table)) + with open(file_name, 'w', newline='', encoding='UTF-8') as file: + csv_writer = csv.writer(file) + csv_writer.writerows(converted_items) + self.files[item_type] = file_name - table = self.item_type_to_table_mapping[item_type] - csv_writer = csv.writer(self.files[item_type]) - converted_items = list(self.convert_items(items, table)) write_rows += len(converted_items) - csv_writer.writerows(converted_items) duration = datetime.now() - start_time self.logger.info( f"Finished write. Total items processed: {write_rows}. " f"Took {str(duration)}." ) - self.close() + if write_rows > 0: self.call_go() - self.files.clear() def convert_items(self, items, table): @@ -70,16 +68,15 @@ def convert_items(self, items, table): yield [converted_item.get(column) for column in columns] def close(self): - for file in self.files.values(): - file.close() + pass def call_go(self): start_time = datetime.now() go_program = os.environ.get('GO_PROGRAM_PATH', './indexer') - command = [go_program, '--transactions', self.files["transaction"].name, + command = [go_program, '--transactions', self.files["transaction"], '--logs', - self.files["log"].name] + self.files["log"]] try: result = subprocess.run(command, check=True, stdout=subprocess.PIPE,