Added: Support for source tags
Partners can now add a source tag to the Spark connector.
The following example shows how to set a source tag when upserting records into Pinecone using PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType
# Initialize a Spark session (assumes the spark-pinecone dependency is already on the classpath)
spark = SparkSession.builder.getOrCreate()
# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"
# Declare the schema
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])
# Read the file and apply the schema
df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("/FileStore/tables/sample-4.jsonl")
# Verify that the read was successful
print("df count:", df.count(), "should be 7")
df.show()
# Write to Pinecone
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()
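Both the PySpark example above and the Scala example below read records from a JSONL file shaped like COMMON_SCHEMA. The sample files themselves are not reproduced in these notes, but a hypothetical record could be generated like this (all field values are illustrative; the actual sample files may differ):
import json
# Hypothetical record matching COMMON_SCHEMA; the actual sample files
# (sample-4.jsonl / sample.jsonl) may differ.
record = {
    "id": "v1",
    "namespace": "example-namespace",
    "values": [1.0, 2.0, 3.0],
    "metadata": {"genre": "drama", "year": 2024},
    "sparse_values": {"indices": [0, 2], "values": [0.5, 0.8]},
}
with open("sample.jsonl", "w") as f:
    f.write(json.dumps(record) + "\n")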
The following example shows how to set a source tag when upserting records into Pinecone using Spark with Scala:
import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object MainApp extends App {
  // Define the Pinecone API key, index name, and source tag
  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"
  val sourceTag = "PINECONE_SOURCE_TAG"
  // Configure Spark to run locally with all available cores
  val conf = new SparkConf().setMaster("local[*]")
  // Create a Spark session with the defined configuration
  val spark = SparkSession.builder().config(conf).getOrCreate()
  // Read the JSON file into a DataFrame, applying the COMMON_SCHEMA
  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("src/test/resources/sample.jsonl")
  // Define the Pinecone options as a Map
  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName,
    PineconeOptions.PINECONE_SOURCE_TAG_CONF -> sourceTag
  )
  // Verify that the read was successful
  println(s"df count: ${df.count()} should be 7")
  df.show(df.count().toInt)
  // Write the DataFrame to Pinecone using the defined options
  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}
Updated: Metadata size to 40 KB
Previously, users could only upsert records with metadata of up to 5 KB. With this release, users can upsert records with metadata of up to 40 KB.
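As a rough illustration, one way to stay within the new limit is to measure a record's serialized metadata before upserting. The sketch below is an assumption about how the limit is measured (against the UTF-8 encoding of the metadata JSON string), not documented connector behavior:
import json
# Hypothetical pre-flight check; assumes the 40 KB limit applies to the
# UTF-8 encoded metadata JSON string.
metadata = {"genre": "documentary", "year": 2024, "tags": ["spark", "pinecone"]}
metadata_json = json.dumps(metadata)
size_bytes = len(metadata_json.encode("utf-8"))
assert size_bytes <= 40 * 1024, f"metadata is {size_bytes} bytes, over the 40 KB limit"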
Updated: Pinecone Java SDK client v1.0.0 to v1.2.2
The Spark connector relies on the Pinecone Java SDK, and as part of this release, we have updated the Java SDK client from v1.0.0 to v1.2.2.
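Because the connector depends on the Java SDK, the new SDK version should come in transitively once you pick up this release of the connector. A minimal sketch of attaching it to a PySpark session (the Maven coordinates are an assumption; check Maven Central for the exact artifact name):
from pyspark.sql import SparkSession
# Hypothetical Maven coordinates; verify the artifact name and version on Maven Central.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.pinecone:spark-pinecone_2.12:1.1.0")
    .getOrCreate()
)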
What's Changed
- Remove extra variables from scala-spark example and remove runTest step from release process by @rohanshah18 in #33
- Update max metadata size to 40 KB by @rohanshah18 in #34
- Update java sdk to v1.2.2 and add source tag by @rohanshah18 in #35
- Release v1.1.0 by @rohanshah18 in #36
Full Changelog: v1.0.0...v1.1.0