diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py
index ad9152b..b385a99 100644
--- a/ckanext/datapusher_plus/config.py
+++ b/ckanext/datapusher_plus/config.py
@@ -47,7 +47,7 @@
 class DataPusherPlusConfig(MutableMapping):
     STDERR: bool = True
     KEEP_JOBS_AGE: int = 60
-    MAX_CONTENT_LENGTH: str = "25600000"
+    MAX_CONTENT_LENGTH: str = "1256000000000"
     IGNORE_FILE_HASH: bool = True
     CHUNK_SIZE: str = "16384"
     DOWNLOAD_TIMEOUT: int = 300
diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py
index 020bdd2..429f766 100644
--- a/ckanext/datapusher_plus/jobs.py
+++ b/ckanext/datapusher_plus/jobs.py
@@ -12,6 +12,7 @@
 import time
 import decimal
 from urllib.parse import urlsplit
+from urllib.parse import urlparse
 import logging
 
 # Third-party imports
@@ -445,6 +446,16 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
         # If this is an uploaded file to CKAN, authenticate the request,
         # otherwise we won't get file from private resources
         headers["Authorization"] = api_key
+
+    # If the ckan_url differs from this url, rewrite this url to the ckan
+    # url. This can be useful if ckan is behind a firewall.
+    if not resource_url.startswith(ckan_url):
+        new_url = urlparse(resource_url)
+        rewrite_url = urlparse(ckan_url)
+        new_url = new_url._replace(scheme=rewrite_url.scheme, netloc=rewrite_url.netloc)
+        resource_url = new_url.geturl()
+        logger.info('Rewrote resource url to: {0}'.format(resource_url))
+
     try:
         kwargs = {
             "headers": headers,
diff --git a/ckanext/datapusher_plus/logic/action.py b/ckanext/datapusher_plus/logic/action.py
index ed4710b..2c4a38c 100644
--- a/ckanext/datapusher_plus/logic/action.py
+++ b/ckanext/datapusher_plus/logic/action.py
@@ -173,9 +173,9 @@ def datapusher_submit(context, data_dict: dict[str, Any]):
             'original_url': resource_dict.get('url'),
         }
     }
-    timeout = tk.config.get('ckan.datapusher.timeout', 30)
+    dp_timeout = tk.config.get('ckan.datapusher.timeout', 3000)
     try:
-        job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=timeout))
+        job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=dp_timeout))
     except Exception as e:
         log.error("Error submitting job to DataPusher: %s", e)
         return False
diff --git a/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py b/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py
index fd2a606..55c8958 100644
--- a/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py
+++ b/ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py
@@ -5,37 +5,45 @@
 Create Date: 2023-09-22 22:14:35.137116
 
 """
+
 from alembic import op
 import sqlalchemy as sa
 
 
 # revision identifiers, used by Alembic.
-revision = 'e9c4a88839c8'
+revision = "e9c4a88839c8"
 down_revision = None
 branch_labels = None
 depends_on = None
 
 
 def upgrade():
-    #upgrade jobs table if it not exists
-
-    op.add_column(
-        u'jobs',
-        sa.Column(
-            'aps_job_id',
-            sa.UnicodeText),
-    )
-
-    #upgrade logs table
-    op.add_column(
-        'logs',
-        sa.Column(
-            'id',
-            sa.Integer,
-            primary_key=True,
-            autoincrement=True),
-    )
-
-
+    # upgrade the jobs table if the column does not already exist
+    if not _check_column_exists("jobs", "aps_job_id"):
+        op.add_column(
+            "jobs",
+            sa.Column("aps_job_id", sa.UnicodeText),
+        )
+    # upgrade the logs table
+    if not _check_column_exists("logs", "id"):
+        op.add_column(
+            "logs",
+            sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
+        )
+
+
 def downgrade():
-    pass
+    # downgrade the jobs table
+    if _check_column_exists("jobs", "aps_job_id"):
+        op.drop_column("jobs", "aps_job_id")
+
+    # downgrade the logs table
+    if _check_column_exists("logs", "id"):
+        op.drop_column("logs", "id")
+
+
+def _check_column_exists(table_name, column_name):
+    bind = op.get_bind()
+    insp = sa.inspect(bind)
+    columns = insp.get_columns(table_name)
+    return column_name in [column["name"] for column in columns]
diff --git a/dot-env.template b/dot-env.template
index 829a3a4..013139b 100644
--- a/dot-env.template
+++ b/dot-env.template
@@ -15,7 +15,7 @@ SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/d
 
 # =============== DOWNLOAD SETTINGS ==============
-# 25mb, this is ignored if either PREVIEW_ROWS > 0
-MAX_CONTENT_LENGTH = 25600000
+# ~1.25 GB, this is ignored if PREVIEW_ROWS > 0
+MAX_CONTENT_LENGTH = 1256000000
 
 # A Datapusher+ job is triggered automatically everytime a resource is modified (even just its metadata)
 # if its mimetype is one of the supported datapusher.formats.