LangStream Documentation

Python processor


Last updated 1 year ago

Along with the pre-made agents, you can provide your own processor agent as a Python application.

The Python application must follow a specific directory structure for this agent to run.

Within the “application” directory, create a directory named “python”.

Within that directory, place the .py file containing the class that serves as the entry point.

The directory will look something like this:

|- Project directory
    |- application
        |- python
            |- application.py
    |- pipeline.yaml
    |- configuration.yaml
|- (optional) secrets.yaml
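If you want to create this layout from a script, the following is a minimal sketch that uses only the Python standard library. The helper name scaffold is hypothetical, the files it creates are empty placeholders, and the optional secrets.yaml is left out. The file names simply mirror the tree above.

```python
from __future__ import annotations

from pathlib import Path


def scaffold(project_dir: str) -> list[Path]:
    """Create the directory layout shown above (hypothetical helper).

    Returns the list of placeholder files that were created.
    """
    root = Path(project_dir)
    python_dir = root / "application" / "python"
    # The Python agent code lives in application/python
    python_dir.mkdir(parents=True, exist_ok=True)

    files = [
        python_dir / "application.py",  # entry-point module (empty placeholder)
        root / "pipeline.yaml",         # pipeline definition (empty placeholder)
        root / "configuration.yaml",    # application configuration (empty placeholder)
    ]
    for f in files:
        # touch() creates an empty file and leaves existing files unchanged
        f.touch(exist_ok=True)
    return files
```

For example, scaffold("my-project") creates my-project/application/python/application.py together with the two YAML files. You then fill in the files yourself.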

For more on developing custom Python processor agents, see the Agent Developer Guide.

Example

Example Python class located at ./application/python/example.py:

from langstream import SimpleRecord, Processor

# Example Python processor that appends two exclamation marks to the record value
class Exclamation(Processor):
    def process(self, record):
        return [SimpleRecord(record.value() + "!!")]
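The processing logic in the example above can be checked outside the LangStream runtime. The sketch below assumes the langstream package is not installed, so it replaces SimpleRecord with a hypothetical stand-in FakeRecord that only exposes value(), and it drops the Processor base class. The helper run_once is also hypothetical.

```python
# Stand-in so the example runs without the langstream package installed.
# FakeRecord is hypothetical; it only mimics the value() accessor used by process().
class FakeRecord:
    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class Exclamation:
    # Same logic as the example above, without the Processor base class
    def process(self, record):
        return [FakeRecord(record.value() + "!!")]


def run_once(processor, value):
    """Feed a single value through process() and return the output values."""
    return [r.value() for r in processor.process(FakeRecord(value))]


print(run_once(Exclamation(), "hello"))  # ['hello!!']
```

Keeping the transformation inside process() like this makes it easy to unit test before deploying the application.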

Configure the agent to use the Python class:

- name: "Process using Python"
  type: "python-function"
  input: "input-topic" # optional
  output: "output-topic" # optional
  configuration:
    className: example.Exclamation

The Python application can optionally receive parameters from the application environment. The following example class receives a “config” object in its init method.

from langstream import SimpleRecord, Processor
import openai
import json
# Note: openai.embeddings_utils exists only in the pre-1.0 openai package
from openai.embeddings_utils import get_embedding

class Embedding(Processor):

    def init(self, config):
        # config is the map built from the agent's configuration in pipeline.yaml
        print('init', config)
        openai.api_key = config["openaiKey"]

    def process(self, record):
        # Compute an embedding for the record value and emit it as a JSON string
        embedding = get_embedding(record.value(), engine='text-embedding-ada-002')
        result = {"input": str(record.value()), "embedding": embedding}
        new_value = json.dumps(result)
        return [SimpleRecord(value=new_value)]

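The config object is a map built from the agent's configuration section in pipeline.yaml. With the className below, the class is expected in ./application/python/embeddings.py: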

- name: "OpenAI Embeddings"
  type: "python-function"
  input: "input-topic" # optional
  output: "output-topic" # optional
  configuration:
    className: embeddings.Embedding
    openaiKey: "${ secrets.open-ai.access-key }"
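The same init/process pattern can be exercised without calling OpenAI. In the minimal sketch below, the Suffixer class and its suffix property are hypothetical. The dictionary passed to init stands in for the map the runtime would build from the configuration section, and FakeRecord is a hypothetical stand-in for a LangStream record.

```python
import json


class FakeRecord:
    """Hypothetical stand-in for a LangStream record (value() accessor only)."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class Suffixer:
    """Hypothetical processor configured through its init method."""

    def init(self, config):
        # Read a custom property; fall back to "!" if it was not configured
        self.suffix = config.get("suffix", "!")

    def process(self, record):
        text = str(record.value())
        result = {"input": text, "output": text + self.suffix}
        return [FakeRecord(json.dumps(result))]


agent = Suffixer()
# Stands in for a configuration section containing suffix: "?!"
agent.init({"suffix": "?!"})
out = agent.process(FakeRecord("hello"))
print(out[0].value())  # {"input": "hello", "output": "hello?!"}
```

Reading optional properties with a default, as init does here, keeps the agent usable when a property is omitted from pipeline.yaml.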

Topics

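Input

  • Any message from the input topic; no particular structure is required. Each message is passed to the class's process method, and the agent configuration is passed to its init method.

Output

  • The list of records returned by process, for example langstream SimpleRecord instances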
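Because process returns a list, a processor is not limited to one output record per input. The sketch below splits a value into one record per word. It assumes the runtime emits every record in the returned list. WordSplitter and FakeRecord are hypothetical names used only for illustration.

```python
class FakeRecord:
    """Hypothetical stand-in for a LangStream record (value() accessor only)."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class WordSplitter:
    """Hypothetical processor that returns one output record per word."""

    def process(self, record):
        # str.split() with no argument splits on runs of whitespace
        return [FakeRecord(word) for word in str(record.value()).split()]


records = WordSplitter().process(FakeRecord("hello streaming world"))
print([r.value() for r in records])  # ['hello', 'streaming', 'world']
```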

Configuration

className (String, required)

The file name without the .py extension, followed by a dot and the class name.

Example: For the file my-python-app.py that contains the class MyProcessor, the value is my-python-app.MyProcessor

<any>

Additional configuration properties specific to the application. They are included in the config map that the class receives in its init method.
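To make the className convention concrete, here is a sketch of how a value such as example.Exclamation can be resolved to a class. It splits on the last dot, imports the module from the python directory, and looks up the class by name. This only illustrates the naming rule. It assumes the python directory can be put on the import path, and it is not LangStream's actual loading code. The helper name load_class is hypothetical.

```python
import importlib
import sys


def load_class(class_name: str, python_dir: str):
    """Resolve a "module.ClassName" string to a class object (hypothetical helper)."""
    # "example.Exclamation" -> module "example", class "Exclamation"
    module_name, _, cls_name = class_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {class_name!r}")
    # Make modules in the python directory importable
    if python_dir not in sys.path:
        sys.path.insert(0, python_dir)
    # Clear finder caches in case the file was created after startup
    importlib.invalidate_caches()
    module = importlib.import_module(module_name)
    return getattr(module, cls_name)
```

For example, load_class("example.Exclamation", "application/python") would return the Exclamation class from the earlier example, assuming that file exists and its imports are available.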
