Commit
Merge pull request #137 from dathere/fixing-issues
Fixing issues
tino097 authored May 6, 2024
2 parents f9b0294 + 79de9bd commit ba18d25
Showing 5 changed files with 45 additions and 26 deletions.
2 changes: 1 addition & 1 deletion ckanext/datapusher_plus/config.py
@@ -47,7 +47,7 @@ class DataPusherPlusConfig(MutableMapping):
     STDERR: bool = True
     KEEP_JOBS_AGE: int = 60
 
-    MAX_CONTENT_LENGTH: str = "25600000"
+    MAX_CONTENT_LENGTH: str = "1256000000000"
     IGNORE_FILE_HASH: bool = True
     CHUNK_SIZE: str = "16384"
     DOWNLOAD_TIMEOUT: int = 300
11 changes: 11 additions & 0 deletions ckanext/datapusher_plus/jobs.py
@@ -12,6 +12,7 @@
 import time
 import decimal
 from urllib.parse import urlsplit
+from urllib.parse import urlparse
 import logging
 
 # Third-party imports
@@ -445,6 +446,16 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     # If this is an uploaded file to CKAN, authenticate the request,
     # otherwise we won't get file from private resources
     headers["Authorization"] = api_key
+
+    # If the ckan_url differs from this url, rewrite this url to the ckan
+    # url. This can be useful if ckan is behind a firewall.
+    if not resource_url.startswith(ckan_url):
+        new_url = urlparse(resource_url)
+        rewrite_url = urlparse(ckan_url)
+        new_url = new_url._replace(scheme=rewrite_url.scheme, netloc=rewrite_url.netloc)
+        resource_url = new_url.geturl()
+        logger.info('Rewrote resource url to: {0}'.format(resource_url))
+
     try:
         kwargs = {
             "headers": headers,
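The rewrite added above swaps only the scheme and host of the resource URL onto the configured CKAN URL, leaving the path and query untouched. A minimal standalone sketch of the same logic (the URLs below are hypothetical, not from the commit):

```python
from urllib.parse import urlparse

# Hypothetical values: an internal resource URL and the public CKAN URL.
resource_url = "http://internal-host:5000/dataset/d1/resource/r1/download/data.csv"
ckan_url = "https://ckan.example.org"

if not resource_url.startswith(ckan_url):
    parts = urlparse(resource_url)
    target = urlparse(ckan_url)
    # ParseResult is a namedtuple, so _replace swaps fields immutably;
    # path, params, query, and fragment are all preserved.
    parts = parts._replace(scheme=target.scheme, netloc=target.netloc)
    resource_url = parts.geturl()

print(resource_url)
# https://ckan.example.org/dataset/d1/resource/r1/download/data.csv
```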
4 changes: 2 additions & 2 deletions ckanext/datapusher_plus/logic/action.py
@@ -173,9 +173,9 @@ def datapusher_submit(context, data_dict: dict[str, Any]):
             'original_url': resource_dict.get('url'),
         }
     }
-    timeout = tk.config.get('ckan.datapusher.timeout', 30)
+    dp_timeout = tk.config.get('ckan.datapusher.timeout', 3000)
    try:
-        job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=timeout))
+        job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=dp_timeout))
     except Exception as e:
         log.error("Error submitting job to DataPusher: %s", e)
         return False
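Note that 3000 seconds is only the fallback when `ckan.datapusher.timeout` is unset; the RQ job timeout can still be pinned explicitly in the CKAN config file. A sketch (the value 3600 is an arbitrary example, not from the commit):

```ini
# ckan.ini (hypothetical excerpt)
ckan.datapusher.timeout = 3600
```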
@@ -5,37 +5,45 @@
 Create Date: 2023-09-22 22:14:35.137116
 """
 
 from alembic import op
 import sqlalchemy as sa
 
 
 # revision identifiers, used by Alembic.
-revision = 'e9c4a88839c8'
+revision = "e9c4a88839c8"
 down_revision = None
 branch_labels = None
 depends_on = None
 
 
 def upgrade():
-    #upgrade jobs table if it not exists
-
-    op.add_column(
-        u'jobs',
-        sa.Column(
-            'aps_job_id',
-            sa.UnicodeText),
-    )
-
-    #upgrade logs table
-    op.add_column(
-        'logs',
-        sa.Column(
-            'id',
-            sa.Integer,
-            primary_key=True,
-            autoincrement=True),
-    )
+    # upgrade the jobs table only if the column does not already exist
+    if not _check_column_exists("jobs", "aps_job_id"):
+        op.add_column(
+            "jobs",
+            sa.Column("aps_job_id", sa.UnicodeText),
+        )
+    # upgrade the logs table
+    if not _check_column_exists("logs", "id"):
+        op.add_column(
+            "logs",
+            sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
+        )
 
 
 def downgrade():
-    pass
+    # downgrade the jobs table
+    if _check_column_exists("jobs", "aps_job_id"):
+        op.drop_column("jobs", "aps_job_id")
+
+    # downgrade the logs table
+    if _check_column_exists("logs", "id"):
+        op.drop_column("logs", "id")
+
+
+def _check_column_exists(table_name, column_name):
+    bind = op.get_bind()
+    insp = sa.engine.reflection.Inspector.from_engine(bind)
+    columns = insp.get_columns(table_name)
+    return column_name in [column["name"] for column in columns]
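One caveat on the new helper: `Inspector.from_engine()` is deprecated as of SQLAlchemy 1.4. On newer SQLAlchemy the same check can be written with `sa.inspect()`; a sketch, assuming the migration is otherwise unchanged:

```python
def _check_column_exists(table_name, column_name):
    # sa.inspect() on a bind returns an Inspector on SQLAlchemy 1.4+.
    insp = sa.inspect(op.get_bind())
    return column_name in [col["name"] for col in insp.get_columns(table_name)]
```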
2 changes: 1 addition & 1 deletion dot-env.template
@@ -15,7 +15,7 @@ SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/d

 # =============== DOWNLOAD SETTINGS ==============
 # 25mb, this is ignored if either PREVIEW_ROWS > 0
-MAX_CONTENT_LENGTH = 25600000
+MAX_CONTENT_LENGTH = 1256000000
 
 # A Datapusher+ job is triggered automatically everytime a resource is modified (even just its metadata)
 # if its mimetype is one of the supported datapusher.formats.
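For scale, the limits touched by this commit convert as follows (a throwaway sketch; note that the config.py default and the dot-env.template value differ by a factor of 1000):

```python
def human_size(n: float) -> str:
    # Render a byte count using decimal (SI) units.
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if n < 1000:
            return f"{n:.1f} {unit}"
        n /= 1000
    return f"{n:.1f} PB"

print(human_size(25_600_000))         # old limit: 25.6 MB
print(human_size(1_256_000_000))      # new dot-env.template value: ~1.3 GB
print(human_size(1_256_000_000_000))  # new config.py default: ~1.3 TB
```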
