Skip to content

Commit

Permalink
CDX DAG confirmed to work on h3 also, for #49. Magic.
Browse files Browse the repository at this point in the history
  • Loading branch information
anjackson committed Dec 15, 2021
1 parent f043167 commit 215cff5
Showing 1 changed file with 109 additions and 91 deletions.
200 changes: 109 additions & 91 deletions manage/airflow/dags/warc_cdx.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,94 +32,112 @@
# Split the parsed CDX URL into the pieces the indexer needs.
cdx_host = cdx_parsed.netloc  # host, plus the port when one is given
cdx_col = cdx_parsed.path[1:]  # collection name: the path minus its leading '/'

# This script is pinned to a single Hadoop cluster.
hadoop_service = 'h020'

# Create the DAG object up front; the tasks are attached to it in the
# `with dag:` block further down.
dag = DAG(
    dag_id=f"access_{hadoop_service}_warc_cdx_index",
    description='Index Hadoop 0.20 WARCs into OutbackCDX',
    default_args=default_args,
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    # These values are referenced from the Jinja-templated task commands.
    params=dict(
        hadoop_service=hadoop_service,
        trackdb_url=trackdb_url,
        cdx_service=f"http://{cdx_host}",
        cdx_collection=cdx_col,
    ),
    tags=['access', 'index', 'cdx'],
)

# Operator-facing Markdown documentation for the DAG. The text is kept flush
# left because every character inside the literal is part of the rendered doc.
dag.doc_md = f"""
### Index WARCs into CDX
This runs Hadoop `{dag.params['hadoop_service']}` jobs to index WARC content into the OutbackCDX service.
Configuration:
* Reads and updates TrackDB at `{dag.params['trackdb_url']}`
* Processes WARCS on Hadoop `{dag.params['hadoop_service']}`
* Updates CDX collection `{dag.params['cdx_collection']}` on CDX service `{dag.params['cdx_service']}`
* The push gateway is configured to be `{c.push_gateway}`.
How to check it's working, you can:
* Check for various Prometheus metrics via the Push Gateway:
* For Webrecorder WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}`
* For Frequent Crawl WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}`
* For Domain Crawl WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}`
* The number of WARCs marked as indexed in TrackTB should increase:
* [For Webrecorder WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:webrecorder)
* [For Frequent WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:frequent)
* [For Domain WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:domain)
Tool container versions:
* UKWA Manage Task Image: `{c.ukwa_task_image}`
"""

with dag:
    # One indexing task per WARC stream; each runs `windex cdx-index` inside
    # the task image using the Hadoop 0.20 entrypoint and mrjob configuration.
    cdx_h020_wr = DockerOperator(
        task_id='index_h020_webrecorder_cdx',
        image=c.ukwa_task_image,
        entrypoint='/entrypoint-h020.sh',
        environment=dict(
            MRJOB_CONF='/etc/mrjob.conf',
            PUSH_GATEWAY=c.push_gateway,
        ),
        command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S webrecorder -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2',
    )

    cdx_h020_fc = DockerOperator(
        task_id='index_h020_frequent_cdx',
        image=c.ukwa_task_image,
        entrypoint='/entrypoint-h020.sh',
        environment=dict(
            MRJOB_CONF='/etc/mrjob.conf',
            PUSH_GATEWAY=c.push_gateway,
        ),
        command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S frequent -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2',
    )

    cdx_h020_dc = DockerOperator(
        task_id='index_h020_domain_cdx',
        image=c.ukwa_task_image,
        entrypoint='/entrypoint-h020.sh',
        environment=dict(
            MRJOB_CONF='/etc/mrjob.conf',
            PUSH_GATEWAY=c.push_gateway,
        ),
        command='windex cdx-index -v -t {{ params.trackdb_url }} -H {{ params.hadoop_service }} -S domain -c {{ params.cdx_service }} -C {{ params.cdx_collection }} -B 2',
    )

    # Run the streams one after another: webrecorder, then frequent, then domain.
    cdx_h020_wr >> cdx_h020_fc
    cdx_h020_fc >> cdx_h020_dc
# Function to generate DAGs for different clusters etc
def generate_cdx_dag(hadoop_service):
    """Build and register the hourly CDX-indexing DAG for one Hadoop cluster.

    The DAG is named ``access_<hadoop_service>_warc_cdx_index`` and holds three
    ``DockerOperator`` tasks that run ``windex cdx-index`` for the
    ``webrecorder``, ``frequent`` and ``domain`` streams, chained in that order.

    Args:
        hadoop_service: Identifier of the Hadoop cluster; ``'h020'`` and
            ``'h3'`` are the only supported values.

    Returns:
        The constructed DAG. It is also stored in this module's globals under
        its ``dag_id`` (presumably so the Airflow DAG loader can find it).

    Raises:
        ValueError: If ``hadoop_service`` is not a supported cluster.
    """
    # Per-cluster container settings: (MRJOB_CONF path, image entrypoint).
    cluster_settings = {
        'h020': ("/etc/mrjob.conf", '/entrypoint-h020.sh'),
        'h3': ("/etc/mrjob_h3.conf", '/entrypoint-h3.sh'),
    }
    try:
        mrjob_conf, entrypoint = cluster_settings[hadoop_service]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments, so every bad value is
        # reported the same way.
        raise ValueError(
            f"Did not recognise Hadoop service {hadoop_service}"
        ) from None

    dag_id = f"access_{hadoop_service}_warc_cdx_index"
    with DAG(
        dag_id,
        description=f'Index Hadoop {hadoop_service} WARCs into OutbackCDX',
        default_args=default_args,
        schedule_interval='@hourly',
        start_date=days_ago(1),
        catchup=False,
        max_active_runs=1,
        # These values are referenced from the Jinja-templated task commands.
        params={
            'hadoop_service': hadoop_service,
            'trackdb_url': trackdb_url,
            'cdx_service': f"http://{cdx_host}",
            'cdx_collection': cdx_col,
        },
        tags=['access', 'index', 'cdx'],
    ) as dag:
        # Operator-facing Markdown documentation. The text is kept flush left
        # because every character inside the literal is part of the rendered doc.
        dag.doc_md = f"""
### Index Hadoop {hadoop_service} WARCs into CDX
This runs Hadoop `{dag.params['hadoop_service']}` jobs to index WARC content into the OutbackCDX service.
Configuration:
* Reads and updates TrackDB at `{dag.params['trackdb_url']}`
* Processes WARCS on Hadoop `{dag.params['hadoop_service']}`
* Updates CDX collection `{dag.params['cdx_collection']}` on CDX service `{dag.params['cdx_service']}`
* The push gateway is configured to be `{c.push_gateway}`.
How to check it's working, you can:
* Check for various Prometheus metrics via the Push Gateway:
* For Webrecorder WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-webrecorder", status="success"}}`
* For Frequent Crawl WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-frequent", status="success"}}`
* For Domain Crawl WARCs:
* `ukwa_task_event_timestamp{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}`
* `ukwa_task_total_sent_record_count{{job="cdx-index-{dag.params['hadoop_service']}-domain", status="success"}}`
* The number of WARCs marked as indexed in TrackDB should increase:
* [For Webrecorder WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:webrecorder AND hdfs_service_id_s:{hadoop_service})
* [For Frequent WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:frequent AND hdfs_service_id_s:{hadoop_service})
* [For Domain WARCs]({dag.params['trackdb_url']}/select?q=cdx_index_ss:{dag.params['cdx_collection']} AND stream_s:domain AND hdfs_service_id_s:{hadoop_service})
Tool container versions:
* UKWA Manage Task Image: `{c.ukwa_task_image}`
"""

        # One indexing task per stream, run strictly in sequence.
        previous_task = None
        for stream in ('webrecorder', 'frequent', 'domain'):
            task = DockerOperator(
                task_id=f'index_{hadoop_service}_{stream}_cdx',
                image=c.ukwa_task_image,
                # Add Hadoop settings:
                entrypoint=entrypoint,
                environment={
                    'MRJOB_CONF': mrjob_conf,
                    'PUSH_GATEWAY': c.push_gateway,
                },
                # Only the middle fragment is an f-string, so the Jinja
                # `{{ ... }}` placeholders reach Airflow untouched.
                command=(
                    'windex cdx-index -v -t {{ params.trackdb_url }}'
                    ' -H {{ params.hadoop_service }}'
                    f' -S {stream}'
                    ' -c {{ params.cdx_service }}'
                    ' -C {{ params.cdx_collection }} -B 2'
                ),
            )
            if previous_task is not None:
                previous_task >> task
            previous_task = task

    # Register the DAG
    globals()[dag_id] = dag
    return dag

# Instantiate one CDX-indexing DAG per supported Hadoop cluster:
# the Hadoop 0.20 cluster first, then the Hadoop 3 cluster.
generate_cdx_dag(hadoop_service='h020')
generate_cdx_dag(hadoop_service='h3')

0 comments on commit 215cff5

Please sign in to comment.