Apache Spark is a distributed computing framework designed for big data processing and analytics. This connector enables Qdrant to be a storage destination in Spark.
To integrate the connector into your Spark environment, get the JAR file from one of the sources listed below.
Important
Ensure your system is running Java 8.
The packaged jar
file can be found here.
To build the jar
from source, you need JDK@8 and Maven installed.
Once the requirements have been satisfied, run the following command in the project root.
mvn package
The JAR file will be written into the target
directory by default.
Find the project on Maven Central here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(
"spark.jars",
"spark-VERSION.jar", # Specify the downloaded JAR file
)
.master("local[*]")
.appName("qdrant")
.getOrCreate()
Important
Before loading the data using this connector, a collection has to be created in advance with the appropriate vector dimensions and configurations.
The connector supports ingesting multiple named/unnamed, dense/sparse vectors.
Click each to expand.
Unnamed/Default vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Named vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("vector_name", <VECTOR_NAME>)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
The
embedding_field
andvector_name
options are maintained for backward compatibility. It is recommended to usevector_fields
andvector_names
for named vectors as shown below.
Multiple named vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Multiple sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Combination of named dense and sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Multi-vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("multi_vector_fields", "<COLUMN_NAME>")
.option("multi_vector_names", "<MULTI_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Multiple Multi-vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("multi_vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("multi_vector_names", "<MULTI_VECTOR_NAME>,<ANOTHER_MULTI_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
No vectors - Entire dataframe is stored as payload
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Tip
Check out our example of using the Spark connector with Databricks.
You can use the connector as a library in Databricks to ingest data into Qdrant.
- Go to the
Libraries
section in your cluster dashboard. - Select
Install New
to open the library installation modal. - Search for
io.qdrant:spark:VERSION
in the Maven packages and clickInstall
.
The appropriate Spark data types are mapped to the Qdrant payload based on the provided schema
.
Option | Description | Column DataType | Required |
---|---|---|---|
qdrant_url |
gRPC URL of the Qdrant instance. Eg: http://localhost:6334 | - | ✅ |
collection_name |
Name of the collection to write data into | - | ✅ |
schema |
JSON string of the dataframe schema | - | ✅ |
embedding_field |
Name of the column holding the embeddings (Deprecated - Use vector_fields instead) |
ArrayType(FloatType) |
❌ |
id_field |
Name of the column holding the point IDs. Default: Random UUID | StringType or IntegerType |
❌ |
batch_size |
Max size of the upload batch. Default: 64 | - | ❌ |
retries |
Number of upload retries. Default: 3 | - | ❌ |
api_key |
Qdrant API key for authentication | - | ❌ |
vector_name |
Name of the vector in the collection. | - | ❌ |
vector_fields |
Comma-separated names of columns holding the vectors. | ArrayType(FloatType) |
❌ |
vector_names |
Comma-separated names of vectors in the collection. | - | ❌ |
sparse_vector_index_fields |
Comma-separated names of columns holding the sparse vector indices. | ArrayType(IntegerType) |
❌ |
sparse_vector_value_fields |
Comma-separated names of columns holding the sparse vector values. | ArrayType(FloatType) |
❌ |
sparse_vector_names |
Comma-separated names of the sparse vectors in the collection. | - | ❌ |
multi_vector_fields |
Comma-separated names of columns holding the multi-vector values. | ArrayType(ArrayType(FloatType)) |
❌ |
multi_vector_names |
Comma-separated names of the multi-vectors in the collection. | - | ❌ |
shard_key_selector |
Comma-separated names of custom shard keys to use during upsert. | - | ❌ |
wait |
Wait for each batch upsert to complete. true or false . Defaults to true . |
- | ❌ |
Apache 2.0 © 2024