LangStream Documentation

LlamaIndex Cassandra sink

LlamaIndex is a tool for building LLM-powered applications over custom data.

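This example implements a custom Python sink for LangStream that uses LlamaIndex to compute embeddings for incoming records and store them in a Cassandra vector database (Astra, in the example below).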

Example

This example pipeline reads records from an input topic and writes them to a Cassandra vector database via LlamaIndex.

name: "LlamaIndex Cassandra sink"
topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Compute embeddings and store them in Cassandra using LlamaIndex"
    type: "python-sink"
    input: "input-topic"
    configuration:
      className: llamaindex_cassandra.LlamaIndexCassandraSink
      openaiKey: "${ secrets.open-ai.access-key }"
      cassandra:
        username: "${ secrets.astra.clientId }"
        password: "${ secrets.astra.secret }"
        secureBundle: "${ secrets.astra.secureBundle }"
        keyspace: ks1
        table: vs_ll_openai
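The configuration block above reaches the sink's init() method as a nested dictionary, with the secrets already resolved. The sink reads openaiKey at the top level and the connection settings under cassandra, so a missing setting only surfaces later as a KeyError. The following is a minimal sketch of a hypothetical missing_keys() helper (not part of LangStream or of this sample) that reports absent settings up front. The key names are the ones the sink code reads.

```python
from typing import Any, Dict, List

# Settings read by LlamaIndexCassandraSink; nested keys use dotted paths.
# (Hypothetical helper, not part of LangStream.)
REQUIRED_KEYS = [
    "openaiKey",
    "cassandra.username",
    "cassandra.password",
    "cassandra.secureBundle",
    "cassandra.keyspace",
    "cassandra.table",
]


def missing_keys(config: Dict[str, Any]) -> List[str]:
    """Return the dotted paths from REQUIRED_KEYS that are absent or empty."""
    missing = []
    for path in REQUIRED_KEYS:
        node: Any = config
        # Walk the nested dicts one path segment at a time.
        for part in path.split("."):
            if not isinstance(node, dict) or part not in node:
                node = None
                break
            node = node[part]
        if node is None or node == "":
            missing.append(path)
    return missing


if __name__ == "__main__":
    config = {
        "openaiKey": "sk-test",
        "cassandra": {"username": "client-id", "keyspace": "ks1", "table": "vs_ll_openai"},
    }
    print(missing_keys(config))  # ['cassandra.password', 'cassandra.secureBundle']
```

Calling such a check at the start of init() and raising a ValueError when the list is non-empty would make a misconfigured pipeline fail fast with a readable message.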

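The sink is implemented in Python. The className setting, llamaindex_cassandra.LlamaIndexCassandraSink, refers to the LlamaIndexCassandraSink class in the llamaindex_cassandra module: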


import base64
import io
from typing import Dict, Any

import openai
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from langstream import Sink, Record
from llama_index import VectorStoreIndex, Document
from llama_index.vector_stores import CassandraVectorStore


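class LlamaIndexCassandraSink(Sink):
    def __init__(self):
        self.config = None
        self.cluster = None
        self.session = None
        self.index = None

    def init(self, config: Dict[str, Any]):
        # Receives the agent's "configuration" map from the pipeline.
        self.config = config
        # LlamaIndex uses OpenAI embeddings by default; set the key globally.
        openai.api_key = config["openaiKey"]

    def start(self):
        # The secure connect bundle arrives as a base64 string, optionally
        # prefixed with "base64:". Decode it back to the raw zip bytes.
        secure_bundle = self.config["cassandra"]["secureBundle"]
        secure_bundle = secure_bundle.removeprefix("base64:")  # Python 3.9+
        secure_bundle = base64.b64decode(secure_bundle)
        self.cluster = Cluster(
            cloud={
                # Read the bundle from memory instead of a file on disk.
                "secure_connect_bundle": io.BytesIO(secure_bundle),
                "use_default_tempdir": True,
            },
            auth_provider=PlainTextAuthProvider(
                self.config["cassandra"]["username"],
                self.config["cassandra"]["password"],
            ),
        )
        self.session = self.cluster.connect()

        vector_store = CassandraVectorStore(
            session=self.session,
            keyspace=self.config["cassandra"]["keyspace"],
            table=self.config["cassandra"]["table"],
            # 1536 is the vector size of OpenAI's text-embedding-ada-002,
            # LlamaIndex's default embedding model.
            embedding_dimension=1536,
            insertion_batch_size=15,
        )

        self.index = VectorStoreIndex.from_vector_store(vector_store)

    def write(self, record: Record):
        # insert() splits the document into nodes, computes their embeddings
        # and writes them to the Cassandra table.
        self.index.insert(Document(text=record.value()))

    def close(self):
        # Shutting down the cluster also closes its sessions and thread pools.
        if self.cluster:
            self.cluster.shutdown()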
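The class follows a simple lifecycle: init() receives the configuration, start() opens connections and builds the index, write() is called for each record, and close() releases resources. A minimal sketch of exercising that call order locally is shown below. It uses hypothetical in-memory stand-ins (FakeRecord, FakeIndex, InMemoryTextSink) in place of LangStream's Record, the LlamaIndex index, and Cassandra, so it runs with the standard library only. It is a testing aid under those assumptions, not LangStream's own runtime.

```python
from typing import Any, Dict, List, Optional


class FakeRecord:
    """Hypothetical stand-in for langstream.Record: only value() is used."""

    def __init__(self, value: str):
        self._value = value

    def value(self) -> str:
        return self._value


class FakeIndex:
    """Hypothetical stand-in for VectorStoreIndex: keeps texts in a list."""

    def __init__(self):
        self.texts: List[str] = []

    def insert(self, text: str) -> None:
        self.texts.append(text)


class InMemoryTextSink:
    """Mirrors the structure of LlamaIndexCassandraSink without external services."""

    def __init__(self):
        self.config: Optional[Dict[str, Any]] = None
        self.index: Optional[FakeIndex] = None
        self.closed = False

    def init(self, config: Dict[str, Any]) -> None:
        self.config = config

    def start(self) -> None:
        # The real sink connects to Cassandra and builds the index here.
        self.index = FakeIndex()

    def write(self, record: FakeRecord) -> None:
        # The real sink wraps the value in a LlamaIndex Document first.
        self.index.insert(record.value())

    def close(self) -> None:
        self.closed = True


sink = InMemoryTextSink()
sink.init({"cassandra": {"keyspace": "ks1", "table": "vs_ll_openai"}})
sink.start()
for text in ["first document", "second document"]:
    sink.write(FakeRecord(text))
sink.close()
print(sink.index.texts)  # ['first document', 'second document']
```

Keeping the external calls behind small objects like FakeIndex makes it possible to test record handling without network access or an OpenAI key.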

Topics

Input

  • Structured and unstructured text
  • Implicit topic
  • Templating

Output

  • None, it’s a sink.

Configuration

All settings except openaiKey are nested under cassandra, as in the example pipeline.

| Label | Type | Description |
|---|---|---|
| openaiKey | String | The API key used to authenticate with the OpenAI service. |
| cassandra.secureBundle | String | The base64-encoded secure connect bundle for connecting to the Cassandra cluster. An optional "base64:" prefix is stripped before decoding. |
| cassandra.username | String | The username used to authenticate to the Cassandra cluster (the Astra client ID in the example). |
| cassandra.password | String | The password used to authenticate to the Cassandra cluster (the Astra secret in the example). |
| cassandra.keyspace | String | The Cassandra keyspace (namespace) that contains the table. |
| cassandra.table | String | The name of the Cassandra table where the documents and their embeddings are written. |
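The secureBundle value is the Astra secure connect bundle, a zip archive, encoded as base64. The sink's start() method strips an optional "base64:" prefix, decodes the string, and passes the bytes to the driver through io.BytesIO. A minimal sketch of both directions using only the standard library is shown below. encode_bundle() produces a value suitable for the secret, and decode_bundle() repeats the decoding steps from start(). Both function names are hypothetical, and the in-memory zip is a placeholder, not a real bundle.

```python
import base64
import io
import zipfile


def encode_bundle(bundle_bytes: bytes) -> str:
    """Base64-encode a bundle, adding the optional "base64:" prefix (hypothetical helper)."""
    return "base64:" + base64.b64encode(bundle_bytes).decode("ascii")


def decode_bundle(value: str) -> bytes:
    """Same steps as the sink's start(): strip the prefix, then base64-decode."""
    return base64.b64decode(value.removeprefix("base64:"))


# Placeholder zip built in memory; a real bundle would be read from the
# downloaded bundle file, e.g. with open(path, "rb").read().
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("placeholder.txt", "not a real bundle")
original = buffer.getvalue()

secret_value = encode_bundle(original)
decoded = decode_bundle(secret_value)

# The decoded bytes open as a zip again, in the same form the sink hands to the driver.
with zipfile.ZipFile(io.BytesIO(decoded)) as archive:
    print(archive.namelist())  # ['placeholder.txt']
```

Because start() only strips the prefix when it is present, a plain base64 string without "base64:" decodes to the same bytes.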