LangStream Documentation

Cassandra

Connecting to Apache Cassandra

To use Apache Cassandra as a vector database, create a "vector-database" (or "datasource") resource in your configuration.yaml file.

Vector Search is supported starting with Cassandra 5.0, so you need Cassandra 5.0 or later (or an equivalent distribution).

resources:
  - type: "vector-database"
    name: "CassandraDataSource"
    configuration:
      service: "cassandra"
      username: "${ secrets.cassandra.username }"
      password: "${ secrets.cassandra.password }"
      port: "${ secrets.cassandra.port }"
      contact-points: "${ secrets.cassandra.contact-points }"
      loadBalancing-localDc: "${ secrets.cassandra.loadBalancing-localDc }"
      

Required parameters:

  • contact-points: the address to connect to Cassandra

  • loadBalancing-localDc: the datacenter to connect to

Optional parameters:

  • port: the port to connect to Cassandra (default is 9042)

  • username: the username

  • password: the password
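
The resource above reads every value from `secrets.cassandra.*`, so a matching entry has to exist in the application's secrets file. A minimal sketch of such an entry follows. The environment variable names (`CASSANDRA_USERNAME` and so on) and the fallback values are assumptions chosen for illustration, not names required by LangStream.

```yaml
secrets:
  - id: cassandra
    data:
      # Optional: leave empty if the cluster does not require authentication
      username: "${CASSANDRA_USERNAME:-}"
      password: "${CASSANDRA_PASSWORD:-}"
      # Optional: falls back to the default Cassandra port
      port: "${CASSANDRA_PORT:-9042}"
      # Required: address of the Cassandra node(s) to connect to
      contact-points: "${CASSANDRA_CONTACT_POINTS:-}"
      # Required: datacenter name (hypothetical env var name)
      loadBalancing-localDc: "${CASSANDRA_LOCAL_DC:-}"
```

Keeping the values in the secrets file rather than in configuration.yaml lets the same application be deployed against different clusters without editing the resource definition.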

Special assets for Cassandra

For "vector-database" resources based on Cassandra, you can use two special assets in your pipeline: "cassandra-keyspace" and "cassandra-table".

assets:
  - name: "langstream-keyspace"
    asset-type: "cassandra-keyspace"
    creation-mode: create-if-not-exists    
    config:
      keyspace: "langstream"
      datasource: "CassandraDataSource"
      create-statements:
        - "CREATE KEYSPACE IF NOT EXISTS langstream WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
      delete-statements:
        - "DROP KEYSPACE IF EXISTS langstream;"
  - name: "products-table"
    asset-type: "cassandra-table"
    creation-mode: create-if-not-exists
    deletion-mode: delete
    config:
      table-name: "products"
      keyspace: "langstream"
      datasource: "CassandraDataSource"
      create-statements:
        - "CREATE TABLE IF NOT EXISTS langstream.products (id int PRIMARY KEY,name TEXT,description TEXT, embeddings VECTOR<FLOAT,1536>);"
        - "CREATE CUSTOM INDEX IF NOT EXISTS products_ann_index ON langstream.products(embeddings) USING 'StorageAttachedIndex';"
      delete-statements:
        - "TRUNCATE TABLE langstream.products;"

With the "cassandra-keyspace" asset you can create a keyspace in your Cassandra cluster. A keyspace is a logical container for tables, similar to a database in a relational database system.

With the "cassandra-table" asset you can create a table in your Cassandra cluster. A table is a collection of rows that share a schema of columns, similar to a table in a relational database.
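
The sink example in "Writing to Cassandra" targets a table named "documents" in a keyspace named "documents". As a hedged sketch, a "cassandra-table" asset for that table could look like the following. The column types, the primary key, and the vector dimension of 1536 are assumptions inferred from the column names in the sink mapping, and the "documents" keyspace is assumed to exist already (for example, created by a "cassandra-keyspace" asset).

```yaml
assets:
  - name: "documents-table"
    asset-type: "cassandra-table"
    creation-mode: create-if-not-exists
    config:
      table-name: "documents"
      keyspace: "documents"
      datasource: "CassandraDataSource"
      create-statements:
        # Column types and primary key are assumptions for illustration
        - "CREATE TABLE IF NOT EXISTS documents.documents (filename TEXT, chunk_id int, num_tokens int, language TEXT, text TEXT, embeddings_vector VECTOR<FLOAT, 1536>, PRIMARY KEY (filename, chunk_id));"
        # A storage-attached index on the vector column enables vector search
        - "CREATE CUSTOM INDEX IF NOT EXISTS documents_ann_index ON documents.documents(embeddings_vector) USING 'StorageAttachedIndex';"
```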

Writing to Cassandra

Use the "vector-db-sink" agent with the following parameters to write to a Cassandra database:

pipeline:
  - name: "Write to Cassandra"
    type: "vector-db-sink"
    input: "chunks-topic"
    resources:
      size: 2
    configuration:
      datasource: "CassandraDataSource"
      table-name: "documents"
      keyspace: "documents"
      mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"

Set table-name to the name of the table you want to write to, and keyspace to the keyspace that contains it. The mapping field is a comma-separated list of field mappings in the form "field-name=expression", where field-name is the target column and the expression can reference the value of the current message, for instance "value.filename".
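
To illustrate the mapping format against a different schema, here is a sketch of a sink that writes to the "products" table defined in the assets example. The topic name "products-topic" and the message fields (value.id, value.name, value.description, value.embeddings) are hypothetical. Adjust them to match the records your pipeline actually produces.

```yaml
pipeline:
  - name: "Write products to Cassandra"
    type: "vector-db-sink"
    input: "products-topic"   # hypothetical topic name
    configuration:
      datasource: "CassandraDataSource"
      keyspace: "langstream"
      table-name: "products"
      # One column=expression pair per table column; message field names are assumed
      mapping: "id=value.id, name=value.name, description=value.description, embeddings=value.embeddings"
```

Each pair names a column of the target table on the left and an expression evaluated against the incoming message on the right.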

Internally, LangStream uses the DataStax Connector for Apache Kafka and Pulsar to write to Cassandra. You can find more information about the mapping parameters in the documentation.

Configuration

Check out the full configuration properties in the API Reference page.