Lithops

Lithops is a Python multi-cloud distributed computing framework. It allows you to run unmodified local Python code at massive scale on the main serverless computing platforms. Lithops delivers the user's code to the cloud without requiring knowledge of how it is deployed and run. Moreover, its cloud-agnostic architecture ensures portability across cloud providers and on-premise deployments. In particular, Lithops offers compute and storage backends for most public clouds (AWS, Google, Azure, IBM, Oracle, Alibaba), HPC supercomputers (LithopsHPC), and on-premise deployments (OpenShift, OpenNebula, k8s).

Lithops is well suited for highly parallel programs (parallel map) with little or no need for communication between processes (e.g. Monte Carlo simulations). It is especially useful for parallel data processing where many functions read from or write to Object Storage in parallel. As the following plot shows, Lithops can obtain an aggregate bandwidth of 100 GB/s when 1000 Lambda functions read in parallel from S3. This is extremely useful for many scientific data analytics pipelines (genomics, metabolomics, astronomy, climate science, geospatial) that process unstructured or semi-structured data from Object Storage.

In Lithops, you can prioritize performance or cost depending on your requirements. If you need high performance and low startup times, you can use serverless function backends like AWS Lambda, which can launch hundreds of functions in milliseconds. If you prioritize cost over speed in batch analytics, you can run the same code in AWS Batch over Spot instances at a fraction of the cost.
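As a rough sketch of this trade-off, the helper below builds a minimal Lithops-style configuration dictionary that selects a backend by priority. The function name `make_config` and the `priority` values are hypothetical. The identifiers `aws_lambda`, `aws_batch` and `aws_s3` are assumed to match the backend names of your Lithops version, and the provider-specific sections (credentials, region, Spot settings) are deliberately left out.

```python
def make_config(priority):
    """Return a minimal Lithops-style config dict for the given priority.

    Hypothetical helper: 'speed' selects a serverless function backend,
    'cost' selects a batch backend. Provider sections are omitted.
    """
    backends = {
        "speed": "aws_lambda",  # low startup latency, high burst parallelism
        "cost": "aws_batch",    # batch jobs, for example over Spot instances
    }
    if priority not in backends:
        raise ValueError(f"unknown priority: {priority!r}")
    return {"lithops": {"backend": backends[priority], "storage": "aws_s3"}}


fast_config = make_config("speed")
cheap_config = make_config("cost")

# The same user code would then run with either configuration, e.g.:
#   from lithops import FunctionExecutor
#   with FunctionExecutor(config=cheap_config) as fexec:
#       futures = fexec.map(my_function, [1, 2, 3])
```

Only the configuration changes between the two runs; the function passed to `map` stays the same.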

Installation

  1. Install Lithops from the PyPI repository:

    pip install lithops
  2. Execute a Hello World function:

    lithops hello

Configuration

Lithops provides an extensible backend architecture (compute, storage) that is designed to work with different cloud providers and on-premise backends. You can write your code in Python and run it unmodified on IBM Cloud, AWS, Azure, Google Cloud, Oracle, Aliyun, and on-premise deployments with Kubernetes, OpenNebula or OpenShift.

Follow these instructions to configure your compute and storage backends.

High-level API

Lithops ships with two high-level Compute APIs and two high-level Storage APIs. The most commonly used are the Futures API and the Storage API, which are used together in most cases. The Storage OS API intercepts Python file access to interact with Object Storage. The Lithops Multiprocessing API intercepts the standard Python multiprocessing API to invoke processes in functions/containers and lets them communicate (Pipe, Queue, Manager) through Redis. The Storage OS API and the Multiprocessing API have mainly been used to port unmodified Python code to the Cloud.

In general, Lithops follows a simple parallel map API that applies a function to each item in an iterable (such as a list) in parallel, using multiple workers to speed up the computation. This is useful for embarrassingly parallel tasks that can be executed independently and simultaneously. Most Lithops pipelines use the Futures API to launch independent stages that read and write data to Object Storage.

Futures API

from lithops import FunctionExecutor

def double(i):
    return i * 2

with FunctionExecutor() as fexec:
    f = fexec.map(double, [1, 2, 3, 4, 5, 6])
    print(f.result())

Multiprocessing API

from lithops.multiprocessing import Pool

def double(i):
    return i * 2

with Pool() as pool:
    result = pool.map(double, [1, 2, 3, 4])
    print(result)

Storage API

from lithops import Storage

if __name__ == "__main__":
    st = Storage()
    # Write an object, then read it back from the same bucket
    st.put_object(bucket='mybucket',
                  key='test.txt',
                  body='Hello World')

    print(st.get_object(bucket='mybucket',
                        key='test.txt'))

Storage OS API

from lithops.storage.cloud_proxy import os

if __name__ == "__main__":
    filepath = 'bar/foo.txt'
    with os.open(filepath, 'w') as f:
        f.write('Hello world!')

    dirname = os.path.dirname(filepath)
    print(os.listdir(dirname))
    os.remove(filepath)

You can find more usage examples in the examples folder.

Elastic Data processing and Cloud optimized formats

Lithops is especially useful for parallel data processing. If the map function is given a storage bucket instead of a list, Lithops launches functions in parallel to process all the data in that bucket. It runs one function per file, or it can partition large files and give each function a chunk.
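The idea of one task per file, or one task per chunk of a large file, can be sketched in plain Python. This is only an illustration of the planning step, not Lithops' implementation: the helper `plan_tasks`, the object keys and the sizes are all made up for the example.

```python
def plan_tasks(objects, chunk_size=None):
    """Turn a bucket listing into (key, start, end) tasks.

    objects: dict mapping object key -> size in bytes.
    chunk_size: None means one task per object; otherwise objects larger
    than chunk_size are split into byte ranges of at most chunk_size bytes.
    The 'end' offset is exclusive.
    """
    tasks = []
    for key, size in sorted(objects.items()):
        if chunk_size is None or size <= chunk_size:
            tasks.append((key, 0, size))
            continue
        for start in range(0, size, chunk_size):
            tasks.append((key, start, min(start + chunk_size, size)))
    return tasks


listing = {"a.csv": 100, "b.csv": 250}  # hypothetical keys and sizes
per_file = plan_tasks(listing)                  # one task per object
chunked = plan_tasks(listing, chunk_size=100)   # large objects are split
```

With `chunk_size=100`, the 250-byte object yields three tasks while the 100-byte object still yields one, so each task could be handed to a separate function.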

Lithops is ideally suited for processing Cloud Optimized data formats such as Zarr, COG, COPC, or FlatGeobuf, among others. Cloud Optimized data is designed to enable on-the-fly partitioning and chunking that leverages Object Storage HTTP RANGE primitives. This mechanism is essential for fast parallel data processing that benefits from the huge aggregate bandwidth obtained when concurrent functions read from Object Storage.
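To make the HTTP RANGE idea concrete, here is a minimal sketch that builds `Range` header values for fixed-size chunks and reads them concurrently. It makes no network calls: `ranged_get` is a hypothetical stand-in that emulates a ranged GET on an in-memory byte string, and local threads stand in for cloud functions.

```python
from concurrent.futures import ThreadPoolExecutor


def range_header(start, length):
    """Build an HTTP Range header value; byte positions are inclusive."""
    return f"bytes={start}-{start + length - 1}"


def ranged_get(blob, header):
    """Emulate a ranged GET on an in-memory object (stand-in for Object Storage)."""
    first, last = header[len("bytes="):].split("-")
    return blob[int(first):int(last) + 1]


blob = bytes(range(256)) * 4  # 1024-byte stand-in object
chunk = 256
headers = [
    range_header(offset, min(chunk, len(blob) - offset))
    for offset in range(0, len(blob), chunk)
]

# Each worker reads only its own byte range; threads emulate parallel functions.
with ThreadPoolExecutor(max_workers=4) as pool:
    parts = list(pool.map(lambda h: ranged_get(blob, h), headers))

reassembled = b"".join(parts)
```

Because every reader fetches a disjoint range, the reads are independent and their bandwidth adds up, which is the property the Cloud Optimized formats rely on.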

Furthermore, thanks to the Dataplug library, Lithops can also provide on-the-fly partitioning for formats that are not Cloud Optimized, such as FASTA, FASTQ, FASTQGZIP (genomics), mlMZ (metabolomics) and LIDAR (geospatial). For example, the following code shows how Dataplug creates a CloudObject from a LIDAR file in S3. Dataplug then establishes the on-the-fly dynamic partitions (slices) for the CloudObject. Finally, Lithops processes the file in parallel from Object Storage simply by passing the slices iterator to the map function.

import laspy
import lithops
from dataplug import CloudObject
from dataplug.formats.geospatial.copc import CloudOptimizedPointCloud, square_split_strategy

# Function to process each LiDAR slice
def process_lidar_slice(data_slice):
    las_data = data_slice.get()
    lidar_file = laspy.open(las_data)
    ...

# local_minio holds the S3 configuration for the storage endpoint;
# it is not defined in this snippet and must be provided by the caller
co = CloudObject.from_s3(
    CloudOptimizedPointCloud,
    "s3://geospatial/copc/CA_YosemiteNP_2019/USGS_LPC_CA_YosemiteNP_2019_D19_11SKB6892.laz",
    s3_config=local_minio,
)

# Partition the point cloud into chunks
slices = co.partition(square_split_strategy, num_chunks=9)

# Process each slice in parallel using Lithops
with lithops.FunctionExecutor() as executor:
    futures = executor.map(process_lidar_slice, slices)
    results = executor.get_result(futures)

Success stories

  • Metaspace Metabolomics Platform is running in production in AWS with hundreds of users. Metaspace uses Lithops over Lambda Functions and EC2 VMs to access metabolomics data in Amazon S3. Metaspace moved from Spark to Lithops to simplify dynamic and elastic resource provisioning.
  • OpenNebula Open Source Cloud and Edge Computing platform integrates Lithops as an easy-to-use appliance for data analytics. OpenNebula also deploys MinIO storage and the Lithops K8s backend to facilitate data analytics in on-premise and edge deployments.
  • Cubed is a popular library for scalable multidimensional array processing with bounded memory. Cubed is a drop-in replacement for Dask's Array API. Cubed integrates Lithops as a fast compute backend, enabling scalable array processing in the Cloud.
  • BSC MareNostrum 5 is a pre-exascale EuroHPC supercomputer with a peak computational power of 314 PFlops. A new Lithops HPC compute backend has been created, enabling large-scale computing that reaches tens of thousands of concurrent functions. LithopsHPC is now being used in the neardata.eu project for extreme data analytics of genomics pipelines.

Documentation

For documentation on using Lithops, see the latest release documentation or the current GitHub docs.

If you are interested in contributing, see CONTRIBUTING.md.

Additional resources

Blogs and Talks

Papers

Acknowledgements

This project has received funding from the European Union's Horizon 2020 research and innovation programme under grant agreement No 825184 (CloudButton).