LangStream Documentation

Python source


Last updated 1 year ago

In addition to the built-in agents, you can provide your own custom source agent as a Python application.

The Python application must follow a specific directory structure for the agent to run.

Within the “application” directory, create a directory named “python”.

In that directory, place the .py file containing the class that serves as the entry point.

The resulting layout looks something like this:

|- Project directory
    |- application
        |- python
            |- application.py
    |- pipeline.yaml
    |- configuration.yaml
|- (optional) secrets.yaml

For more on developing custom Python source agents, see the Agent Developer Guide.
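The contract a source implements can be seen without any S3 or LangChain dependencies. The sketch below is a minimal, hypothetical source: `CounterSource`, its `prefix` and `limit` settings, and the fallback stand-in classes are illustrative assumptions, not part of LangStream. It assumes the same `init`/`read`/`commit` methods and `SimpleRecord(value=...)` constructor used in the example on this page.

```python
from typing import List

try:
    # Available inside the LangStream Python runtime.
    from langstream import Source, SimpleRecord
except ImportError:
    # Stand-ins (assumptions) so the sketch also runs outside the runtime.
    class Source:
        pass

    class SimpleRecord:
        def __init__(self, value, key=None):
            self._value = value
            self._key = key

        def value(self):
            return self._value


class CounterSource(Source):
    """Hypothetical source that emits a fixed number of numbered messages."""

    def __init__(self):
        self.prefix = "message"
        self.limit = 0
        self.emitted = 0
        self.committed = 0

    def init(self, config):
        # Settings come from the agent's configuration block in pipeline.yaml.
        self.prefix = config.get("prefix", "message")
        self.limit = int(config.get("limit", 3))

    def read(self) -> List[SimpleRecord]:
        # Return the next record, or an empty list when nothing is left to emit.
        if self.emitted >= self.limit:
            return []
        self.emitted += 1
        return [SimpleRecord(value=f"{self.prefix}-{self.emitted}")]

    def commit(self, records: List[SimpleRecord]):
        # A real source would acknowledge or delete the committed items here.
        self.committed += len(records)
```

Calling `init` once, then `read` repeatedly and `commit` with the returned records, mirrors the order in which the S3 example's methods are exercised.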

Example

Example Python source located at ./application/python/application.py:

import time
from typing import List
from urllib.parse import urlparse

import boto3
from langchain.document_loaders import S3DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

from langstream import Source, SimpleRecord

class S3Record(SimpleRecord):
    def __init__(self, url, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.url = url


class S3LangChain(Source):
    def __init__(self):
        self.loader = None
        self.bucket = None

    def init(self, config):
        bucket_name = config.get("bucketName", "langstream-s3-langchain")
        endpoint_url = config.get("endpoint", "http://minio-endpoint.-not-set:9090")
        aws_access_key_id = config.get("username", "minioadmin")
        aws_secret_access_key = config.get("password", "minioadmin")
        s3 = boto3.resource(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

        self.bucket = s3.Bucket(bucket_name)
        self.loader = S3DirectoryLoader(
            bucket_name,
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

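    def read(self) -> List[S3Record]:
        # Pause between polls so the bucket is not queried in a tight loop.
        time.sleep(1)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=100,
            chunk_overlap=20,
            length_function=len,
            add_start_index=False,
        )
        # Load every object in the bucket and split it into chunks.
        docs = self.loader.load_and_split(text_splitter=text_splitter)
        # Emit one record per chunk, tagged with the URL of its source object.
        return [
            S3Record(doc.metadata["source"], value=doc.page_content) for doc in docs
        ]

    def commit(self, records: List[S3Record]):
        # Nothing to delete; S3 rejects a delete request that lists no keys.
        if not records:
            return
        # Delete the committed objects so they are not read again.
        objects_to_delete = [
            {"Key": urlparse(record.url).path.lstrip("/")} for record in records
        ]
        self.bucket.delete_objects(Delete={"Objects": objects_to_delete})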
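The commit step turns each record's URL back into an object key before deleting it. The sketch below isolates that derivation so it can be checked without an S3 connection. It assumes the loader stores the source as a URL of the form s3://bucket/key; `object_key` and `delete_payload` are hypothetical helper names, and the object names are made up for illustration.

```python
from urllib.parse import urlparse


def object_key(source_url: str) -> str:
    """Return the object key for a URL of the form s3://bucket/key."""
    # urlparse puts the bucket in netloc and "/key" in path.
    return urlparse(source_url).path.lstrip("/")


def delete_payload(urls):
    """Build the Delete argument that commit() hands to delete_objects."""
    return {"Objects": [{"Key": object_key(url)} for url in urls]}


payload = delete_payload([
    "s3://langstream-langchain-source/report.pdf",
    "s3://langstream-langchain-source/docs/notes.txt",
])
print(payload)
# {'Objects': [{'Key': 'report.pdf'}, {'Key': 'docs/notes.txt'}]}
```

Keys that contain slashes keep them, because only the single leading slash of the path is stripped.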

Configure the agent to use the Python class in pipeline.yaml:

- name: "A custom python source"
  type: "python-source"
  output: "output-topic" # optional
  configuration:
    className: application.S3LangChain
    bucketName: langstream-langchain-source
    endpoint: "https://s3.eu-west-2.amazonaws.com"
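To see which values the example class ends up with for this configuration, the sketch below mirrors the `config.get(...)` calls from `S3LangChain.init`. It assumes the configuration block reaches `init` as a plain dictionary; `resolve_settings` is a hypothetical helper written for this illustration.

```python
def resolve_settings(config: dict) -> dict:
    """Mirror the config.get(...) lookups and defaults in S3LangChain.init."""
    return {
        "bucket_name": config.get("bucketName", "langstream-s3-langchain"),
        "endpoint_url": config.get("endpoint", "http://minio-endpoint.-not-set:9090"),
        "aws_access_key_id": config.get("username", "minioadmin"),
        "aws_secret_access_key": config.get("password", "minioadmin"),
    }


# The configuration block from pipeline.yaml as a dict (assumed shape).
pipeline_config = {
    "className": "application.S3LangChain",
    "bucketName": "langstream-langchain-source",
    "endpoint": "https://s3.eu-west-2.amazonaws.com",
}

settings = resolve_settings(pipeline_config)
for name, value in settings.items():
    print(f"{name} = {value}")
```

Because the configuration above sets neither username nor password, both fall back to the "minioadmin" defaults. Against a real S3 endpoint you would likely add those two keys to the configuration block.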

Topics

Input

  • None, it’s a source

Output

  • Structured as a langstream SimpleRecord
  • Implicit topic

Configuration

| Label | Type | Description |
| --- | --- | --- |
| className | String (required) | A combination of the file name and the class name. Example: for the file my-python-app.py that has class MySource, the value would be my-python-app.MySource |
| <any> | | Additional configuration properties specific to the application. |
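The className format can be read as "module name, then class name, separated by the last dot". The sketch below shows one way such a value could be resolved in Python. It is an illustration of the naming convention only, not LangStream's actual loading code, and `load_class` is a hypothetical helper.

```python
import importlib


def load_class(class_name: str):
    """Split '<file>.<class>' on the last dot and import the class."""
    module_name, _, attr = class_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected '<file>.<class>', got {class_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Demonstrated with a standard-library class, because application.py is
# only importable from inside the project's python directory.
cls = load_class("collections.OrderedDict")
print(cls.__name__)
```

With the example configuration, the value application.S3LangChain would correspond to the module application (the file application.py) and the class S3LangChain.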
