
Refactor ingest/extract process #122

Open
dolsysmith opened this issue Jul 14, 2021 · 3 comments
dolsysmith commented Jul 14, 2021

See also #117 and #121.

Problem

Depending on the parameters, user-generated extracts from large datasets can take a long time (hours or days) and consume a lot of disk space (~1TB). We have implemented a check to prevent the creation of multiple versions of a full (non-parametrized) extract, but we have nothing in place to prevent users from creating multiple versions of the same parametrized extract.

In addition, the extract jobs do not fail gracefully when interrupted by a critical error (such as a lack of disk space).

Questions

  1. What use cases do these custom extracts satisfy?
  2. Would we still meet most use cases if the app placed a reasonable limit on the size of custom extracts, given that full extracts are also available?

Proposal

Full extracts
  • Created automatically at time of load (using a Spark job).
  • Include mentions datasets (currently disabled).
Custom extracts
  • Option 1: Users can generate a custom extract containing up to N Tweets.
  • Option 2: Users can generate CSV/JSON extracts containing up to N Tweets and can generate extracts of Tweet IDs of any size.
  • For the benefit of users wanting to filter the full extract themselves, we can provide documentation and a Jupyter notebook for doing this work locally with pyspark (see the sketch below).
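
For example, here is a minimal pyspark sketch of the kind of local filtering such a notebook could document. The paths and filter criteria are illustrative, and the column names assume the standard Tweet JSON schema; this is not actual TweetSets code:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("filter-full-extract").getOrCreate()

# Read the full (non-parametrized) extract produced at load time.
tweets = spark.read.json("/storage/dataset_loading/full_extract/*.json")

# Apply the same kinds of parameters a user might request in a custom extract.
filtered = tweets.filter(
    F.col("created_at").isNotNull()
    & F.col("full_text").contains("climate")
)

# Write the filtered subset out as JSON; select("id_str") plus .csv() would
# produce a Tweet ID extract instead.
filtered.write.json("/storage/dataset_loading/my_filtered_extract")
```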

Benefits

  • Save space on disk by eliminating very large user extracts.
  • Obviate the need to provide fault tolerance for long-running jobs. Custom user extracts would be completed within a reasonable timeframe and could use the current implementation (Celery task queue).
  • Save time on producing full extracts. Latency on retrieval from large Elasticsearch result sets is high; native Spark appears orders of magnitude more efficient when writing the full dataset -- but see the note below on the Spark DataFrame API. (On my local machine, for a 50 GB dataset, the longest extract -- full Tweet JSON -- took 8 minutes.)
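
As a rough sketch (not the actual implementation), the load job could write the full extracts straight from the DataFrame it already holds, before or alongside indexing into Elasticsearch via es-hadoop. The DataFrame name, paths, and index name here are assumptions:

```python
# Assumes `tweets` is the DataFrame the load job builds from the source files.

# Full Tweet JSON extract.
tweets.write.mode("overwrite").json("/storage/dataset_loading/extracts/full_json")

# Tweet ID extract (one ID per line).
tweets.select("id_str").write.mode("overwrite").csv("/storage/dataset_loading/extracts/ids")

# Then index into Elasticsearch with the es-hadoop connector as before.
tweets.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "elasticsearch:9200") \
    .mode("append") \
    .save("tweets")
```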

Workflow diagram

Further documentation (including testing) in this notebook.


dolsysmith commented Jul 22, 2021

Dev Setup Instructions

Updated 7/28/21

For development on this ticket, it will be necessary to set up an NFS share in your dev environment, similar to the shared /storage/dataset_loading directory on prod. A shared filesystem is necessary for persisting data to disk from a Spark cluster.

To set up an NFS share between two dev VMs, I used the following method (adapted from these instructions).

  1. Install the NFS server package on one VM (henceforth the NFS server):
    sudo apt update
    sudo apt install nfs-kernel-server
    
  2. Configure a directory to be exported. I'm using a directory on the /storage volume, which has adequate space for TweetSets testing.
    a. Create a directory to share: sudo mkdir /storage/dataset_loading
    b. Set yourself as the directory's owner: sudo chown dsmith:dsmith /storage/dataset_loading
    c. Edit the /etc/exports file: sudo nano /etc/exports
    d. Add a line at the bottom mapping a directory on the server to the client VM, using the client VM's IP address. (This should be the IP address internal to the WRLC network, not the external IP address. On my VMs, it's associated with the ens160 interface.) If your export options include anonuid and anongid, use the UID and GID associated with your user account on the server. (This will give the client the same privileges to the shared folder as those associated with your user account on the server.)
    /storage/dataset_loading         172.27.20.233(rw,sync,no_root_squash,no_subtree_check)
    
  3. Restart the NFS server: sudo systemctl restart nfs-kernel-server. (If you get an error from this command, you can view the service log by running sudo journalctl -u nfs-server.service).
  4. On the client VM, install the nfs-common package:
sudo apt update
sudo apt install nfs-common

  5. Create a mount point on the client (if it doesn't already exist) and mount the NFS directory (from the server) on it. In this case, I'm mounting the dataset_loading directory on the server VM to the same path on the client. (Mounting over an existing directory hides, rather than overwrites, any local data in it for as long as the mount is active.) Note that the IP address here is that of the server VM.
sudo mkdir /storage/dataset_loading
sudo mount 172.27.20.231:/storage/dataset_loading /storage/dataset_loading
  6. To persist this change on reboot, add the following line to the /etc/fstab file on the client VM:
172.27.20.231:/storage/dataset_loading      /storage/dataset_loading    nfs auto,nofail,noatime,nolock,intr,tcp,actimeo=1800 0 0


dolsysmith commented Jul 22, 2021

Other considerations:

Spark DataFrame API

  • Moving from the Spark RDD API to the Spark DataFrame API (as documented in the notebook above) appears to provide significant performance benefits, both in loading datasets into Elasticsearch and in creating custom extracts. The improvement comes from leveraging Spark SQL for the transformations currently delegated to Python: Spark SQL expressions are executed natively inside the JVM-based Spark engine, so all ELT logic runs in Spark itself rather than round-tripping through Python workers (illustrated below).
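
To make the contrast concrete, here is an illustrative example (not TweetSets code; column names assume the standard Tweet JSON schema) of the same transformation expressed both ways:

```python
from pyspark.sql import functions as F

# RDD style: every record is deserialized into Python and run through a lambda,
# paying serialization overhead between the JVM and the Python workers.
hashtags_rdd = tweets.rdd.map(
    lambda t: [h["text"].lower() for h in (t["entities"]["hashtags"] or [])]
)

# DataFrame style: the same logic as a Spark SQL higher-order function
# (available in Spark >= 2.4), executed entirely inside the Spark engine.
hashtags_df = tweets.select(
    F.expr("transform(entities.hashtags, h -> lower(h.text))").alias("hashtags")
)
```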

Spark/pyspark version

  • The code above was written and tested with Spark/pyspark 2.4.7. The arrays_zip Spark function used in creating the mentions extracts requires version 2.4 (see the sketch after this list). TweetSets currently uses version 2.3.2; I believe we could upgrade without any breaking changes, but we would need to test.
  • Depending on how we choose to handle the creation of JSON extracts, we might benefit from upgrading to Spark v. 3, which changes the behavior when handling null values. It looks like version 3 is compatible with our ES version, but we would probably need to use a later version of the es-hadoop library as well.
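
For reference, a sketch of how arrays_zip could build the mentions extract, assuming the mention IDs and screen names arrive as parallel arrays; the input column names are hypothetical stand-ins for however those fields are named in the actual schema:

```python
from pyspark.sql import functions as F

# Pair element i of each parallel array, then emit one row per mention.
zipped = tweets.select(
    F.col("id_str").alias("tweet_id"),
    F.explode(
        F.arrays_zip("mention_user_ids", "mention_screen_names")
    ).alias("mention"),
)

# When top-level columns are zipped, the struct fields carry the column names.
mentions = zipped.select(
    "tweet_id",
    F.col("mention.mention_user_ids").alias("mentioned_user_id"),
    F.col("mention.mention_screen_names").alias("mentioned_screen_name"),
)
```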

@dolsysmith dolsysmith added this to the 2.2 milestone Jul 26, 2021
@dolsysmith dolsysmith changed the title Refactor extract process Refactor ingest/extract process Jul 26, 2021

dolsysmith commented Aug 2, 2021

Regarding our use of the Elasticsearch Scroll API to retrieve the results for the user-generated extracts, please note that in more recent versions of Elasticsearch, the use of this API for deep pagination is discouraged:

We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).
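
For reference, a sketch of what the recommended search_after + PIT pattern looks like from Python. This assumes Elasticsearch >= 7.10 with a matching elasticsearch-py client; the index name and sort field are illustrative:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Open a point in time (PIT) so paging sees a consistent snapshot of the index.
pit_id = es.open_point_in_time(index="tweets", keep_alive="2m")["id"]

results, search_after = [], None
while True:
    body = {
        "size": 1000,
        "pit": {"id": pit_id, "keep_alive": "2m"},
        # Sort on a stable field, with _shard_doc as the tiebreaker.
        "sort": [{"created_at": "asc"}, {"_shard_doc": "asc"}],
    }
    if search_after is not None:
        body["search_after"] = search_after
    hits = es.search(body=body)["hits"]["hits"]
    if not hits:
        break
    results.extend(hit["_source"] for hit in hits)  # e.g., write to the extract
    search_after = hits[-1]["sort"]

es.close_point_in_time(body={"id": pit_id})
```

Note that when searching with a PIT, the target index is encoded in the PIT ID, so no index is passed to the search call itself.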
