LangStream Documentation
Langstream.aiLangStream GitHub RepoChangelog
  • LangStream Documentation
  • ❤️Langstream.ai
  • ⭐LangStream GitHub Repo
  • 📜Changelog
  • about
    • What is LangStream?
    • License
  • Get Started
  • installation
    • LangStream CLI
    • Docker
    • Minikube (mini-langstream)
    • Kubernetes
    • Build and install from source
  • Building Applications
    • Vector Databases
    • Application structure
      • Pipelines
      • Instances
      • Configuration
      • Topics
      • Assets
      • Secrets
      • YAML templating
      • Error Handling
      • Stateful agents
      • .langstreamignore
    • Sample App
    • Develop, test and deploy
    • Application Lifecycle
    • Expression Language
    • API Gateways
      • Websocket
      • HTTP
      • Message filtering
      • Gateway authentication
    • API Reference
      • Agents
      • Resources
      • Assets
  • LangStream CLI
    • CLI Commands
    • CLI Configuration
    • Web interface
  • Integrations
    • Large Language Models (LLMs)
      • OpenAI
      • Hugging Face
      • Google Vertex AI
      • Amazon Bedrock
      • Ollama
    • Data storage
      • Astra Vector DB
      • Astra
      • Cassandra
      • Pinecone
      • Milvus
      • Solr
      • JDBC
      • OpenSearch
    • Integrations
      • Apache Kafka Connect
      • Apache Camel
    • LangServe
  • Pipeline Agents
    • Agent Messaging
    • Builtin agents
      • Input & Output
        • webcrawler-source
        • s3-source
        • azure-blob-storage-source
        • sink
        • vector-db-sink
        • camel-source
      • AI Agents
        • ai-chat-completions
        • ai-text-completions
        • compute-ai-embeddings
        • flare-controller
      • Text Processors
        • document-to-json
        • language-detector
        • query
        • query-vector-db
        • re-rank
        • text-normaliser
        • text-extractor
        • text-splitter
        • http-request
      • Data Transform
        • cast
        • compute
        • drop
        • drop-fields
        • merge-key-value
        • unwrap-key-value
      • Flow control
        • dispatch
        • timer-source
        • trigger-event
    • Custom Agents
      • Python sink
      • Python source
      • Python processor
      • Python service
    • Agent Developer Guide
      • Agent Types
      • Agent Creation
      • Configuration and Testing
      • Environment variables
  • Messaging
    • Messaging
      • Apache Pulsar
      • Apache Kafka
      • Pravega.io
  • Patterns
    • RAG pattern
    • FLARE pattern
  • Examples
    • LangServe chatbot
    • LlamaIndex Cassandra sink
Powered by GitBook
On this page
  • Python sink example
  • Topics
  • Configuration
Edit on GitHub
  1. Pipeline Agents
  2. Custom Agents

Python sink

PreviousCustom AgentsNextPython source

Last updated 1 year ago

Along with the included pre-made agents, you can provide your own custom sink agent as a Python application.

The Python application needs to follow a specific directory structure for this agent to successfully run.

Within the “application” directory, create a directory named “python”.

Within that directory, place the .py file with the class that will be the entry point.

The directory will look like this:

|- Project directory
    |- application
        |- python
            |- application.py
    |- pipeline.yaml
    |- configuration.yaml
|- (optional) secrets.yaml

For more on developing custom Python sink agents, see the

Python sink example

from typing import List
from langstream import Sink, Record

class ExampleSink(Sink):

    def start(self):
        # Initialize any necessary resources or connections here
        pass

    def write(self, record: Record):
        # Process the records to the sink
        # Perform your processing logic here
        processed_data = self.process_data(record.value)

        # Print or log the processed data
        print("Processed Data:", processed_data)

        # Implement any necessary logic for storing the processed data

    def process_data(self, data):
        # Placeholder for your data processing logic
        # Modify this function according to your actual processing needs
        processed_data = data.upper()  # Example: Convert text to uppercase
        return processed_data

    def close(self):
        # Clean up any resources or connections here
        pass

Configure the agent to use the python class in pipeline.yaml:

pipeline:
  - name: "A custom Python sink"
    type: "python-sink"
    input: "input-topic"
    output: "output-topic"
    configuration:
      className: application.ExampleSink

Topics

Input

Output

  • None, it’s a sink.

Configuration

Label
Type
Description

className

String (required)

A combination of the file name and the class name.

Example: For the file my-python-app.py that has class MySink, the value would be my-python-app.MySink

<any>

Additional configuration properties specific to the application.

Structured and unstructured text

Implicit topic

Templating

Agent Developer Guide.
?
?
?