T128 refactor loader #128, #122 #138

Merged
merged 28 commits into from
Aug 27, 2021

Changes from all commits
28 commits
9a05868
Added code for using Spark DataFrame API
dolsysmith Jul 28, 2021
5ef31cd
Fixed pyspark version
dolsysmith Jul 28, 2021
ceffc9f
Adding schema and SQL files
dolsysmith Jul 28, 2021
0723af2
Upgraded to twarc 1.12 for column name fix
dolsysmith Jul 28, 2021
57d1ad7
Working with Spark 3/Java 11/Python 3.8
dolsysmith Aug 2, 2021
0fab820
Updated README
dolsysmith Aug 2, 2021
f58028a
Adding headers to files
dolsysmith Aug 4, 2021
1d53efc
Using elasticsearch instead of elasticsearch_dsl to trigger dependenc…
dolsysmith Aug 5, 2021
e32c6b1
Fixed issues in creation of CSV
dolsysmith Aug 6, 2021
1311d8a
Updating Spark SQL to reflect json2csv 1.12
dolsysmith Aug 11, 2021
7e012a3
Mapping to json2csv 1.12
dolsysmith Aug 11, 2021
09a36d7
Updating SQL for json2csv 1.12
dolsysmith Aug 12, 2021
412ddbb
Fixed syntax errors
dolsysmith Aug 12, 2021
c5e76ba
Fixed logic to capture all URL's
dolsysmith Aug 12, 2021
0ff89e9
Fixed logic to capture all URL's
dolsysmith Aug 12, 2021
751ea73
Removing extra comma
dolsysmith Aug 12, 2021
66b3889
Hashtags from retweeted_status
dolsysmith Aug 13, 2021
f21d5ed
Order of hashtag fields
dolsysmith Aug 13, 2021
0739b6f
Added readable version of schema
dolsysmith Aug 13, 2021
f7bab20
Changing JSON handling for tweet data
dolsysmith Aug 20, 2021
3ee1211
Fixing indentation
dolsysmith Aug 20, 2021
52cd0e6
textFile method requires string, not list
dolsysmith Aug 20, 2021
a42cea2
Added support for full_text element to schema
dolsysmith Aug 20, 2021
d81c832
JSON of tweet now correctly indexed; removed extraneous columns from …
dolsysmith Aug 24, 2021
9148842
Refactored to avoid join
dolsysmith Aug 25, 2021
fe401d0
Fixed typo
dolsysmith Aug 25, 2021
fc33807
Using RDD for ES load; DataFrame for extracts; no JSON extract
dolsysmith Aug 27, 2021
e845cf5
Removed extraneous import
dolsysmith Aug 27, 2021
11 changes: 7 additions & 4 deletions Dockerfile-loader
@@ -1,13 +1,13 @@
-FROM python:3.6-stretch
+FROM python:3.8-buster
MAINTAINER TweetSets <[email protected]>

ADD requirements.txt /opt/tweetsets/
WORKDIR /opt/tweetsets
RUN pip install -r requirements.txt
-RUN grep elasticsearch-dsl requirements.txt | xargs pip install -t dependencies
+RUN grep elasticsearch requirements.txt | xargs pip install -t dependencies

RUN apt-get update && \
-apt-get install -y openjdk-8-jre-headless \
+apt-get install -y openjdk-11-jre-headless \
ca-certificates-java \
zip -y

@@ -18,8 +18,11 @@ WORKDIR /opt/tweetsets
ADD tweetset_loader.py /opt/tweetsets/
ADD models.py /opt/tweetsets/
ADD utils.py /opt/tweetsets/
+ADD spark_utils.py /opt/tweetsets/
+ADD tweetsets_schema.json /opt/tweetsets/
+ADD tweetsets_sql_exp.sql /opt/tweetsets/
ADD setup.py /opt/tweetsets/
-ADD elasticsearch-hadoop-7.9.2.jar /opt/tweetsets/elasticsearch-hadoop.jar
+ADD elasticsearch-spark-30_2.12-7.13.4.jar /opt/tweetsets/elasticsearch-hadoop.jar
ADD tweetset_cli.py /opt/tweetsets/

RUN python setup.py bdist_egg
15 changes: 7 additions & 8 deletions Dockerfile-spark
@@ -1,4 +1,4 @@
-FROM python:3.6-stretch
+FROM python:3.8-buster
MAINTAINER TweetSets <[email protected]>

# Based on https://hub.docker.com/r/gettyimages/spark/~/dockerfile/
@@ -24,15 +24,14 @@ ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1

# JAVA
-ARG JAVA_MAJOR_VERSION=8
-ARG JAVA_UPDATE_VERSION=131
+ARG JAVA_MAJOR_VERSION=11
+ARG JAVA_UPDATE_VERSION=11+9
ARG JAVA_BUILD_NUMBER=11
-ENV JAVA_HOME /usr/jdk1.${JAVA_MAJOR_VERSION}.0_${JAVA_UPDATE_VERSION}
+ENV JAVA_HOME /usr/jdk-${JAVA_MAJOR_VERSION}.0.${JAVA_UPDATE_VERSION}

ENV PATH $PATH:$JAVA_HOME/bin
RUN curl -sL --retry 3 --insecure \
--header "Cookie: oraclelicense=accept-securebackup-cookie;" \
"http://download.oracle.com/otn-pub/java/jdk/${JAVA_MAJOR_VERSION}u${JAVA_UPDATE_VERSION}-b${JAVA_BUILD_NUMBER}/d54c1d3a095b4ff2b6607d096fa80163/server-jre-${JAVA_MAJOR_VERSION}u${JAVA_UPDATE_VERSION}-linux-x64.tar.gz" \
"https://github.com/AdoptOpenJDK/openjdk11-binaries/releases/download/jdk-11.0.11+9/OpenJDK11U-jdk_x64_linux_hotspot_11.0.11_9.tar.gz" \
| gunzip \
| tar x -C /usr/ \
&& ln -s $JAVA_HOME /usr/java \
@@ -51,13 +50,13 @@ RUN curl -sL --retry 3 \
&& chown -R root:root $HADOOP_HOME

# SPARK
-ENV SPARK_VERSION 2.3.2
+ENV SPARK_VERSION 3.1.2
ENV SPARK_PACKAGE spark-${SPARK_VERSION}-bin-without-hadoop
ENV SPARK_HOME /usr/spark-${SPARK_VERSION}
ENV SPARK_DIST_CLASSPATH="$HADOOP_HOME/etc/hadoop/*:$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/hdfs/lib/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/yarn/lib/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/lib/*"
ENV PATH $PATH:${SPARK_HOME}/bin
RUN curl -sL --retry 3 \
"https://archive.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-without-hadoop.tgz" \
"https://mirrors.sonic.net/apache/spark/spark-3.1.2/spark-3.1.2-bin-without-hadoop.tgz" \
| gunzip \
| tar x -C /usr/ \
&& mv /usr/$SPARK_PACKAGE $SPARK_HOME \
2 changes: 1 addition & 1 deletion README.md
@@ -185,7 +185,7 @@ loader Elasticsearch is on the same machine as TweetSets (e.g., in a small devel
spark-submit \
--jars elasticsearch-hadoop.jar \
--master spark://$SPARK_MASTER_HOST:7101 \
---py-files dist/TweetSets-2.1.0-py3.6.egg,dependencies.zip \
+--py-files dist/TweetSets-2.1.0-py3.8.egg,dependencies.zip \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.driver.host=$SPARK_DRIVER_HOST \
tweetset_loader.py spark-create /dataset/path/to
Binary file added elasticsearch-spark-30_2.12-7.13.4.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -24,5 +24,5 @@ requests==2.25.0
six==1.10.0
vine==1.3.0
Werkzeug==1.0.1
-twarc==1.4.0
-pyspark==2.3.2
+twarc==1.12.1
+pyspark==3.1.2
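
Together with the Dockerfile changes, this pins a consistent stack: Python 3.8, OpenJDK 11, Spark 3.1.2, and pyspark 3.1.2. A minimal, hypothetical sanity check (not part of the PR) that could be run inside a loader or spark container to confirm the versions line up:

from pyspark.sql import SparkSession
import sys

# Hypothetical local session purely for inspecting versions.
spark = SparkSession.builder.master("local[1]").appName("version-check").getOrCreate()
print(sys.version)        # expect 3.8.x
print(spark.version)      # expect 3.1.2
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))  # expect 11.x
spark.stop()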
151 changes: 151 additions & 0 deletions spark_utils.py
@@ -0,0 +1,151 @@
import pyspark.sql.types as T
from pyspark.sql.functions import col, explode
import pyspark.sql.functions as F
from pyspark.sql import Row
import json
from twarc import json2csv

def load_schema(path_to_schema):
    '''Load the TweetSets Spark DataFrame schema.
    :param path_to_schema: path to a Spark schema JSON document on disk'''
    with open(path_to_schema, 'r') as f:
        schema = json.load(f)
    return T.StructType.fromJson(schema)

def load_sql(path_to_sql):
    '''Load the Spark SQL code for the TweetSets data transform.
    :param path_to_sql: path to Spark SQL code on disk'''
    with open(path_to_sql, 'r') as f:
        return f.read()

def make_spark_df(spark, schema, sql, path_to_dataset, dataset_id):
    '''Loads a set of JSON tweets and applies the SQL transform.

    :param spark: an initialized SparkSession object
    :param schema: a valid Spark DataFrame schema for loading a tweet from JSONL
    :param sql: Spark SQL to execute against the DataFrame
    :param path_to_dataset: a comma-separated list of JSON files to load
    :param dataset_id: a string containing the ID for this dataset'''
    # Read JSON files as a Spark DataFrame
    #df = load_rdd(spark, tweet_schema=schema, path_to_tweets=path_to_dataset)
    df = spark.read.schema(schema).json(path_to_dataset)
    # Create a temp view for the SQL ops
    df.createOrReplaceTempView("tweets")
    # Apply the SQL transform
    df = spark.sql(sql)
    # Drop temporary columns
    cols_to_drop = [c for c in df.columns if c.endswith('struct') or c.endswith('array') or c.endswith('str')]
    df = df.drop(*cols_to_drop)
    # Add the dataset ID column
    df = df.withColumn('dataset_id', F.lit(dataset_id))
    return df

def extract_tweet_ids(df, path_to_extract):
    '''Saves Tweet IDs from a dataset to the provided path as gzipped CSV files.
    :param df: Spark DataFrame
    :param path_to_extract: string of path to folder for files'''
    # Extract the ID column and save as gzipped CSV
    df.select('tweet_id').distinct().write.option("header", "true").csv(path_to_extract, compression='gzip')

def extract_tweet_json(df, path_to_extract):
    '''Saves Tweet JSON documents from a dataset to the provided path as gzipped text files (one JSON document per line).
    :param df: Spark DataFrame
    :param path_to_extract: string of path to folder for files'''
    # Extract the tweet JSON column and save as gzipped text
    df.select('tweet').write.text(path_to_extract, compression='gzip')

def make_column_mapping(df_columns, array_fields):
    '''Creates a mapping from TweetSets fields to CSV column headings, using headings derived from twarc.json2csv.
    Each key is a column name in the DataFrame created from Tweet JSON by the SQL transform; each value is a tuple:
    the first element is the CSV column heading, the second is a Boolean flag indicating whether this field is an
    array. (Arrays need to be converted to strings prior to writing to CSV.)
    :param df_columns: list of columns in the transformed Spark DataFrame (includes some fields required by json2csv but not indexed in Elasticsearch)
    :param array_fields: list of fields in df_columns stored as arrays'''
    # Map TweetSets fields to their CSV column names
    column_mapping = {'retweet_quoted_status_id': 'retweet_or_quote_id',
                      'retweeted_quoted_screen_name': 'retweet_or_quote_screen_name',
                      'tweet_id': 'id',
                      'user_follower_count': 'user_followers_count',
                      'language': 'lang',
                      'retweeted_quoted_user_id': 'retweet_or_quote_user_id',
                      'hashtags_csv': 'hashtags',
                      'urls_csv': 'urls'
                      }
    # Add remaining fields from the DataFrame if they are used by json2csv
    column_mapping.update({k: k for k in df_columns if k in json2csv.get_headings()})
    # Set the array flag for those fields that require it
    column_mapping = {k: (v, k in array_fields) for k, v in column_mapping.items()}
    return column_mapping

def extract_csv(df, path_to_extract):
    '''Creates a CSV extract where each row is a Tweet document, using the schema in the twarc.json2csv module.
    :param df: Spark DataFrame
    :param path_to_extract: string of path to folder for files'''
    column_mapping = make_column_mapping(df.columns, array_fields=['text'])
    #print('COLUMN MAPPING', column_mapping)
    # The hashtags and urls fields are handled differently in the Elasticsearch index and in the CSV (per the
    # twarc.json2csv spec), so we need to drop the ES columns before renaming the CSV versions of these columns.
    df = df.drop('hashtags', 'urls')
    for k, v in column_mapping.items():
        # Need to convert fields stored as arrays
        if v[1]:
            # Concat arrays with whitespace
            df = df.withColumn(k, F.concat_ws(' ', df[k]))
        # Rename columns as necessary
        if k != v[0]:
            df = df.withColumnRenamed(k, v[0])
    # Select only the columns identified in json2csv, skipping the user_urls column (which may have been deprecated)
    csv_columns = [c for c in json2csv.get_headings() if c != 'user_urls']
    df_csv = df.select(csv_columns)
    # Remove newlines in the text and user_location fields
    df_csv = df_csv.withColumn('text', F.regexp_replace('text', '\n|\r', ' '))
    df_csv = df_csv.withColumn('user_location', F.regexp_replace('user_location', '\n|\r', ' '))
    # Swap back the date fields so that the created_at field contains the unparsed version
    data_mapping = {'created_at': 'parsed_created_at',
                    'parsed_created_at': 'created_at'}
    df_csv = df_csv.select([F.col(c).alias(data_mapping.get(c, c)) for c in df_csv.columns])
    # Get rid of duplicate tweets
    df_csv = df_csv.dropDuplicates(['id'])
    # Set the escape character to the double quote; otherwise it causes problems for applications reading the CSV.
    df_csv.write.option("header", "true").csv(path_to_extract, compression='gzip', escape='"')

def extract_mentions(df, spark, path_to_extract):
    '''Creates nodes and edges of the full mentions extract.
    :param df: Spark DataFrame (after SQL transform)
    :param spark: SparkSession object
    :param path_to_extract: string of path to folder for files'''
    # Create a temp view so that we can use SQL
    df.createOrReplaceTempView("tweets_parsed")
    # SQL for extracting the mention ids, screen_names, and user_ids
    mentions_sql = '''
    select mentions.*,
        user_id
    from (
        select
            explode(arrays_zip(mention_user_ids, mention_screen_names)) as mentions,
            user_id
        from tweets_parsed
        )
    '''
    mentions_df = spark.sql(mentions_sql)
    mention_edges = mentions_df.select('mention_user_ids', 'mention_screen_names')\
        .distinct()
    mention_nodes = mentions_df.select('mention_user_ids', 'user_id').distinct()
    mention_nodes.write.option("header", "true").csv(path_to_extract + '/nodes', compression='gzip')
    mention_edges.write.option("header", "true").csv(path_to_extract + '/edges', compression='gzip')

def agg_mentions(df, spark, path_to_extract):
    '''Creates a count of Tweets per mentioned user id.
    :param df: Spark DataFrame (after SQL transform)
    :param spark: SparkSession object
    :param path_to_extract: string of path to folder for files'''
    df.createOrReplaceTempView("tweets_parsed")
    sql_agg = '''
    select count(distinct tweet_id) as number_mentions,
        mention_user_id as mentioned_user
    from (
        select
            explode(mention_user_ids) as mention_user_id,
            tweet_id
        from tweets_parsed
        )
    group by mention_user_id
    '''
    mentions_agg_df = spark.sql(sql_agg)
    mentions_agg_df.write.option("header", "true").csv(path_to_extract + '/top_mentions', compression='gzip')
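
Taken together, these helpers form a pipeline from raw JSONL to the ID, CSV, and mention extracts. A minimal sketch of how they might be wired together, assuming a local SparkSession and hypothetical dataset paths (the actual driver logic lives in tweetset_loader.py and is launched via spark-submit as shown in the README):

from pyspark.sql import SparkSession
from spark_utils import (load_schema, load_sql, make_spark_df,
                         extract_tweet_ids, extract_csv, agg_mentions)

# Hypothetical local session; in the deployed loader, spark-submit supplies the
# master URL, the elasticsearch-spark jar, and the egg/zip dependencies.
spark = SparkSession.builder.master("local[*]").appName("tweetsets-extract-demo").getOrCreate()

schema = load_schema("tweetsets_schema.json")
sql = load_sql("tweetsets_sql_exp.sql")

# Comma-separated list of JSONL files, per make_spark_df's contract (paths are illustrative)
df = make_spark_df(spark, schema, sql,
                   "/dataset/tweets-part1.jsonl,/dataset/tweets-part2.jsonl",
                   dataset_id="demo-dataset")

extract_tweet_ids(df, "/dataset/extracts/tweet-ids")
extract_csv(df, "/dataset/extracts/csv")
agg_mentions(df, spark, "/dataset/extracts/mentions")

spark.stop()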