From 215cff5cdb61eb8e0d8eeb2deb225d5b244324d6 Mon Sep 17 00:00:00 2001 From: Andrew Jackson Date: Wed, 15 Dec 2021 17:17:31 +0000 Subject: [PATCH] CDX DAG confirmed to work on h3 also, for #49. Magic. --- manage/airflow/dags/warc_cdx.py | 200 +++++++++++++++++--------------- 1 file changed, 109 insertions(+), 91 deletions(-) diff --git a/manage/airflow/dags/warc_cdx.py b/manage/airflow/dags/warc_cdx.py index c4d6292..b9077c2 100644 --- a/manage/airflow/dags/warc_cdx.py +++ b/manage/airflow/dags/warc_cdx.py @@ -32,94 +32,112 @@ cdx_host = cdx_parsed.netloc # include port cdx_col = cdx_parsed.path[1:] # Strip leading slash -hadoop_service = 'h020' - -with DAG( - f"access_{hadoop_service}_warc_cdx_index", - description='Index Hadoop 0.20 WARCs into OutbackCDX', - default_args=default_args, - schedule_interval='@hourly', - start_date=days_ago(1), - catchup=False, - max_active_runs=1, - params={ - 'hadoop_service' : hadoop_service, - 'trackdb_url' : trackdb_url, - 'cdx_service' : f"http://{cdx_host}", - 'cdx_collection': cdx_col - }, - tags=['access', 'index', 'cdx'] -) as dag: - dag.doc_md = f""" -### Index WARCs into CDX - -This runs Hadoop `{dag.params['hadoop_service']}` jobs to index WARC content into the OutbackCDX service. - -Configuration: - -* Reads and updates TrackDB at `{dag.params['trackdb_url']}` -* Processes WARCS on Hadoop `{dag.params['hadoop_service']}` -* Updates CDX collection `{dag.params['cdx_collection']}` on CDX service `{dag.params['cdx_service']}` -* The push gateway is configured to be `{c.push_gateway}`. - - -How to check it's working, you can: - -* Check for various Prometheus metrics via the Push Gateway: - * For Webrecorder WARCs: - * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}` - * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}` - * For Frequent Crawl WARCs: - * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}` - * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}` - * For Domain Crawl WARCs: - * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}` - * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}` -* The number of WARCs marked as indexed in TrackTB should increase: - * [For Webrecorder WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:webrecorder) - * [For Frequent WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:frequent) - * [For Domain WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:domain) - -Tool container versions: - - * UKWA Manage Task Image: `{c.ukwa_task_image}` - - """ - - cdx_h020_wr = DockerOperator( - task_id='index_h020_webrecorder_cdx', - image=c.ukwa_task_image, - # Add Hadoop 0.20 settings: - entrypoint='/entrypoint-h020.sh', - environment= { - 'MRJOB_CONF': '/etc/mrjob.conf', - 'PUSH_GATEWAY': c.push_gateway, - }, - command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S webrecorder -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', - ) - - cdx_h020_fc = DockerOperator( - task_id='index_h020_frequent_cdx', - image=c.ukwa_task_image, - # Add Hadoop 0.20 settings: - entrypoint='/entrypoint-h020.sh', - environment= { - 'MRJOB_CONF': '/etc/mrjob.conf', - 'PUSH_GATEWAY': c.push_gateway, - }, - command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S frequent -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', - ) - - cdx_h020_dc = DockerOperator( - task_id='index_h020_domain_cdx', - image=c.ukwa_task_image, - # Add Hadoop 0.20 settings: - entrypoint='/entrypoint-h020.sh', - environment= { - 'MRJOB_CONF': '/etc/mrjob.conf', - 'PUSH_GATEWAY': c.push_gateway, - }, - command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S domain -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', - ) - - cdx_h020_wr >> cdx_h020_fc >> cdx_h020_dc +# Function to generate DAGs for different clusters etc +def generate_cdx_dag(hadoop_service): + + if hadoop_service == 'h020': + mrjob_conf = "/etc/mrjob.conf" + entrypoint = '/entrypoint-h020.sh' + elif hadoop_service == 'h3': + mrjob_conf = "/etc/mrjob_h3.conf" + entrypoint = '/entrypoint-h3.sh' + else: + raise Exception(f"Did not recognised Hadoop service {hadoop_service}") + + dag_id = f"access_{hadoop_service}_warc_cdx_index" + with DAG( + dag_id, + description=f'Index Hadoop {hadoop_service} WARCs into OutbackCDX', + default_args=default_args, + schedule_interval='@hourly', + start_date=days_ago(1), + catchup=False, + max_active_runs=1, + params={ + 'hadoop_service' : hadoop_service, + 'trackdb_url' : trackdb_url, + 'cdx_service' : f"http://{cdx_host}", + 'cdx_collection': cdx_col + }, + tags=['access', 'index', 'cdx'] + ) as dag: + dag.doc_md = f""" + ### Index Hadoop {hadoop_service} WARCs into CDX + + This runs Hadoop `{dag.params['hadoop_service']}` jobs to index WARC content into the OutbackCDX service. + + Configuration: + + * Reads and updates TrackDB at `{dag.params['trackdb_url']}` + * Processes WARCS on Hadoop `{dag.params['hadoop_service']}` + * Updates CDX collection `{dag.params['cdx_collection']}` on CDX service `{dag.params['cdx_service']}` + * The push gateway is configured to be `{c.push_gateway}`. + + + How to check it's working, you can: + + * Check for various Prometheus metrics via the Push Gateway: + * For Webrecorder WARCs: + * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}` + * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}` + * For Frequent Crawl WARCs: + * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}` + * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}` + * For Domain Crawl WARCs: + * `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}` + * `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}` + * The number of WARCs marked as indexed in TrackTB should increase: + * [For Webrecorder WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:webrecorder AND hdfs_service_id_s:{hadoop_service}) + * [For Frequent WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:frequent AND hdfs_service_id_s:{hadoop_service}) + * [For Domain WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:domain AND hdfs_service_id_s:{hadoop_service}) + + Tool container versions: + + * UKWA Manage Task Image: `{c.ukwa_task_image}` + + """ + + cdx_wr = DockerOperator( + task_id=f'index_{hadoop_service}_webrecorder_cdx', + image=c.ukwa_task_image, + # Add Hadoop settings: + entrypoint=entrypoint, + environment= { + 'MRJOB_CONF': mrjob_conf, + 'PUSH_GATEWAY': c.push_gateway, + }, + command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S webrecorder -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', + ) + + cdx_fc = DockerOperator( + task_id=f'index_{hadoop_service}_frequent_cdx', + image=c.ukwa_task_image, + # Add Hadoop settings: + entrypoint=entrypoint, + environment= { + 'MRJOB_CONF': mrjob_conf, + 'PUSH_GATEWAY': c.push_gateway, + }, + command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S frequent -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', + ) + + cdx_dc = DockerOperator( + task_id=f'index_{hadoop_service}_domain_cdx', + image=c.ukwa_task_image, + # Add Hadoop settings: + entrypoint=entrypoint, + environment= { + 'MRJOB_CONF': mrjob_conf, + 'PUSH_GATEWAY': c.push_gateway, + }, + command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S domain -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2', + ) + + cdx_wr >> cdx_fc >> cdx_dc + + # Register the DAG + globals()[dag_id] = dag + + +generate_cdx_dag('h020') +generate_cdx_dag('h3')