Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dumpit.py #14

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 46 additions & 33 deletions dumpit.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import arrow
import json
import logging
import os

import arrow


class Dumper():
def __init__(self, config_fname):
""" Initialize crawler with variables from config file"""
Expand All @@ -14,8 +16,8 @@ def __init__(self, config_fname):
def fname(self, startdate):
"""Construct the folder and filename for the given date and config file"""

dump_folder = f"{self.config['dump_root']}/{startdate.year:04d}/{startdate.month:02d}/{startdate.day:02d}/"
dump_fname = self.config['dump_fname']+'_'+startdate.format("YYYY-MM-DD")+'.csv'
dump_folder = os.path.join(self.config['dump_root'], startdate.format('YYYY/MM/DD'))
dump_fname = f'{self.config["dump_fname"]}_{startdate.format("YYYY-MM-DD")}.csv'

return dump_folder, dump_fname

Expand All @@ -26,64 +28,76 @@ def dump(self, date, compress='lz4'):

query = self.config['query'].format(startdate=startdate, enddate=enddate)
dump_folder, dump_fname = self.fname(startdate)
intermediate_output_file = os.path.join(dump_folder, dump_fname)
final_output_file = intermediate_output_file
if compress:
final_output_file += f'.{compress}'

# Check if dump already exists
if(os.path.exists(dump_folder+dump_fname+'.lz4')
and os.path.getsize(dump_folder+dump_fname+'.lz4') > 1000):
logging.error(f'{dump_folder}{dump_fname}.lz4 already exists')
if os.path.exists(final_output_file):
logging.error(f'{final_output_file} already exists')
return

# create directories if needed
os.makedirs(dump_folder, exist_ok=True)
os.makedirs(dump_folder, exist_ok=True)

cmd = r"""psql -d {db} -h {psql_host} -U {psql_role} -c "\copy ({query}) to '{fname}' csv header;" """.format(
db=self.config['database'],
psql_host=PSQL_HOST,
psql_role=PSQL_ROLE,
query=query,
fname=dump_folder+dump_fname
)
fname=intermediate_output_file
)

logging.debug(f'Dumping data to csv file ({cmd})...')
ret_value = os.system( cmd )
ret_value = os.system(cmd)
if ret_value != 0:
logging.error(f'Could not dump data? Returned value: {ret_value}')

if compress:
cmd = f'{compress} -f {dump_folder}{dump_fname} {dump_folder}{dump_fname}.{compress}'
cmd = f'{compress} -f {intermediate_output_file} {final_output_file}'
logging.debug(f'Compressing data ({cmd})...')
ret_value = os.system( cmd )
os.remove(dump_folder+dump_fname)
ret_value = os.system(cmd)
os.remove(intermediate_output_file)

if ret_value != 0:
logging.error(f'Could not compress data? Returned value: {ret_value}')

if not os.path.exists(final_output_file):
logging.error(f'No output file created: {final_output_file}')
return

if os.path.getsize(final_output_file) < 1000:
logging.warning(f'Output file was empty. Deleting {final_output_file}')
os.remove(final_output_file)


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s %(processName)s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[logging.StreamHandler()])
format='%(asctime)s %(processName)s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[logging.StreamHandler()])

global PSQL_HOST
PSQL_HOST = os.environ["PSQL_HOST"]
global PSQL_ROLE
PSQL_ROLE = os.environ["PSQL_ROLE"]

parser = argparse.ArgumentParser(
description='Dump data from the database to a CSV file')
parser.add_argument('--config', type=str,
help='configuration file with query and file structure details')
parser.add_argument('--dates', default='', type=str,
help='file containing a list of dates to dump (one date per line)')
parser.add_argument('--date', default='', type=str,
help='date to dump (e.g. 2022-01-20)')
parser.add_argument('--startdate', default='', type=str,
help='start date for a range of dates. Should also specify enddate')
parser.add_argument('--enddate', default='', type=str,
help='end date for a range of dates. Should also specify startdate')
parser.add_argument('--frequency', default='day', type=str,
help='frequency for a range of dates (default: day)')
description='Dump data from the database to a CSV file')
parser.add_argument('--config', type=str,
help='configuration file with query and file structure details')
parser.add_argument('--dates', default='', type=str,
help='file containing a list of dates to dump (one date per line)')
parser.add_argument('--date', default='', type=str,
help='date to dump (e.g. 2022-01-20)')
parser.add_argument('--startdate', default='', type=str,
help='start date for a range of dates. Should also specify enddate')
parser.add_argument('--enddate', default='', type=str,
help='end date for a range of dates. Should also specify startdate')
parser.add_argument('--frequency', default='day', type=str,
help='frequency for a range of dates (default: day)')

args = parser.parse_args()

Expand Down Expand Up @@ -111,4 +125,3 @@ def dump(self, date, compress='lz4'):
# Log any error that could happen
except Exception as e:
logging.error('Error', exc_info=e)

Loading