trigger-event



This agent writes a new record (an "event") to a different topic, based on a condition. It can also optionally drop the record from the main flow of the pipeline.

This agent is different from the dispatch agent, in that the record sent to the new topic is a new record, not the same record that was received.
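For instance, a minimal sketch of emitting an event while dropping the record from the main flow (the topic name and condition are illustrative, not from the original example):

```yaml
  - name: "Emit event and drop record"
    type: "trigger-event"
    configuration:
      destination: system-events-topic   # illustrative topic name
      continue-processing: false         # drop the record from the main flow
      when: value.status == 'error'      # illustrative condition
```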

Example

This is an example using the trigger-event agent to write a new record to a different topic when a condition is met.

  - name: "Split some text"
    type: "text-splitter"
    input: input-topic-splitter
    configuration:
      ....

  - name: "Trigger event on last chunk"
    type: "trigger-event"
    output: output-topic-chunks
    configuration:
      destination: drop-stale-chunks-topic
      continue-processing: true
      when: fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)
      fields:
        - name: "value.filename"
          expression: "key.filename"

In this example, the text-splitter agent splits a text into a set of chunks. When the last chunk is processed (that is, when chunk_id + 1 equals text_num_chunks), the trigger-event agent writes a new record to the drop-stale-chunks-topic topic. The new record has a filename field in its value part ("value.filename"), copied from the key.
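The event record can then be consumed elsewhere in the application. A minimal sketch, assuming a hypothetical custom Python processor that cleans up stale chunks (the agent name and class are assumptions, not part of the example above):

```yaml
  - name: "Drop stale chunks"
    type: "python-processor"                 # hypothetical consumer of the event
    input: drop-stale-chunks-topic           # topic written by trigger-event above
    configuration:
      className: "example.DropStaleChunks"   # assumed custom processor class
```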

Defining the contents of the output record

The trigger-event agent configures a set of fields that will be written to the output record. As usual, you can write to the key part, the value part, and the properties of the record. Use the expression language to define the fields and write the expressions.

Aborting the processing downstream

The trigger-event agent can also abort processing of the record downstream by setting the continue-processing property to false. This is useful when you have system events that you want to write to a different topic, but you don't want to continue processing the record in the main pipeline.

Configuration

Check out the full configuration properties in the API Reference page.
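As a sketch, a fields list can target all three parts of the output record; the field names and expressions below are illustrative:

```yaml
      fields:
        - name: "key.document-id"      # write into the key part
          expression: "key.filename"
        - name: "value.chunk-count"    # write into the value part
          expression: "fn:toInt(properties.text_num_chunks)"
        - name: "properties.source"    # write into the record properties
          expression: "'text-splitter'"
```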
