LlamaIndex is a tool for building LLM-powered applications over custom data.
Connect this sink to LangStream to write records to a Cassandra vector database through LlamaIndex.
This example pipeline reads records from an input topic, computes an embedding for each record, and stores the result in a Cassandra vector table via LlamaIndex.
# LangStream application: records arriving on "input-topic" are handed to the
# Python sink class LlamaIndexCassandraSink (defined in the Python code).
name: "LlamaIndex Cassandra sink"
topics:
  # The topic is created on deploy if it does not already exist.
  - name: "input-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Compute embeddings and store them in Cassandra using LlamaIndex"
    type: "python-sink"
    input: "input-topic"
    # NOTE(review): presumably delivered to the sink as the `config` dict of
    # init(); the sink reads `openaiKey` and `cassandra.*` from it.
    configuration:
      # Python module and class implementing the sink.
      className: llamaindex_cassandra.LlamaIndexCassandraSink
      # Assigned to openai.api_key by the sink's init().
      # `${ secrets.… }` placeholders are presumably resolved from the
      # application's secrets at deploy time.
      openaiKey: "${ secrets.open-ai.access-key }"
      # Read by the sink's start() to open the Cassandra connection.
      cassandra:
        username: "${ secrets.astra.clientId }"
        password: "${ secrets.astra.secret }"
        # Base64-encoded secure connect bundle; an optional "base64:"
        # prefix is stripped by the sink before decoding.
        secureBundle: "${ secrets.astra.secureBundle }"
        keyspace: ks1
        # Table in which the vector store writes its rows.
        table: vs_ll_openai
import base64
import io
from typing import Dict, Any
import openai
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from langstream import Sink, Record
from llama_index import VectorStoreIndex, Document
from llama_index.vector_stores import CassandraVectorStore
class LlamaIndexCassandraSink(Sink):
    """LangStream sink that stores each record's value in Cassandra via LlamaIndex.

    Lifecycle: ``init`` receives the agent configuration, ``start`` opens the
    Cassandra connection and builds a LlamaIndex index over a
    ``CassandraVectorStore``, ``write`` inserts one record as a ``Document``,
    and ``close`` releases the Cassandra session and cluster.

    Configuration keys read by this class:
        openaiKey: OpenAI API key, assigned to ``openai.api_key``.
        cassandra.username / cassandra.password: credentials passed to
            ``PlainTextAuthProvider``.
        cassandra.secureBundle: base64-encoded secure connect bundle,
            optionally prefixed with ``"base64:"``.
        cassandra.keyspace / cassandra.table: location of the vector table.
    """

    def __init__(self):
        self.config = None
        # Kept so close() can shut down the driver's cluster-level resources.
        self.cluster = None
        self.session = None
        self.index = None

    def init(self, config: Dict[str, Any]):
        """Store the agent configuration and set the OpenAI API key.

        Args:
            config: Agent configuration; must contain ``openaiKey`` and a
                ``cassandra`` mapping (see the class docstring).
        """
        self.config = config
        # Module-level setting: applies to every openai call in this process.
        openai.api_key = config["openaiKey"]

    def start(self):
        """Connect to Cassandra and build a LlamaIndex index backed by it."""
        cassandra_config = self.config["cassandra"]

        secure_bundle = cassandra_config["secureBundle"]
        secure_bundle = secure_bundle.removeprefix("base64:")
        secure_bundle = base64.b64decode(secure_bundle)

        # Store the Cluster on the instance: the driver requires
        # Cluster.shutdown() to release all of its connections and threads,
        # which Session.shutdown() alone does not do.
        self.cluster = Cluster(
            cloud={
                "secure_connect_bundle": io.BytesIO(secure_bundle),
                # NOTE(review): presumably needed because the bundle is an
                # in-memory file with no directory on disk to extract into.
                "use_default_tempdir": True,
            },
            auth_provider=PlainTextAuthProvider(
                cassandra_config["username"],
                cassandra_config["password"],
            ),
        )
        self.session = self.cluster.connect()

        vector_store = CassandraVectorStore(
            session=self.session,
            keyspace=cassandra_config["keyspace"],
            table=cassandra_config["table"],
            # NOTE(review): 1536 is the output size of OpenAI's
            # text-embedding-ada-002; confirm it matches the embed model used.
            embedding_dimension=1536,
            insertion_batch_size=15,
        )
        self.index = VectorStoreIndex.from_vector_store(vector_store)

    def write(self, record: Record):
        """Insert the record's value into the index as a single Document.

        Args:
            record: Incoming LangStream record.
        """
        # NOTE(review): assumes record.value() is a str; bytes payloads would
        # need decoding first — confirm what the input topic carries.
        self.index.insert(Document(text=record.value()))

    def close(self):
        """Shut down the Cassandra session and cluster if they were opened.

        Safe to call before ``start`` or more than once.
        """
        if self.session is not None:
            self.session.shutdown()
            self.session = None
        if self.cluster is not None:
            self.cluster.shutdown()
            self.cluster = None