diff --git a/hepcrawl/pipelines.py b/hepcrawl/pipelines.py
index e14979a9..50a54292 100644
--- a/hepcrawl/pipelines.py
+++ b/hepcrawl/pipelines.py
@@ -26,14 +26,13 @@
 import pprint
 import requests
 import shutil
-import structlog
 import logstash
 
 from .api import CrawlResult
 from .settings import FILES_STORE
 from .utils import RecordFile
 
-STRUCT_LOGGER = structlog.get_logger()
+logger = get_task_logger(__name__)
 
 LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", "localhost")
 LOGSTASH_PORT = os.environ.get("LOGSTASH_PORT", 6060)
@@ -41,9 +40,10 @@
 LOGGER = logging.getLogger("python-logstash-logger")
 LOGGER.setLevel(logging.INFO)
 LOGGER.addHandler(
-    logstash.TCPLogstashHandler(LOGSTASH_HOST, LOGSTASH_PORT, version=1)
+    logstash.TCPLogstashHandler(LOGSTASH_HOST, int(LOGSTASH_PORT), version=1)
 )
 
+
 class DocumentsPipeline(FilesPipeline):
     """Download all the documents the record passed to download.
 
@@ -57,19 +57,14 @@ class DocumentsPipeline(FilesPipeline):
 
     def __init__(self, store_uri, *args, **kwargs):
         store_uri = store_uri or FILES_STORE
-        super(DocumentsPipeline, self).__init__(
-            *args,
-            store_uri=store_uri,
-            **kwargs
-        )
+        super(DocumentsPipeline, self).__init__(*args, store_uri=store_uri, **kwargs)
 
     def get_media_requests(self, item, info):
         LOGGER.info("item: :", item)
-        if item.get('file_urls'):
+        if item.get("file_urls"):
             LOGGER.info(
-                'Got the following files to download:\n%s', pprint.pformat(
-                    item['file_urls']
-                )
+                "Got the following files to download:\n%s",
+                pprint.pformat(item["file_urls"]),
             )
             return [Request(x) for x in item.get(self.files_urls_field, [])]
         return item.get("file_requests", [])
@@ -78,9 +73,9 @@ def generate_presigned_s3_url(self, path, expire=7776000):
         bucket_location = get_project_settings().get("DOWNLOAD_BUCKET", "documents")
         LOGGER.info("Generating presigned url for: %s in %s", path, bucket_location)
         return self.store.s3_client.generate_presigned_url(
-            ClientMethod='get_object',
-            Params={'Bucket': bucket_location, "Key": path},
-            ExpiresIn=expire
+            ClientMethod="get_object",
+            Params={"Bucket": bucket_location, "Key": path},
+            ExpiresIn=expire,
         )
 
     def item_completed(self, results, item, info):
@@ -88,8 +83,8 @@
         LOGGER.debug("results: %s, item: %s, info: %s", results, item, info)
         record_files = [
             RecordFile(
-                path=self.generate_presigned_s3_url(result_data['path']),
-                name=os.path.basename(result_data['url']),
+                path=self.generate_presigned_s3_url(result_data["path"]),
+                name=os.path.basename(result_data["url"]),
             )
             for ok, result_data in results
             if ok
@@ -118,7 +113,7 @@ def __init__(self):
     def open_spider(self, spider):
         self.results_data = []
         self.spider_name = spider.name
-        self.scrape_job = os.environ.get('SCRAPY_JOB')
+        self.scrape_job = os.environ.get("SCRAPY_JOB")
 
     def process_item(self, item, spider):
         """Add the crawl result to the results data after processing it.
@@ -139,17 +134,14 @@ def process_item(self, item, spider):
         """
         self.count += 1
         item.record = item.to_hep(source=spider.source)
-        spider.logger.debug(
-            'Got post-enhanced hep record:\n%s',
-            pprint.pformat(item.record),
-        )
+
         crawl_result = CrawlResult.from_parsed_item(item).to_dict()
         self.results_data.append(crawl_result)
 
-        titles = get_value(crawl_result, 'titles.title', default=[])
-        dois = get_value(crawl_result, 'dois.value', default=[])
-        arxiv_eprints = get_value(crawl_result, 'arxiv_eprints.value', default=[])
-        report_numbers = get_value(crawl_result, 'report_numbers.value', default=[])
+        titles = get_value(crawl_result, "titles.title", default=[])
+        dois = get_value(crawl_result, "dois.value", default=[])
+        arxiv_eprints = get_value(crawl_result, "arxiv_eprints.value", default=[])
+        report_numbers = get_value(crawl_result, "report_numbers.value", default=[])
 
         for doi in dois:
             self.dois.append(doi)
@@ -160,8 +152,21 @@ def process_item(self, item, spider):
         for report_numbers in report_numbers:
             self.report_numbers.append(report_numbers)
 
+        logger.info(
+            "Processing item:\n %s",
+            pprint.pformat(
+                dict(
+                    titles=titles,
+                    dois=dois,
+                    arxiv_eprints=arxiv_eprints,
+                    report_numbers=report_numbers,
+                    spider=self.spider_name,
+                    scrapy_job=self.scrape_job,
+                )
+            )
+        )
         LOGGER.info(
-            'Processing item.',
+            "Processing item.",
             extra=dict(
                 titles=titles,
                 dois=dois,
@@ -169,7 +174,7 @@
                 report_numbers=report_numbers,
                 spider=self.spider_name,
                 scrapy_job=self.scrape_job,
-            )
+            ),
         )
         return crawl_result
 
@@ -177,35 +182,47 @@ def _prepare_payload(self, spider):
         """Return payload for push."""
         payload_list = [
             dict(
-                job_id=os.environ['SCRAPY_JOB'],
-                results_uri=os.environ['SCRAPY_FEED_URI'],
+                job_id=os.environ["SCRAPY_JOB"],
+                results_uri=os.environ["SCRAPY_FEED_URI"],
                 results_data=[result],
                 errors=[],
                 log_file=None,
-                spider_name=spider.name
-            ) for result in self.results_data
+                spider_name=spider.name,
+            )
+            for result in self.results_data
         ]
 
-        if spider.state.get('errors'):
+        if spider.state.get("errors"):
             errors = [
-                {'exception': str(err['exception']), 'sender':str(err['sender'])}
-                for err in spider.state['errors']
+                {"exception": str(err["exception"]), "sender": str(err["sender"])}
+                for err in spider.state["errors"]
             ]
             payload_list.append(
                 dict(
-                    job_id=os.environ['SCRAPY_JOB'],
-                    results_uri=os.environ['SCRAPY_FEED_URI'],
+                    job_id=os.environ["SCRAPY_JOB"],
+                    results_uri=os.environ["SCRAPY_FEED_URI"],
                     results_data=[],
                     log_file=None,
                     errors=errors,
                 )
             )
+
+            logger.info(
+                "Errors:\n %s",
+                pprint.pformat(
+                    dict(
+                        spider=self.spider_name,
+                        scrapy_job=self.scrape_job,
+                        errors=errors,
+                    )
+                ),
+            )
             LOGGER.info(
-                'Errors.',
+                "Errors.",
                 extra=dict(
                     spider=self.spider_name,
                     scrapy_job=self.scrape_job,
                     errors=errors,
-                )
+                ),
             )
         return payload_list
 
@@ -213,24 +230,19 @@ def _prepare_payload(self, spider):
     def _cleanup(spider):
         """Run cleanup."""
         # Cleanup errors
-        if 'errors' in spider.state:
-            del spider.state['errors']
+        if "errors" in spider.state:
+            del spider.state["errors"]
 
     def close_spider(self, spider):
         """Post results to HTTP API."""
-        api_mapping = spider.settings['API_PIPELINE_TASK_ENDPOINT_MAPPING']
+        api_mapping = spider.settings["API_PIPELINE_TASK_ENDPOINT_MAPPING"]
         task_endpoint = api_mapping.get(
-            spider.name, spider.settings['API_PIPELINE_TASK_ENDPOINT_DEFAULT']
-        )
-        api_url = os.path.join(
-            spider.settings['API_PIPELINE_URL'],
-            task_endpoint
+            spider.name, spider.settings["API_PIPELINE_TASK_ENDPOINT_DEFAULT"]
         )
-        if api_url and 'SCRAPY_JOB' in os.environ:
+        api_url = os.path.join(spider.settings["API_PIPELINE_URL"], task_endpoint)
+        if api_url and "SCRAPY_JOB" in os.environ:
             for payload in self._prepare_payload(spider):
-                json_data = {
-                    "kwargs": payload
-                }
+                json_data = {"kwargs": payload}
                 requests.post(api_url, json=json_data)
@@ -247,76 +259,112 @@ def __init__(self):
         self.celery = Celery()
 
     def open_spider(self, spider):
-        self.celery.conf.update(dict(
-            BROKER_URL=spider.settings['BROKER_URL'],
-            CELERY_RESULT_BACKEND=spider.settings['CELERY_RESULT_BACKEND'],
-            CELERY_ACCEPT_CONTENT=spider.settings['CELERY_ACCEPT_CONTENT'],
-            CELERY_TIMEZONE=spider.settings['CELERY_TIMEZONE'],
-            CELERY_DISABLE_RATE_LIMITS=spider.settings[
-                'CELERY_DISABLE_RATE_LIMITS'
-            ],
-            CELERY_TASK_SERIALIZER='json',
-            CELERY_RESULT_SERIALIZER='json',
-            BROKER_TRANSPORT_OPTIONS=spider.settings['BROKER_TRANSPORT_OPTIONS'],
-            BROKER_CONNECTION_MAX_RETRIES=spider.settings['BROKER_CONNECTION_MAX_RETRIES'],
-
-        ))
+        self.celery.conf.update(
+            dict(
+                BROKER_URL=spider.settings["BROKER_URL"],
+                CELERY_RESULT_BACKEND=spider.settings["CELERY_RESULT_BACKEND"],
+                CELERY_ACCEPT_CONTENT=spider.settings["CELERY_ACCEPT_CONTENT"],
+                CELERY_TIMEZONE=spider.settings["CELERY_TIMEZONE"],
+                CELERY_DISABLE_RATE_LIMITS=spider.settings[
+                    "CELERY_DISABLE_RATE_LIMITS"
+                ],
+                CELERY_TASK_SERIALIZER="json",
+                CELERY_RESULT_SERIALIZER="json",
+                BROKER_TRANSPORT_OPTIONS=spider.settings["BROKER_TRANSPORT_OPTIONS"],
+                BROKER_CONNECTION_MAX_RETRIES=spider.settings[
+                    "BROKER_CONNECTION_MAX_RETRIES"
+                ],
+            )
+        )
         super(InspireCeleryPushPipeline, self).open_spider(spider=spider)
 
-
     def close_spider(self, spider):
         """Post results to BROKER API."""
         from celery.utils.log import get_task_logger
-        logger = get_task_logger(__name__)
-        if 'SCRAPY_JOB' not in os.environ:
+
+        if "SCRAPY_JOB" not in os.environ:
             self._cleanup(spider)
             return
 
-        if hasattr(spider, 'tmp_dir'):
+        if hasattr(spider, "tmp_dir"):
             shutil.rmtree(path=spider.tmp_dir, ignore_errors=True)
 
-        errors = getattr(spider, 'state', {}).get('errors', [])
+        errors = getattr(spider, "state", {}).get("errors", [])
         if self.count > 0 or errors:
-            task_endpoint = spider.settings[
-                'API_PIPELINE_TASK_ENDPOINT_MAPPING'
-            ].get(
+            task_endpoint = spider.settings["API_PIPELINE_TASK_ENDPOINT_MAPPING"].get(
                 spider.name,
-                spider.settings['API_PIPELINE_TASK_ENDPOINT_DEFAULT'],
+                spider.settings["API_PIPELINE_TASK_ENDPOINT_DEFAULT"],
+            )
+            logger.info("Triggering celery task: %s.", task_endpoint)
+            logger.info(
+                "Sending tasks:\n %s.",
+                pprint.pformat(
+                    dict(
+                        spider=self.spider_name,
+                        scrapy_job=self.scrape_job,
+                        number_of_results=self.count,
+                    )
+                )
            )
-            logger.info('Triggering celery task: %s.', task_endpoint)
             LOGGER.info(
-                'Sending tasks.',
+                "Sending tasks.",
                 extra=dict(
                     spider=self.spider_name,
                     scrapy_job=self.scrape_job,
                     number_of_results=self.count,
-                )
+                ),
             )
             for kwargs in self._prepare_payload(spider):
-                logger.debug(
-                    ' Sending results:\n %s',
-                    pprint.pformat(kwargs),
-                )
                 res = self.celery.send_task(task_endpoint, kwargs=kwargs)
                 celery_task_info_payload = {
-                    'celery_task_id': res.id,
-                    'scrapy_job_id': os.environ.get('SCRAPY_JOB')
+                    "celery_task_id": res.id,
+                    "scrapy_job_id": os.environ.get("SCRAPY_JOB"),
                 }
-                logger.info('Sent celery task %s', pprint.pformat(celery_task_info_payload))
+                logger.info(
+                    "Sent celery task %s", pprint.pformat(celery_task_info_payload)
+                )
+
+                logger.info(
+                    "Sent celery task: \n %s",
+                    pprint.pformat(
+                        dict(
+                            spider=self.spider_name,
+                            scrapy_job=self.scrape_job,
+                            kwargs=kwargs,
+                            celery_task_id=res.id,
+                        )
+                    ),
+                )
+
                 LOGGER.info(
-                    'Sending task.',
-                    extra=dict(
+                    "Sending task.",
+                    extra=dict(
                         spider=self.spider_name,
                         scrapy_job=self.scrape_job,
                         kwargs=kwargs,
-                        celery_task_id=res.id
-                    )
+                        celery_task_id=res.id,
+                    ),
                 )
+
+            logger.info(
+                "Finish Processing. \n %s",
+                pprint.pformat(
+                    dict(
+                        dois=self.dois,
+                        arxiv_eprints=self.arxiv_eprints,
+                        report_numbers=self.report_numbers,
+                        spider=self.spider_name,
+                        scrapy_job=self.scrape_job,
+                        number_of_results=self.count,
+                    )
+                ),
+            )
+
             LOGGER.info(
-                'Finish Processing.',
+                "Finish Processing.",
                 extra=dict(
                     dois=self.dois,
                     arxiv_eprints=self.arxiv_eprints,
@@ -324,14 +372,23 @@ def close_spider(self, spider):
                     spider=self.spider_name,
                     scrapy_job=self.scrape_job,
                     number_of_results=self.count,
-                )
+                ),
            )
         else:
+            logger.info(
+                "No results.\n %s",
+                pprint.pformat(
+                    dict(
+                        spider=self.spider_name,
+                        scrapy_job=self.scrape_job,
+                    )
+                ),
+            )
             LOGGER.info(
-                'No results.',
+                "No results.",
                 extra=dict(
                     spider=self.spider_name,
                     scrapy_job=self.scrape_job,
-                )
+                ),
             )
         self._cleanup(spider)