LlamaIndex Cassandra sink

LlamaIndex is a tool for building LLM-powered applications over custom data.

Connect this sink to LangStream to write records to a Cassandra vector database through LlamaIndex.

Example

This example pipeline receives records from an input topic, computes embeddings for them, and stores them in a Cassandra vector database via LlamaIndex.

name: "LlamaIndex Cassandra sink"
topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Compute embeddings and store them in Cassandra using LlamaIndex"
    type: "python-sink"
    input: "input-topic"
    configuration:
      className: llamaindex_cassandra.LlamaIndexCassandraSink
      openaiKey: "${ secrets.open-ai.access-key }"
      cassandra:
        username: "${ secrets.astra.clientId }"
        password: "${ secrets.astra.secret }"
        secureBundle: "${ secrets.astra.secureBundle }"
        keyspace: ks1
        table: vs_ll_openai

This is the code for the Python sink:

import base64
import io
from typing import Dict, Any

import openai
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from langstream import Sink, Record
from llama_index import VectorStoreIndex, Document
from llama_index.vector_stores import CassandraVectorStore


class LlamaIndexCassandraSink(Sink):
    def __init__(self):
        self.config = None
        self.session = None
        self.index = None

    def init(self, config: Dict[str, Any]):
        self.config = config
        openai.api_key = config["openaiKey"]

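    def start(self):
        # The secure connect bundle arrives as a Base64 string, optionally
        # prefixed with "base64:"; decode it back to the raw zip bytes.
        # Note: str.removeprefix requires Python 3.9 or later.
        secure_bundle = self.config["cassandra"]["secureBundle"]
        secure_bundle = secure_bundle.removeprefix("base64:")
        secure_bundle = base64.b64decode(secure_bundle)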
        cluster = Cluster(
            cloud={
                "secure_connect_bundle": io.BytesIO(secure_bundle),
                "use_default_tempdir": True,
            },
            auth_provider=PlainTextAuthProvider(
                self.config["cassandra"]["username"],
                self.config["cassandra"]["password"],
            ),
        )
        self.session = cluster.connect()

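        # 1536 is the vector size of OpenAI's text-embedding-ada-002 embeddings;
        # change it if a different embedding model is used.
        vector_store = CassandraVectorStore(
            session=self.session,
            keyspace=self.config["cassandra"]["keyspace"],
            table=self.config["cassandra"]["table"],
            embedding_dimension=1536,
            insertion_batch_size=15,
        )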

        self.index = VectorStoreIndex.from_vector_store(vector_store)

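    def write(self, record: Record):
        # Wrap the record value in a Document and insert it into the index,
        # which embeds the text and stores it in the Cassandra table.
        self.index.insert(Document(text=record.value()))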

    def close(self):
        if self.session:
            self.session.shutdown()
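
The start() method above expects the secureBundle value to be Base64 text, optionally prefixed with "base64:". The following is a minimal sketch of how such a value could be produced and checked before storing it as a secret. The helper names encode_secure_bundle and decode_secure_bundle are hypothetical and not part of LangStream, and the in-memory zip only stands in for a real secure connect bundle.

```python
import base64
import io
import zipfile


def encode_secure_bundle(bundle_bytes: bytes) -> str:
    """Return the bundle as a 'base64:'-prefixed string (hypothetical helper)."""
    return "base64:" + base64.b64encode(bundle_bytes).decode("ascii")


def decode_secure_bundle(value: str) -> bytes:
    """Mirror the decoding in start(): strip the optional prefix, then decode."""
    return base64.b64decode(value.removeprefix("base64:"))


# Build a tiny in-memory zip as a stand-in for a real secure connect bundle.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("config.json", "{}")
fake_bundle = buffer.getvalue()

# Round trip: the encoded secret decodes back to the original bytes.
encoded = encode_secure_bundle(fake_bundle)
assert decode_secure_bundle(encoded) == fake_bundle
```

With a real bundle, the bytes would come from reading the downloaded zip file in binary mode instead of the in-memory archive used here.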

Topics

Input

  • Structured and unstructured text ?

  • Implicit topic ?

  • Templating ?
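
The sink passes record.value() straight to Document(text=...), which works when the value is already a string. As a sketch only, assuming records might also arrive as bytes or as structured values such as dictionaries, a small normalizing step could run before the insert. The function to_text is a hypothetical helper, not part of LangStream or LlamaIndex.

```python
import json
from typing import Any


def to_text(value: Any) -> str:
    """Convert a record value to text suitable for a Document (hypothetical helper)."""
    if value is None:
        raise ValueError("record has no value to index")
    if isinstance(value, str):
        return value
    if isinstance(value, (bytes, bytearray)):
        # Assumption: binary payloads are UTF-8 encoded text.
        return bytes(value).decode("utf-8")
    # Structured values (dicts, lists, numbers) are serialized as JSON.
    return json.dumps(value, ensure_ascii=False)
```

Inside write(), this would be used as Document(text=to_text(record.value())).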

Output

  • None, it’s a sink.

Configuration
