sink


The sink agent writes the input message data to the specified connector. LangStream does not pre-load any sink connectors, but the agent is compatible with Kafka Connect: you can declare a Kafka Connect sink connector (jar) as an application dependency and then use this agent to run it.

Prerequisites

You will need the (public) URL of the sink connector’s jar. Here are some resources for finding Kafka connectors:

  • Confluent Maven repository (find the public URL of a connector jar)

  • Apache Cassandra sink for Kafka (example of an individual connector release)

Once you have the jar, you’ll need its sha512 hash. Here are some tools for calculating it:

  • SHA512 File Hash Online (simple online tool)

  • sha512sum (for Linux, macOS, & WSL)

  • Get-FileHash (for PowerShell)

Finally, you will need the connector’s configuration properties. If you are using an individually distributed jar, the properties should be available in the project’s documentation. The Confluent connector hub offers a searchable database of connectors, with links to their documentation.

Example

This is an example using the Apache Cassandra sink for Kafka. Include the URL, sha512sum, and type of the connector jar as an application dependency in configuration.yaml:

configuration:
  dependencies:
    - name: "Kafka Connect Sink for Apache Cassandra from DataStax"
      url: "https://github.com/datastax/kafka-sink/releases/download/1.5.0/kafka-connect-cassandra-sink-1.5.0.jar"
      sha512sum: "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
      type: "java-library"

Then the sink agent runs the connector, passing it the appropriate configuration values:

- name: "Write to Cassandra"
  type: "sink"
  input: "input-topic" # optional
  configuration:
    name: cassandra-sink
    "connector.class": com.datastax.oss.kafka.sink.CassandraSinkConnector
    "key.converter": org.apache.kafka.connect.storage.StringConverter
    "value.converter": org.apache.kafka.connect.storage.StringConverter
    "cloud.secureConnectBundle": "{{ secrets.cassandra.secure-connect-bundle }}"
    "auth.username": "{{ secrets.cassandra.username }}"
    "auth.password": "{{ secrets.cassandra.password }}"
    "topic.input-topic.vsearch.products.mapping": "id=value.id,description=value.description,name=value.name"

Topics

Input

  • Structured and unstructured text

  • Implicit topic

  • Templating

Output

  • None, it’s a sink.
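
The input topic referenced by the agent’s input field is declared in the pipeline file’s topics section. A minimal sketch, assuming the topic name from the example above (the creation-mode value is a common setting in LangStream examples, not something this page specifies):

topics:
  - name: "input-topic"                  # consumed by the "Write to Cassandra" sink agent above
    creation-mode: create-if-not-exists  # create the topic if it does not already exist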

Configuration

Check out the full configuration properties in the API Reference page.
