T128 refactor loader #128, #122 #138
Conversation
…y install for Spark submit (compare 674c0af to 0739b6f)
Updated `spark_utils.py` so that the original JSON (unparsed) is stored in the …
Updated the JSON schema file so that the …
`spark_utils.py` (Outdated)
```python
'''Loads a set of JSON tweets as strings and adds a column index. We do this
so that the ultimate output in the JSON extract will have the same null
fields as the original.

:param spark: an initialized SparkSession object
:param path_to_dataset: a comma-separated list of JSON files to load'''
```
Should this be `:param path_to_tweets:`?
Thanks -- fixed.
Updated branch as follows:
New approach: we load the JSON-L as an RDD, then convert that to a DataFrame, allowing us to preserve the original string representation of the JSON as a separate column and obviating the need for a join (which creates problems on smaller datasets). This approach seems stable, but performance has taken a hit (relative to previous implementations) that is evident on larger datasets. Loading 20 GB with the new implementation (not counting the time to create extracts) took ~35 min, vs. 20 min using the implementation currently in production.
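As a stdlib-only sketch of the idea (the names `load_tweets` and `path_to_tweets` are illustrative, not the branch's actual code): each line of the JSON-L file is kept as an unparsed string alongside a row index, much as the RDD approach does with `zipWithIndex` before converting to a DataFrame, so the original serialization (including null fields) survives verbatim:

```python
import json

def load_tweets(path_to_tweets):
    """Illustrative only: mimic loading JSON-L as raw strings plus an index,
    the way the RDD-based approach does before toDF()."""
    rows = []
    with open(path_to_tweets, encoding="utf-8") as f:
        for tweet_index, line in enumerate(f):
            raw = line.rstrip("\n")
            if not raw:
                continue
            rows.append({
                "tweet_index": tweet_index,  # analogous to the added column index
                "tweet": raw,                # original JSON preserved verbatim
                "parsed": json.loads(raw),   # parsed view for downstream fields
            })
    return rows
```

Because the raw string rides along in its own column, there is no need to join a parsed DataFrame back to the original text later.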
Is that comparison against the previous implementations of Spark 3? Or compared to TweetSets 2.1?

Oh never mind, I see you said compared to production. Sorry!
This version uses the RDD API for loading to TweetSets (in order to preserve the original JSON as is) and the DataFrame API to create the extracts. Performance is comparable to what's in production for loading and significantly improved for creating extracts. The Spark SQL code includes fields that we use for indexing in Elasticsearch; these are dropped when creating the CSV. I will leave them there for now (I don't think their presence really impacts performance) with an eye toward a future release where we no longer need to load the full JSON into Elasticsearch. At that point, we can use the DataFrame API for everything (which should improve performance further).
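A rough stdlib illustration of that split (the field names here are made up, not the branch's actual schema): the records carry extra indexing-only fields for Elasticsearch that are simply omitted when writing the CSV extract, analogous to dropping columns before the CSV write:

```python
import csv
import io

# Hypothetical field lists -- not the branch's actual schema.
CSV_FIELDS = ["id", "text", "created_at"]          # columns in the CSV extract
# e.g. "hashtags_analyzed": kept for ES indexing, dropped for CSV (invented name)

def write_csv_extract(records, out):
    """Write only the CSV columns; extrasaction="ignore" silently drops
    any index-only fields present on the records."""
    writer = csv.DictWriter(out, fieldnames=CSV_FIELDS, extrasaction="ignore")
    writer.writeheader()
    for record in records:
        writer.writerow(record)

records = [
    {"id": "1", "text": "hello", "created_at": "2020-01-01",
     "hashtags_analyzed": "x"},  # index-only field, never reaches the CSV
]
buf = io.StringIO()
write_csv_extract(records, buf)
```

The point is that retaining the extra fields in the intermediate data costs little, because the extract step only ever selects the columns it needs.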
Summary
This branch contains the following changes:
The `spark-loader` command now uses the Spark DataFrame API to 1) load Tweets into ES and 2) create extracts.

Setup
You will need to have an NFS mount shared between your primary and cluster nodes/VM's in order for this branch to work properly. See the instructions in the original issue.
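As a sketch of that setup, the `.env` on each node would point `DATASET_PATH` at the shared mount (the path here is just my VM's setup, not a required value):

```shell
# Illustrative .env fragment -- the path must be the NFS mount shared
# between the primary and cluster nodes/VM's; adjust to your environment.
DATASET_PATH=/storage/dataset_loading
```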
Make sure that `loader.docker-compose.yml` is configured to build the Docker image for the `Dockerfile-loader`. Likewise for the `spark-master` and `spark-worker` containers in `docker-compose.yml` on both primary and cluster nodes. You'll also need to update your `.env` file to point the `DATASET_PATH` variable to the shared NFS mount. (On my VM's, this is `/storage/dataset_loading`.)

Currently, the data extracts are written to the same directory used for loading datasets, so they will not appear in the TweetSets UI, which is still looking for them elsewhere. But I didn't want to touch `tweetset_server.py` in this branch, given the work Laura has been doing with the Python 3.8 upgrade.

Testing
It will be useful to load the same dataset with the regular loader and then with the Spark loader, in order to compare results in the UI.
- … `Dockerfile-spark`.
- … `dataset.json` file in the directory with the JSON of the tweets to load.
- … `dataset.json` with a name to distinguish it from the previous load.

This presumes that your sample dataset is in the `storage/dataset_loading/sample` directory (or whatever NFS mount is mapped to `/dataset` in the `.env` file). In my testing, I put the tweet JSON files and `dataset.json` in a `json` subdirectory, but that's not strictly necessary.

a. Bring down TweetSets on both clusters.
b. Check out the master branch or Laura's Py38 branch (`t126-python-38`).
c. Remove the server image (on the primary node) to force rebuild:
docker image rm ts_server-flaskrun:latest
d. Bring TweetSets back up.
Expected Results
Elasticsearch indexing
- … `extended_tweet` and `retweeted_status` elements.
- … `quoted_status` text fields for tweets of type `quote`. (This decision was made in consultation with Laura for the sake of consistency.)

Let me know if you see inconsistencies in the indexing that don't make sense with the above.
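For context, a simplified stdlib sketch of the kind of fallback logic involved when pulling text from these elements (illustrative only, not the branch's actual Spark SQL; the function name is invented). In the streaming API's tweet JSON, `extended_tweet.full_text` holds the untruncated text, and retweets nest the original under `retweeted_status`:

```python
def index_text(tweet):
    """Illustrative: prefer the untruncated text in extended_tweet,
    and recurse into retweeted_status for retweets."""
    if "retweeted_status" in tweet:
        # Retweets truncate the top-level text; use the original tweet's.
        return index_text(tweet["retweeted_status"])
    if "extended_tweet" in tweet:
        return tweet["extended_tweet"]["full_text"]
    # Fall back to whichever plain text field the payload carries.
    return tweet.get("full_text") or tweet.get("text")
```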
Dataset extracts
Each should contain one or more zipped files. I've tested the CSV files against those created by `twarc.json2csv` (1.12.1) and documented some minor differences in the data dictionary. Feel free to test them against full extracts created in the UI, but do keep in mind that the current version of TS is using an older version of json2csv.

In my testing, Spark created far too many files for the mention extracts. I assume that is a setting that can be configured, but I haven't looked into that yet.
Performance
I am curious to hear how you experience it. I would expect this code to be faster at least for large datasets -- it was so in testing on my own laptop -- but I haven't had a chance to test a large dataset on the VM's. There may be Spark settings we can tweak to improve performance, but my impression is that for some of these, what's optimal depends on the environment, and our dev VM's are not terribly good proxies for production. We might have more success testing this aspect on tweetsets-dev.
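For what it's worth, the environment-dependent knobs in question are settings like these (values purely illustrative, not recommendations; these are real Spark property names, but good values for our dev VM's and production will likely differ):

```properties
# spark-defaults.conf -- illustrative starting points only;
# optimal values depend on cluster size, memory, and data volume.
spark.sql.shuffle.partitions   200
spark.executor.memory          4g
spark.default.parallelism      16
```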