LangStream Documentation

Stateful agents

Provisioning disks to agents offers several benefits:

  • Stateful Computations: Allows agents to retain data between computations, enabling the execution of complex, multi-step tasks.

  • Improved Efficiency: Reduces the need to repeatedly fetch data from external sources, resulting in faster processing times.

  • Data Persistence: Ensures that important data is not lost in case of agent restarts or failures.

To request a persistent disk, an agent declares a resources.disk section. LangStream then provisions the disk automatically at runtime. Each disk is isolated from other agents, and each agent can request its own disk size and type.

- name: "Stateful processing using Python"
  resources:
    disk:
      enabled: true
      size: 50M
  id: "my-python-processor"
  type: "python-processor"

The disk section provides these parameters:

  • enabled (boolean): whether to provide the disk or not

  • size (string): size of the disk to provision. e.g. 100K, 100M, 100G

  • type (string): type of the disk

At runtime, LangStream converts the disk specification into a disk request for the storage provisioner configured in the LangStream cluster. The type option is statically mapped to a Kubernetes Storage Class; the value default selects the default Storage Class configured in Kubernetes.

Once the agent has requested the disk, it is mounted in the agent's local file system. In Python, you can get the mount directory by calling AgentContext.get_persistent_state_directory().

from langstream import SimpleRecord, Processor, AgentContext
import os

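class Exclamation(Processor):
    def init(self, config, context: AgentContext):
        # Keep the context so process() can look up the disk mount point.
        self.context = context

    def process(self, record):
        # Directory backed by the persistent disk requested in resources.disk.
        directory = self.context.get_persistent_state_directory()
        counter_file = os.path.join(directory, "counter.txt")
        ...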
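
The example above stops once the path to counter.txt is built. As one possible way to use such a file, the sketch below keeps a counter that survives restarts. It is not taken from LangStream: the helper names load_counter, save_counter and increment_counter are hypothetical, and it uses only the Python standard library. It assumes directory is the value returned by get_persistent_state_directory().

```python
import os
import tempfile

COUNTER_FILE = "counter.txt"  # same file name as in the example above


def load_counter(directory: str) -> int:
    """Return the stored counter, or 0 if no counter file exists yet."""
    path = os.path.join(directory, COUNTER_FILE)
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(f.read().strip())
    except FileNotFoundError:
        return 0


def save_counter(directory: str, value: int) -> None:
    """Write the value to a temporary file, then swap it into place."""
    path = os.path.join(directory, COUNTER_FILE)
    # Create the temp file in the same directory so os.replace stays
    # on one file system.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(str(value))
        f.flush()
        os.fsync(f.fileno())
    # Readers see either the old file or the new one, never a partial write.
    os.replace(tmp_path, path)


def increment_counter(directory: str) -> int:
    """Load the counter, add one, persist it, and return the new value."""
    value = load_counter(directory) + 1
    save_counter(directory, value)
    return value
```

Inside process, an agent could call increment_counter(directory) for each record. Writing to a temporary file and renaming it means a crash in the middle of a write leaves the previous counter intact, which matters for state that is expected to outlive agent restarts.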

Last updated 1 year ago