Pipelines

A pipeline is a manifest of topics and processing steps.
The example manifest below defines a pipeline with one step: it receives messages from "input-topic", processes them with the ai-chat-completions agent, and sends the results to "output-topic".
topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
  - name: "output-topic"
    creation-mode: create-if-not-exists
errors:
  on-failure: skip
pipeline:
  - name: "ai-chat-completions"
    type: "ai-chat-completions"
    input: "input-topic"
    # output added so the step writes to "output-topic", as described above
    output: "output-topic"
    configuration:
      model: "gpt-3.5-turbo"
      completion-field: "value"
      messages:
        - role: user
          content: "What can you tell me about {{ value }} ?"

Pipeline configuration

| Property | Type | Description |
| --- | --- | --- |
| module | String | The module reference for this pipeline. If not specified, a default module will be used. |
| id | String | Unique id of the pipeline. If not specified, it will be computed automatically. |
| name | String | The name of the pipeline. |
| topics | object[] | A collection of topics that will be bound to the application lifecycle, and used to transport data between steps. See Topics for more details. |
| errors | String | Behavior followed when errors occur. See Error Handling. |
| assets | object[] | A collection of assets that will be bound to the application lifecycle. See Assets for more details. |
| resources | object | Resources configuration for the pipeline agents. |
| pipeline | object[] (Required) | Pipeline agents configuration. |

Pipeline agents configuration

Inside the pipeline property, you must specify a list of agents.
Each agent can be configured with the following properties.
| Name | Type | Description |
| --- | --- | --- |
| name | String (required) | Agent name. |
| id | String (required) | |
| type | String (required) | The type name of the processing to be run. See AI Actions for supported types. |
| input | String | Name of the topic the agent reads messages from. |
| output | String | Name of the topic the agent writes messages to. |
| configuration | object | The configuration values for the chosen type. Refer to the configuration section of each type for more information. |
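
As a minimal sketch of how these properties fit together, the fragment below declares a single agent that sets every property from the table. The id value "chat-step" is a hypothetical identifier chosen for illustration; the remaining values are taken from the example manifest at the top of this page.

```yaml
pipeline:
  - name: "ai-chat-completions"     # agent name
    id: "chat-step"                 # hypothetical id, chosen for illustration
    type: "ai-chat-completions"     # the kind of processing to run
    input: "input-topic"            # topic the agent reads from
    output: "output-topic"          # topic the agent writes to
    configuration:                  # values specific to the chosen type
      model: "gpt-3.5-turbo"
      completion-field: "value"
      messages:
        - role: user
          content: "What can you tell me about {{ value }} ?"
```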

Agent resources

When deploying a pipeline, some agents might require additional CPU or memory to run properly. Depending on the traffic load and the agent architecture, you might need to scale the pipeline vertically or horizontally, or both.
The resources property allows you to specify the size (scale vertically) and the parallelism (scale horizontally) for the pipeline.
topics:
  ...
resources:
  parallelism: 3
  size: 2
pipeline:
  ...
Setting parallelism to 3 deploys the pipeline with three replicas. Each replica uses the configured size for CPU and memory.
The size parameter describes both CPU and memory. Its value is a multiplier that is applied at runtime to a base CPU/memory value. For example, if the default memory for a single replica is 512 MB, specifying size: 2 makes the pipeline use 1024 MB per replica.
The requested resources are implemented by using Kubernetes limits. Be aware that if an agent tries to use more memory than requested, the pipeline will fail with an OOMKilled error.
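
The sizing arithmetic can be sketched with a different pair of values. The 512 MB base is only the illustrative figure used above, not a documented default, so the numbers in the comments are assumptions that follow from it.

```yaml
resources:
  parallelism: 2   # two replicas of the pipeline
  size: 4          # multiplier applied to the base CPU/memory of each replica
# Assuming a 512 MB base memory per replica:
#   memory per replica  = 4 x 512 MB  = 2048 MB
#   memory for pipeline = 2 x 2048 MB = 4096 MB
```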

Multiple manifests in one application

When you place multiple YAML manifests in the same "application" folder, LangStream creates a single application with multiple modules. Each module is a pipeline of agent steps.
|- project-folder
    |- application
        |- pipeline.yaml
        |- second-pipeline.yaml
        |- gateways.yaml
        |- configuration.yaml
In this structure, pipeline.yaml and second-pipeline.yaml are deployed together as a single application, with a separate module for each pipeline.
See the webcrawler pipeline agent and its source code for an example.
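
As a rough illustration, a second manifest in the same folder might look like the sketch below. The module, id, and name values are hypothetical, as is "summary-topic"; the agent simply reuses the ai-chat-completions type from the first example and reads from that pipeline's "output-topic". Whether LangStream requires an explicit module value in each file is not stated here, so treat that line as an assumption.

```yaml
# second-pipeline.yaml (hypothetical contents)
module: "second-module"        # hypothetical module reference
id: "second-pipeline"          # hypothetical pipeline id
name: "Summarize answers"      # hypothetical pipeline name
topics:
  - name: "output-topic"       # produced by pipeline.yaml in the first example
    creation-mode: create-if-not-exists
  - name: "summary-topic"      # hypothetical topic for this pipeline's results
    creation-mode: create-if-not-exists
pipeline:
  - name: "summarize"
    type: "ai-chat-completions"
    input: "output-topic"
    output: "summary-topic"
    configuration:
      model: "gpt-3.5-turbo"
      completion-field: "value"
      messages:
        - role: user
          content: "Summarize in one sentence: {{ value }}"
```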