diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py
index b385a99..8dbbdef 100644
--- a/ckanext/datapusher_plus/config.py
+++ b/ckanext/datapusher_plus/config.py
@@ -35,6 +35,7 @@ class DataPusherPlusConfig(MutableMapping):
     # ckan_service_provider settings
     SQLALCHEMY_DATABASE_URI: str = _DATABASE_URI
     WRITE_ENGINE_URL: str = _WRITE_ENGINE_URL
+    COPY_READBUFFER_SIZE: int = 1048576
     DEBUG: bool = False
     TESTING: bool = False
     SECRET_KEY: str = str(uuid.uuid4())
diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py
index 429f766..98af5ed 100644
--- a/ckanext/datapusher_plus/jobs.py
+++ b/ckanext/datapusher_plus/jobs.py
@@ -659,7 +659,55 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     if resource_format.upper() == "CSV":
         logger.info("Normalizing/UTF-8 transcoding {}...".format(resource_format))
     else:
-        logger.info("Normalizing/UTF-8 transcoding {} to CSV...".format(format))
+        # if not CSV (e.g. TSV, TAB, etc.), we need to normalize to CSV
+        logger.info(
+            "Normalizing/UTF-8 transcoding {} to CSV...".format(resource_format)
+        )
+
+    qsv_input_utf_8_encoded_csv = os.path.join(temp_dir, "qsv_input_utf_8_encoded.csv")
+
+    # using uchardet to determine encoding
+    file_encoding = subprocess.run(
+        [
+            "uchardet",
+            tmp,
+        ],
+        check=True,
+        capture_output=True,
+        text=True,
+    )
+    logger.info("Identified encoding of the file: {}".format(file_encoding.stdout))
+
+    # trim the encoding string
+    file_encoding.stdout = file_encoding.stdout.strip()
+
+    # using iconv to re-encode to UTF-8
+    if file_encoding.stdout != "UTF-8":
+        logger.info("File is not UTF-8 encoded. Re-encoding from {} to UTF-8".format(
+            file_encoding.stdout)
+        )
+        try:
+            subprocess.run(
+                [
+                    "iconv",
+                    "-f",
+                    file_encoding.stdout,
+                    "-t",
+                    "UTF-8",
+                    tmp,
+                    "--output",
+                    qsv_input_utf_8_encoded_csv,
+                ],
+                check=True,
+            )
+        except subprocess.CalledProcessError as e:
+            # return as we can't push a non-UTF-8 CSV
+            logger.error(
+                "Job aborted as the file cannot be re-encoded to UTF-8: {}.".format(e)
+            )
+            return
+    else:
+        qsv_input_utf_8_encoded_csv = tmp
     try:
         qsv_input = subprocess.run(
             [
@@ -693,7 +741,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
         )
     except subprocess.CalledProcessError as e:
         # return as we can't push an invalid CSV file
-        validate_error_msg = qsv_validate.stderr
+        validate_error_msg = e.stderr
         logger.error("Invalid CSV! Job aborted: {}.".format(validate_error_msg))
         return
     logger.info("Well-formed, valid CSV file confirmed...")
@@ -1359,6 +1407,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     except psycopg2.Error as e:
         raise utils.JobError("Could not connect to the Datastore: {}".format(e))
     else:
+        copy_readbuffer_size = config.get("COPY_READBUFFER_SIZE")
         cur = raw_connection.cursor()
         """
         truncate table to use copy freeze option and further increase
@@ -1383,9 +1432,10 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             sql.Identifier(resource_id),
             column_names,
         )
-        with open(tmp, "rb") as f:
+        # use the configurable buffer size (1MB default) for the COPY read from disk
+        with open(tmp, "rb", copy_readbuffer_size) as f:
             try:
-                cur.copy_expert(copy_sql, f)
+                cur.copy_expert(copy_sql, f, size=copy_readbuffer_size)
             except psycopg2.Error as e:
                 raise utils.JobError("Postgres COPY failed: {}".format(e))
             else:
diff --git a/dot-env.template b/dot-env.template
index 013139b..17a5892 100644
--- a/dot-env.template
+++ b/dot-env.template
@@ -13,6 +13,10 @@ WRITE_ENGINE_URL = 'postgresql://datapusher:YOURPASSWORD@localhost/datastore_def
 # The connect string of the Datapusher+ Job database
 SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs'
 
+# Read buffer size in bytes when reading the CSV file for the Postgres COPY
+# default 1MB = 1048576
+COPY_READBUFFER_SIZE = 1048576
+
 # =============== DOWNLOAD SETTINGS ==============
 # 25mb, this is ignored if either PREVIEW_ROWS > 0
 MAX_CONTENT_LENGTH = 1256000000
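
Note on the uchardet/iconv step in jobs.py: a minimal standalone sketch of the
detect-then-re-encode flow, assuming the uchardet and GNU iconv command-line
tools are on PATH. The function name and file paths are illustrative, not
names from the patch.

    import subprocess

    def transcode_to_utf8(src, dst):
        """Detect src's encoding with uchardet and, only if it is not already
        UTF-8, re-encode it into dst with iconv. Returns the UTF-8 file path."""
        # uchardet prints the detected charset name on stdout
        detected = subprocess.run(
            ["uchardet", src], check=True, capture_output=True, text=True
        ).stdout.strip()
        if detected == "UTF-8":
            return src  # already UTF-8, nothing to do
        # iconv re-encodes src from the detected charset into dst as UTF-8
        subprocess.run(
            ["iconv", "-f", detected, "-t", "UTF-8", src, "--output", dst],
            check=True,
        )
        return dst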
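
Note on the e.stderr fix: when check=True raises, the assignment to
qsv_validate never completes, so the old handler read an unbound variable.
CalledProcessError carries the process's stderr, but only if the failing
subprocess.run call captured it. A small sketch; the qsv invocation and file
name are placeholders.

    import subprocess

    try:
        subprocess.run(
            ["qsv", "validate", "bad.csv"],
            check=True,
            capture_output=True,  # without this, e.stderr would be None
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print("Invalid CSV! Job aborted: {}.".format(e.stderr))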
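
Note on the buffered COPY: a minimal sketch of the copy_expert change,
assuming psycopg2 and a reachable Postgres; the DSN, table name, and CSV path
are placeholders. Without size=, copy_expert feeds Postgres from the file
object in 8KB chunks by default.

    import psycopg2
    from psycopg2 import sql

    COPY_READBUFFER_SIZE = 1048576  # 1MB, matching the new config default

    conn = psycopg2.connect("postgresql://user:pass@localhost/datastore")
    with conn, conn.cursor() as cur:
        copy_sql = sql.SQL("COPY {} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)").format(
            sql.Identifier("my_table")
        )
        # the third open() argument sets the file's read buffer, and size=
        # sets the chunk copy_expert sends to Postgres; both are 1MB here
        with open("data.csv", "rb", COPY_READBUFFER_SIZE) as f:
            cur.copy_expert(copy_sql, f, size=COPY_READBUFFER_SIZE)
    conn.close()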