LlamaIndex Cassandra sink

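LlamaIndex is a framework for building LLM-powered applications over custom data.
Use this sink in a LangStream pipeline to write records to a Cassandra vector store (such as DataStax Astra DB, which the example below uses) through LlamaIndex. LlamaIndex computes an embedding for each record before storing it.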

Example

This example pipeline reads records from `input-topic`. LlamaIndex computes their embeddings with OpenAI and stores them in the Cassandra table `vs_ll_openai` in keyspace `ks1`.
```yaml
name: "LlamaIndex Cassandra sink"
topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Compute embeddings and store them in Cassandra using LlamaIndex"
    type: "python-sink"
    input: "input-topic"
    configuration:
      className: llamaindex_cassandra.LlamaIndexCassandraSink
      openaiKey: "${ secrets.open-ai.access-key }"
      cassandra:
        username: "${ secrets.astra.clientId }"
        password: "${ secrets.astra.secret }"
        secureBundle: "${ secrets.astra.secureBundle }"
        keyspace: ks1
        table: vs_ll_openai
```
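The `secureBundle` secret holds the secure connect bundle (a ZIP file) encoded as base64. The sink strips an optional `base64:` prefix before decoding it. The following is a minimal sketch of how the secret value could be produced, assuming you have downloaded the bundle locally. The helper name `encode_secure_bundle` is hypothetical and is not part of LangStream.

```python
import base64
from pathlib import Path


def encode_secure_bundle(path: str, with_prefix: bool = True) -> str:
    """Hypothetical helper: base64-encode a secure connect bundle ZIP file.

    The sink strips an optional "base64:" prefix before decoding,
    so both the prefixed and the bare form are accepted.
    """
    encoded = base64.b64encode(Path(path).read_bytes()).decode("ascii")
    return f"base64:{encoded}" if with_prefix else encoded
```

For example, `encode_secure_bundle("secure-connect-mydb.zip")` (a placeholder file name) returns a string starting with `base64:`. That string could then be stored as the `secureBundle` entry of the `astra` secret.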
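This is the code for the Python sink. The `className` in the pipeline refers to the `LlamaIndexCassandraSink` class in the `llamaindex_cassandra` module: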
```python
import base64
import io
from typing import Any, Dict

import openai
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from langstream import Record, Sink
from llama_index import Document, VectorStoreIndex
from llama_index.vector_stores import CassandraVectorStore


class LlamaIndexCassandraSink(Sink):
    def __init__(self):
        self.config = None
        self.cluster = None
        self.session = None
        self.index = None

    def init(self, config: Dict[str, Any]):
        # Keep the agent configuration and set the OpenAI key used by
        # LlamaIndex's default OpenAI embedding model.
        self.config = config
        openai.api_key = config["openaiKey"]

    def start(self):
        # The secure connect bundle arrives base64-encoded, optionally
        # prefixed with "base64:"; decode it into raw ZIP bytes.
        secure_bundle = self.config["cassandra"]["secureBundle"]
        secure_bundle = secure_bundle.removeprefix("base64:")
        secure_bundle = base64.b64decode(secure_bundle)

        # Connect using the in-memory bundle and the configured credentials.
        self.cluster = Cluster(
            cloud={
                "secure_connect_bundle": io.BytesIO(secure_bundle),
                "use_default_tempdir": True,
            },
            auth_provider=PlainTextAuthProvider(
                self.config["cassandra"]["username"],
                self.config["cassandra"]["password"],
            ),
        )
        self.session = self.cluster.connect()

        # 1536 is the vector size of OpenAI's text-embedding-ada-002 model.
        vector_store = CassandraVectorStore(
            session=self.session,
            keyspace=self.config["cassandra"]["keyspace"],
            table=self.config["cassandra"]["table"],
            embedding_dimension=1536,
            insertion_batch_size=15,
        )
        self.index = VectorStoreIndex.from_vector_store(vector_store)

    def write(self, record: Record):
        # Each record value becomes one Document; LlamaIndex splits it into
        # nodes, embeds them, and stores them in the Cassandra table.
        self.index.insert(Document(text=record.value()))

    def close(self):
        # Shutting down the cluster also closes its sessions and connections.
        if self.cluster:
            self.cluster.shutdown()
```
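You can pull the bundle-decoding step in `start()` into a small function and test it without a Cassandra connection. The sketch below shows one way to do this. `decode_secure_bundle` is a hypothetical name and is not part of LangStream or the cassandra-driver. It applies the same prefix stripping and base64 decoding as the sink and returns the same kind of in-memory file object that the sink passes as `secure_connect_bundle`.

```python
import base64
import io


def decode_secure_bundle(value: str) -> io.BytesIO:
    """Hypothetical helper mirroring start(): strip an optional "base64:"
    prefix, decode the base64 text, and wrap the ZIP bytes in a
    file-like object for the driver's cloud configuration."""
    raw = base64.b64decode(value.removeprefix("base64:"))
    return io.BytesIO(raw)
```

Inside `start()`, the three decoding lines would then collapse to `"secure_connect_bundle": decode_secure_bundle(self.config["cassandra"]["secureBundle"])`.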

Topics

Input
  • Structured and unstructured text ?
  • Implicit topic ?
  • Templating ?
Output
  • None, it’s a sink.
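The sink passes `record.value()` directly to `Document(text=...)`, which expects a string. If an input topic may carry bytes or structured values, one option is to normalize those values before building the document. The helper below is a hypothetical sketch and is not part of the sink shown above. It assumes UTF-8 for byte payloads and serializes dictionaries and lists as JSON text.

```python
import json
from typing import Any


def record_value_to_text(value: Any) -> str:
    """Hypothetical helper: turn a record value into text for a Document."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bytes, bytearray)):
        # Assumption: byte payloads are UTF-8 encoded text.
        return bytes(value).decode("utf-8")
    if isinstance(value, (dict, list)):
        # Assumption: structured values are indexed as their JSON text.
        return json.dumps(value, ensure_ascii=False, sort_keys=True)
    return str(value)
```

With this helper, `write()` would call `Document(text=record_value_to_text(record.value()))`. A caller might also skip records whose text is empty rather than index them.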

Configuration

| Key | Type | Description |
|---|---|---|
| openaiKey | String | The API key used to authenticate with the OpenAI service. |
| cassandra.secureBundle | String | The secure connect bundle for the Cassandra cluster, base64-encoded. An optional `base64:` prefix is stripped before decoding. |
| cassandra.username | String | The username used to authenticate to the Cassandra cluster. |
| cassandra.password | String | The password used to authenticate to the Cassandra cluster. |
| cassandra.keyspace | String | The Cassandra keyspace (namespace) to use. |
| cassandra.table | String | The name of the Cassandra table the data is written to. |
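The sink reads every key in this table with plain dictionary indexing. As a result, a missing key only surfaces as a `KeyError` once `init()` or `start()` runs. The following is a minimal sketch of a check that could be called at the top of `init()` to report all missing keys at once. The function names and the dotted-path convention are illustrative and are not part of LangStream.

```python
from typing import Any, Dict, List

# Required keys from the configuration table; dots denote nesting.
REQUIRED_KEYS = [
    "openaiKey",
    "cassandra.username",
    "cassandra.password",
    "cassandra.secureBundle",
    "cassandra.keyspace",
    "cassandra.table",
]


def missing_config_keys(config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return the dotted paths absent from config."""
    missing = []
    for path in REQUIRED_KEYS:
        node: Any = config
        for part in path.split("."):
            if not isinstance(node, dict) or part not in node:
                missing.append(path)
                break
            node = node[part]
    return missing


def validate_config(config: Dict[str, Any]) -> None:
    """Hypothetical helper: raise one error listing every missing key."""
    missing = missing_config_keys(config)
    if missing:
        raise ValueError(f"Missing configuration keys: {', '.join(missing)}")
```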