Agents

LangStream Version: 0.6.2

Compute chat completions (ai-chat-completions)

Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field.

Description
Type
Required
Default Value

ai-service

In case of multiple AI services configured, specify the id of the AI service to use.

string

completion-field

Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.

string

composable

Whether this step can be composed with other steps.

boolean

true

frequency-penalty

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

log-field

Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts.

string

logit-bias

Parameter for the completion request. The parameters are passed to the AI Service as is.

object

max-tokens

Parameter for the completion request. The parameters are passed to the AI Service as is.

integer

messages

Messages to use for chat completions. You can use the Mustache syntax.

min-chunks-per-message

Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value.

integer

20

model

The model to use for chat completions. The model must be available in the AI Service.

string

options

Additional options for the model configuration. The structure depends on the model and AI provider.

object

presence-penalty

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

stop

Parameter for the completion request. The parameters are passed to the AI Service as is.

array of string

stream

Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.

boolean

true

stream-response-completion-field

Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.

string

stream-to-topic

Enable streaming of the results. If set, the results are streamed to the specified topic in small chunks. The entire message is still sent to the output topic.

string

temperature

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

top-p

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

user

Parameter for the completion request. The parameters are passed to the AI Service as is.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Compute chat completions (ai-chat-completions).messages

Description
Type
Required
Default Value

role

Role of the message. The role is used to identify the speaker in the chat.

string

content

Content of the message. You can use the Mustache syntax.

string
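The min-chunks-per-message description above says the chunk size starts at 1 and doubles until it reaches a cap. The following Python sketch illustrates that batching schedule only. The function name and the `max_chunks` parameter are hypothetical and are not part of the LangStream API.

```python
from typing import Iterable, Iterator, List


def batch_with_doubling(chunks: Iterable[str], max_chunks: int = 20) -> Iterator[List[str]]:
    """Group streamed chunks into messages of size 1, 2, 4, ... capped at max_chunks."""
    size = 1
    batch: List[str] = []
    for chunk in chunks:
        batch.append(chunk)
        if len(batch) >= size:
            yield batch
            batch = []
            # Double the target size, but never exceed the cap.
            size = min(size * 2, max_chunks)
    if batch:
        # Flush whatever is left when the stream ends.
        yield batch


sizes = [len(b) for b in batch_with_doubling([str(i) for i in range(40)], max_chunks=8)]
```

The first message carries a single chunk, which is what keeps the time to first byte low; later messages grow so that fewer records are written to the topic.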

Compute text completions (ai-text-completions)

Sends the text to the AI Service to compute text completions. The result is stored in the specified field.

Description
Type
Required
Default Value

ai-service

In case of multiple AI services configured, specify the id of the AI service to use.

string

completion-field

Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.

string

composable

Whether this step can be composed with other steps.

boolean

true

frequency-penalty

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

log-field

Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts.

string

logit-bias

Parameter for the completion request. The parameters are passed to the AI Service as is.

object

logprobs

Logprobs parameter (only valid for OpenAI).

string

logprobs-field

Log probabilities to a field.

string

max-tokens

Parameter for the completion request. The parameters are passed to the AI Service as is.

integer

min-chunks-per-message

Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value.

integer

20

model

The model to use for text completions. The model must be available in the AI Service.

string

options

Additional options for the model configuration. The structure depends on the model and AI provider.

object

presence-penalty

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

prompt

Prompt to use for text completions. You can use the Mustache syntax.

array of string

stop

Parameter for the completion request. The parameters are passed to the AI Service as is.

array of string

stream

Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.

boolean

true

stream-response-completion-field

Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.

string

stream-to-topic

Enable streaming of the results. If set, the results are streamed to the specified topic in small chunks. The entire message is still sent to the output topic.

string

temperature

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

top-p

Parameter for the completion request. The parameters are passed to the AI Service as is.

number

user

Parameter for the completion request. The parameters are passed to the AI Service as is.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string
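The `when` example that recurs on this page is written in the expression language. As a rough illustration only, the same check can be expressed in Python over a record value modelled as a plain dict. The dict model and the function name are assumptions made for this sketch.

```python
def when_matches(value: dict) -> bool:
    """Python equivalent of: value.first == 'f1' && value.last.toUpperCase() == 'L1'"""
    first_ok = value.get("first") == "f1"
    last_ok = str(value.get("last", "")).upper() == "L1"
    return first_ok and last_ok
```

A step configured with this condition would run for `{"first": "f1", "last": "l1"}` and be skipped for any record whose `first` field differs.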

Azure Blob Storage Source (azure-blob-storage-source)

Reads data from Azure blobs. There are three supported ways to authenticate:
- SAS token
- Storage account name and key
- Storage account connection string

Description
Type
Required
Default Value

container

The name of the Azure container to read from.

string

langstream-azure-source

endpoint

Endpoint to connect to. Usually it's https://<storage-account>.blob.core.windows.net.

string

file-extensions

Comma separated list of file extensions to filter by.

string

pdf,docx,html,htm,md,txt

idle-time

Time in seconds to sleep after polling for new files.

integer

5

sas-token

Azure SAS token. If not provided, storage account name and key must be provided.

string

storage-account-connection-string

Azure storage account connection string. If not provided, SAS token must be provided.

string

storage-account-key

Azure storage account key. If not provided, SAS token must be provided.

string

storage-account-name

Azure storage account name. If not provided, SAS token must be provided.

string
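The file-extensions option above is a comma separated list. A minimal Python sketch of how such a filter could be applied to blob names follows. Case-insensitive matching and both function names are assumptions of this sketch, not documented behaviour.

```python
def parse_extensions(file_extensions: str) -> set:
    """Turn 'pdf, docx,html' into {'pdf', 'docx', 'html'}."""
    return {ext.strip().lower() for ext in file_extensions.split(",") if ext.strip()}


def is_accepted(blob_name: str, file_extensions: str = "pdf,docx,html,htm,md,txt") -> bool:
    """Return True when the blob name ends with one of the listed extensions."""
    _, dot, ext = blob_name.rpartition(".")
    # A name without any dot has no extension and is rejected.
    return bool(dot) and ext.lower() in parse_extensions(file_extensions)
```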

Apache Camel Source (camel-source)

Use Apache Camel components as Source

Description
Type
Required
Default Value

component-options

Additional parameters to pass to the Camel component in the query string format. The values are automatically encoded.

object

component-uri

The Camel URI of the component to use as Source.

string

key-header

Header to use as key of the record

string

max-buffered-records

Maximum number of records to buffer

integer

100

Cast record to another schema (cast)

Transforms the data to a target compatible schema. Some step operations like cast or compute involve conversions from one type to another. When this happens the rules are:
- timestamp, date and time related object conversions assume UTC time zone if it is not explicit.
- date and time related object conversions to/from STRING use the RFC3339 format.
- timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).
- date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).
- time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

part

When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.

string

schema-type

The target schema type.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string
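The conversion rules listed in the cast description can be sketched in Python with the standard `datetime` module. This is only an illustration of the arithmetic; the function names are hypothetical and the agent's own implementation may differ in edge cases.

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
EPOCH_DATE = date(1970, 1, 1)


def assume_utc(ts: datetime) -> datetime:
    """Treat a timestamp without an explicit time zone as UTC."""
    return ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)


def timestamp_to_millis(ts: datetime) -> int:
    """Timestamp -> milliseconds since 1970-01-01T00:00:00Z."""
    return (assume_utc(ts) - EPOCH) // timedelta(milliseconds=1)


def date_to_days(d: date) -> int:
    """Date -> days since 1970-01-01."""
    return (d - EPOCH_DATE).days


def time_to_millis(t: time) -> int:
    """Time -> milliseconds since midnight."""
    return ((t.hour * 60 + t.minute) * 60 + t.second) * 1000 + t.microsecond // 1000


def timestamp_to_string(ts: datetime) -> str:
    """Timestamp -> RFC3339 string in UTC."""
    return assume_utc(ts).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
```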

Compute values from the record (compute)

Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

fields

An array of objects describing how to calculate the field values

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Compute values from the record (compute).fields

Description
Type
Required
Default Value

expression

It is evaluated at runtime and the result of the evaluation is assigned to the field. Refer to the language expression documentation for more information on the expression syntax.

string

name

The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message. In addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.]. Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0.

string

optional

If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression.

boolean

false

type

The type of the computed field. This will translate to the schema type of the new field in the transformed message. The following types are currently supported: STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT. The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used. For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation.

string

Compute embeddings of the record (compute-ai-embeddings)

Compute embeddings of the record. The embeddings are stored in the record under a specific field.

Description
Type
Required
Default Value

ai-service

In case of multiple AI services configured, specify the id of the AI service to use.

string

arguments

Additional arguments to pass to the AI Service. (HuggingFace only)

object

batch-size

Batch size for submitting the embeddings requests.

integer

10

composable

Whether this step can be composed with other steps.

boolean

true

concurrency

Max number of concurrent requests to the AI Service.

integer

4

embeddings-field

Field where to store the embeddings.

string

flush-interval

Flushing is disabled by default in order to avoid latency spikes. You should enable this feature in the case of background processing.

integer

0

loop-over

Execute the agent over a list of documents

string

model

Model to use for the embeddings. The model must be available in the configured AI Service.

string

text-embedding-ada-002

model-url

URL of the model to use. (HuggingFace only). The default is computed from the model: "djl://ai.djl.huggingface.pytorch/{model}"

string

options

Additional options to pass to the AI Service. (HuggingFace only)

object

text

Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example: text: "{{{ value.field1 }}} {{{ value.field2 }}}"

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string
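The batch-size option above groups embeddings requests before they are submitted. A minimal Python sketch of that grouping, assuming the texts are already rendered to strings (the function name is hypothetical):

```python
from typing import List, Sequence


def make_batches(texts: Sequence[str], batch_size: int = 10) -> List[List[str]]:
    """Split the texts into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [list(texts[i:i + batch_size]) for i in range(0, len(texts), batch_size)]


batch_sizes = [len(b) for b in make_batches([f"doc {i}" for i in range(25)])]
```

With the default of 10, 25 texts produce three requests; the last one is smaller because there is nothing left to fill it.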

Dispatch agent (dispatch)

Dispatches messages to different destinations based on conditions.

Description
Type
Required
Default Value

routes

Routes.

Dispatch agent (dispatch).routes

Description
Type
Required
Default Value

destination

Destination of the message.

string

action

Action on the message. Possible values are "dispatch" or "drop".

string

dispatch

when

Condition to activate the route. This is a standard EL expression.

string
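The route options above can be read as a small decision procedure. The Python sketch below illustrates one plausible reading, with several assumptions: conditions are modelled as Python callables instead of EL expressions, the first matching route wins, and records that match no route continue to a hypothetical `default-output` destination.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

DEFAULT_OUTPUT = "default-output"  # hypothetical name for the agent's regular output


@dataclass
class Route:
    when: Callable[[dict], bool]       # stands in for the EL condition
    destination: Optional[str] = None  # unused when the action is "drop"
    action: str = "dispatch"           # "dispatch" or "drop"


def route_record(value: dict, routes: List[Route]) -> Optional[str]:
    """Return the destination for the record, or None when it is dropped."""
    for route in routes:
        if route.when(value):
            return None if route.action == "drop" else route.destination
    return DEFAULT_OUTPUT


routes = [
    Route(when=lambda v: v.get("language") == "en", destination="english-topic"),
    Route(when=lambda v: v.get("language") == "unknown", action="drop"),
]
```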

Document to JSON (document-to-json)

Convert raw text document to JSON. The result will be a JSON object with the text content in the specified field.

Description
Type
Required
Default Value

copy-properties

Whether to copy the message properties/headers in the output message.

boolean

true

text-field

Field name to write the text content to.

string

text
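As a small illustration of the conversion described above, the following Python sketch wraps raw text in a JSON object under the configured field name. The function name is hypothetical, and message properties are not modelled here.

```python
import json


def document_to_json(text: str, text_field: str = "text") -> str:
    """Wrap raw text in a JSON object with a single field."""
    return json.dumps({text_field: text})
```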

Drop the record (drop)

Drops the record from further processing. Use in conjunction with when to selectively drop records.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Drop fields (drop-fields)

Drops the record fields.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

fields

Fields to drop from the input record.

array of string

part

Part to drop. (value or key)

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Flare Controller (flare-controller)

Applies the FLARE pattern to enhance the quality of text completion results.

Description
Type
Required
Default Value

logprobs-field

The field that contains the logprobs of tokens returned by the ai-text-completions agent.

string

loop-topic

Name of the topic to forward the message in case of requesting more documents.

string

retrieve-documents-field

Name of the field to set in order to request the retrieval of more documents.

string

tokens-field

The field that contains the list of tokens returned by the ai-text-completions agent.

string

Flatten record fields (flatten)

Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

delimiter

The delimiter to use when concatenating the field names.

string

_

part

When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string
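The flattening rule described above (concatenate the intermediate field names with the delimiter) can be sketched in Python over nested dicts. The function name is hypothetical, and schema handling is left out.

```python
def flatten(record: dict, delimiter: str = "_", prefix: str = "") -> dict:
    """Collapse nested dicts into a single level, joining field names with the delimiter."""
    flat = {}
    for name, value in record.items():
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse, carrying the path built so far as the new prefix.
            flat.update(flatten(value, delimiter, key))
        else:
            flat[key] = value
    return flat
```

For example, `{"a": {"b": 1, "c": {"d": 2}}, "e": 3}` becomes `{"a_b": 1, "a_c_d": 2, "e": 3}` with the default delimiter.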

Http Request (http-request)

Agent for enriching data with an HTTP request.

Description
Type
Required
Default Value

allow-redirects

Whether or not to follow redirects.

boolean

true

body

Body to send with the request. You can use the Mustache syntax to inject value from the context.

string

handle-cookies

Whether or not to handle cookies during the redirects.

boolean

true

headers

Headers to send with the request. You can use the Mustache syntax to inject value from the context.

object

method

Http method to use for the request.

string

GET

output-field

The field that will hold the results. It can be the same as "field" to overwrite it.

string

query-string

Query string to append to the url. You can use the Mustache syntax to inject value from the context. Note that the values will be automatically escaped.

object

url

Url to send the request to. For adding query string parameters, use the `query-string` field.

string
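The url and query-string options above are combined into the final request URL, with values escaped automatically. The Python sketch below shows the idea using the standard library. The exact escaping scheme used by the agent is an assumption here, and the function name is hypothetical.

```python
from urllib.parse import urlencode


def build_url(url: str, query_string: dict) -> str:
    """Append the escaped query-string parameters to the base url."""
    if not query_string:
        return url
    return f"{url}?{urlencode(query_string)}"
```

Keeping the parameters out of `url` means that a value such as `a b&c` cannot accidentally introduce an extra parameter.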

Identity function (identity)

Simple agent to move data from the input to the output. Could be used for testing or sample applications.

Invoke LangServe (langserve-invoke)

Agent for invoking LangServe based applications

Description
Type
Required
Default Value

allow-redirects

Whether or not to follow redirects.

boolean

true

content-field

Field in the response that will be used as the content of the record.

string

content

debug

Field in the response that will be used as the content of the record.

boolean

fields

Fields of the generated records.

handle-cookies

Whether or not to handle cookies during the redirects.

boolean

true

headers

Headers to send with the request. You can use the Mustache syntax to inject value from the context.

object

method

Http method to use for the request.

string

POST

min-chunks-per-message

Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value.

integer

20

output-field

The field that will hold the results. It can be the same as "field" to overwrite it.

string

value

stream-response-field

Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.

string

stream-to-topic

Enable streaming of the results. If set, the results are streamed to the specified topic in small chunks. The entire message is still sent to the output topic.

string

url

Url to send the request to. For adding query string parameters, use the `query-string` field.

string

Invoke LangServe (langserve-invoke).fields

Description
Type
Required
Default Value

expression

Expression to compute the value of the field. This is a standard EL expression.

string

name

Name of the field like value.xx, key.xxx, properties.xxx

string

Language detector (language-detector)

Detect the language of a message’s data and limit further processing based on language codes.

Description
Type
Required
Default Value

allowedLanguages

Define a list of allowed language codes. If the message language is not in this list, the message is dropped.

array of string

property

The name of the message header to write the language code to.

string

language

Log an event (log-event)

Log a line in the agent logs when a record is received.

Description
Type
Required
Default Value

fields

Fields to log.

message

Template for a log message to print (Mustache).

string

when

Condition to trigger the operation. This is a standard EL expression.

string

true

Log an event (log-event).fields

Description
Type
Required
Default Value

expression

Expression to compute the value of the field. This is a standard EL expression.

string

name

Name of the field like value.xx, key.xxx, properties.xxx

string

Merge key-value format (merge-key-value)

Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Python custom processor (python-function)

Run your own Python processor. All the configuration properties are available in the class init method.

Description
Type
Required
Default Value

className

Python class name to instantiate. This class must be present in the application's "python" files.

string

Python custom processor (python-processor)

Run your own Python processor. All the configuration properties are available in the class init method.

Description
Type
Required
Default Value

className

Python class name to instantiate. This class must be present in the application's "python" files.

string
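To make the className option more concrete, here is a standalone Python sketch of the kind of class it could point to. It deliberately does not import the LangStream SDK. The method names `init` and `process`, the dict-based record shape, and the `suffix` property are all assumptions made for illustration, so check the SDK documentation for the real interface.

```python
from typing import Any, Dict, List


class ExclamationProcessor:
    """Hypothetical processor that appends a configurable suffix to each record value."""

    def init(self, config: Dict[str, Any]) -> None:
        # All agent configuration properties arrive here; "suffix" is an invented example.
        self.suffix = config.get("suffix", "!")

    def process(self, record: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Records are modelled as plain dicts in this sketch only.
        return [{**record, "value": f"{record['value']}{self.suffix}"}]


processor = ExclamationProcessor()
processor.init({"className": "example.ExclamationProcessor", "suffix": "!!"})
result = processor.process({"value": "hello"})
```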

Python custom service (python-service)

Run your own Python service. All the configuration properties are available in the class init method.

Description
Type
Required
Default Value

className

Python class name to instantiate. This class must be present in the application's "python" files.

string

Python custom sink (python-sink)

Run your own Python sink. All the configuration properties are available in the class init method.

Description
Type
Required
Default Value

className

Python class name to instantiate. This class must be present in the application's "python" files.

string

Python custom source (python-source)

Run your own Python source. All the configuration properties are available in the class init method.

Description
Type
Required
Default Value

className

Python class name to instantiate. This class must be present in the application's "python" files.

string

Query (query)

Perform a vector search or simple query against a datasource.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

datasource

Reference to a datasource id configured in the application.

string

fields

Fields of the record to use as input parameters for the query.

array of string

generated-keys

List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.

array of string

loop-over

Loop over a list of items taken from the record, for instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field".

string

mode

Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database).

string

query

only-first

If true, only the first result of the query is stored in the output field.

boolean

false

output-field

The name of the field to use to store the query result.

string

query

The query to use to extract the data.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Query a vector database (query-vector-db)

Query a vector database using Vector Search capabilities.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

datasource

Reference to a datasource id configured in the application.

string

fields

Fields of the record to use as input parameters for the query.

array of string

generated-keys

List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.

array of string

loop-over

Loop over a list of items taken from the record, for instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field".

string

mode

Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database).

string

query

only-first

If true, only the first result of the query is stored in the output field.

boolean

false

output-field

The name of the field to use to store the query result.

string

query

The query to use to extract the data.

string

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Re-rank (re-rank)

Agent for re-ranking documents based on a query.

Description
Type
Required
Default Value

algorithm

Algorithm to use for re-ranking. 'none' or 'MMR'.

string

none

b

Parameter for the BM25 algorithm.

number

0.75

embeddings-field

Result field for the embeddings.

string

field

The field that contains the documents to sort.

string

k1

Parameter for the BM25 algorithm.

number

1.5

lambda

Parameter for MMR algorithm.

number

0.5

max

Maximum number of documents to keep.

integer

100

output-field

The field that will hold the results. It can be the same as "field" to overwrite it.

string

query-embeddings

Field that contains the embeddings of the documents to sort.

string

query-text

Field that already contains the text that has been embedded.

string

text-field

Result field for the text.

string
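To give a feel for what the lambda and max options control, here is a textbook sketch of Maximal Marginal Relevance (MMR) in Python over embedding vectors. It is not taken from the agent's source: it uses cosine similarity only and ignores the k1 and b parameters listed above. All names are hypothetical.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr(query: Sequence[float], docs: List[Sequence[float]],
        lambda_: float = 0.5, max_docs: int = 100) -> List[int]:
    """Return document indexes ordered by MMR, keeping at most max_docs."""
    remaining = list(range(len(docs)))
    selected: List[int] = []
    while remaining and len(selected) < max_docs:
        def score(i: int) -> float:
            relevance = cosine(query, docs[i])
            # Penalise similarity to anything already selected.
            redundancy = max((cosine(docs[i], docs[j]) for j in selected), default=0.0)
            return lambda_ * relevance - (1 - lambda_) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 1.0]
docs = [[1.0, 0.9], [1.0, 0.8], [0.2, 1.0]]
```

With a lambda of 1.0 the ranking is by relevance alone; lowering it pushes near-duplicates of already selected documents further down the list.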

S3 Source (s3-source)

Reads data from an S3 bucket.

Description
Type
Required
Default Value

access-key

Access key for the S3 server.

string

minioadmin

bucketName

The name of the bucket to read from.

string

langstream-source

endpoint

The endpoint of the S3 server.

string

http://minio-endpoint.-not-set:9090

file-extensions

Comma separated list of file extensions to filter by.

string

pdf,docx,html,htm,md,txt

idle-time

Time in seconds to sleep after polling for new files.

integer

5

region

Region for the S3 server.

string

secret-key

Secret key for the S3 server.

string

minioadmin

Kafka Connect Sink agent (sink)

Run any Kafka Connect Sink. All the configuration properties are passed to the Kafka Connect Sink.

Description
Type
Required
Default Value

connector.class

Java main class for the Kafka Sink connector.

string

Kafka Connect Source agent (source)

Run any Kafka Connect Source. All the configuration properties are passed to the Kafka Connect Source.

Description
Type
Required
Default Value

connector.class

Java main class for the Kafka Source connector.

string

Text extractor (text-extractor)

Extracts text content from different document formats like PDF, JSON, XML, ODF, HTML and many others.

Text normaliser (text-normaliser)

Apply normalisation to the text.

Description
Type
Required
Default Value

make-lowercase

Whether to make the text lowercase.

boolean

true

trim-spaces

Whether to trim spaces from the text.

boolean

true
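The two options above map naturally onto standard string operations. In the Python sketch below, treating trim-spaces as removal of leading and trailing whitespace is an assumption, and the function name is hypothetical.

```python
def normalise(text: str, make_lowercase: bool = True, trim_spaces: bool = True) -> str:
    """Apply the enabled normalisation steps to the text."""
    if trim_spaces:
        text = text.strip()
    if make_lowercase:
        text = text.lower()
    return text
```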

Text splitter (text-splitter)

Split message content in chunks.

Description
Type
Required
Default Value

chunk_overlap

RecursiveCharacterTextSplitter splitter option. Chunk overlap of the previous message. Check out https://github.com/knuddelsgmbh/jtokkit for more details.

integer

100

chunk_size

RecursiveCharacterTextSplitter splitter option. Chunk size of each message. Check out https://github.com/knuddelsgmbh/jtokkit for more details.

integer

200

keep_separator

RecursiveCharacterTextSplitter splitter option. Whether or not to keep separators. Check out https://github.com/knuddelsgmbh/jtokkit for more details.

boolean

false

length_function

RecursiveCharacterTextSplitter splitter option. Options are: r50k_base, p50k_base, p50k_edit and cl100k_base. Check out https://github.com/knuddelsgmbh/jtokkit for more details.

string

cl100k_base

separators

RecursiveCharacterTextSplitter splitter option. The separators to use for splitting. Check out https://github.com/knuddelsgmbh/jtokkit for more details.

array of string

"\n\n", "\n", " ", ""

splitter_type

Splitter implementation to use. Currently supported: RecursiveCharacterTextSplitter.

string

RecursiveCharacterTextSplitter
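The general idea behind a recursive character splitter is to try the coarsest separator first and fall back to finer ones only for pieces that are still too long. The Python sketch below is a simplified illustration with loud caveats: it measures length in characters instead of tokens (so length_function is not modelled), and it ignores chunk_overlap and keep_separator. It is not the agent's implementation.

```python
from typing import List, Sequence

DEFAULT_SEPARATORS = ("\n\n", "\n", " ", "")


def split_text(text: str, chunk_size: int = 200,
               separators: Sequence[str] = DEFAULT_SEPARATORS) -> List[str]:
    """Split text into chunks of at most chunk_size characters."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators or separators[0] == "":
        # Last resort: cut at fixed character offsets.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    chunks: List[str] = []
    current = ""
    for piece in text.split(sep):
        candidate = f"{current}{sep}{piece}" if current else piece
        if len(candidate) <= chunk_size:
            # The piece still fits: keep growing the current chunk.
            current = candidate
            continue
        if current:
            chunks.append(current)
        if len(piece) <= chunk_size:
            current = piece
        else:
            # The piece alone is too long: retry with the finer separators.
            chunks.extend(split_text(piece, chunk_size, finer))
            current = ""
    if current:
        chunks.append(current)
    return chunks
```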

Timer source (timer-source)

Periodically emits records to trigger the execution of pipelines.

Description
Type
Required
Default Value

fields

Fields of the generated records.

period-seconds

Period of the timer in seconds.

integer

60

Timer source (timer-source).fields

Description
Type
Required
Default Value

expression

Expression to compute the value of the field. This is a standard EL expression.

string

name

Name of the field like value.xx, key.xxx, properties.xxx

string

Trigger event (trigger-event)

Emits a record on a side destination when a record is received.

Description
Type
Required
Default Value

continue-processing

Whether to continue processing the record downstream after emitting the event. If the when condition is false, the record is passed downstream anyway. This flag allows you to stop processing system events and trigger a different pipeline.

boolean

true

destination

Destination of the message.

string

fields

Fields of the generated records.

when

Condition to trigger the event. This is a standard EL expression.

string

true

Trigger event (trigger-event).fields

Description
Type
Required
Default Value

expression

Expression to compute the value of the field. This is a standard EL expression.

string

name

Name of the field like value.xx, key.xxx, properties.xxx

string

Unwrap key-value format (unwrap-key-value)

If the record value is in KeyValue format, extracts the KeyValue's key or value and makes it the record value.

Description
Type
Required
Default Value

composable

Whether this step can be composed with other steps.

boolean

true

unwrapKey

Whether to unwrap the key instead of the value.

boolean

false

when

Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"

string

Astra (vector-db-sink)

Writes data to DataStax Astra service. All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

Description
Type
Required
Default Value

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'.

string

keyspace

The keyspace of the table to write to.

string

mapping

Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.

string

table-name

The name of the table to write to. The table must already exist.

string
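The mapping option above is a comma separated list of column=field pairs. A minimal Python sketch of how such a string can be parsed into a column-to-field dictionary follows. The function name and the error handling are assumptions of this sketch.

```python
def parse_mapping(mapping: str) -> dict:
    """Parse 'col_a=key, col_b=value.name' into {'col_a': 'key', 'col_b': 'value.name'}."""
    result = {}
    for entry in mapping.split(","):
        if not entry.strip():
            continue  # tolerate trailing commas
        column, sep, field = entry.partition("=")
        if not sep:
            raise ValueError(f"missing '=' in mapping entry: {entry!r}")
        result[column.strip()] = field.strip()
    return result
```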

Astra Vector DB (vector-db-sink)

Writes data to DataStax Astra Vector DB. All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

Description
Type
Required
Default Value

collection-name

The name of the collection to write to. The collection must already exist.

string

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra-vector-db'.

string

fields

Fields of the collection to write to.

Astra Vector DB (vector-db-sink).fields

Description
Type
Required
Default Value

expression

JSTL Expression for computing the field value.

string

name

Field name

string
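
A possible configuration using the collection-name and fields options is sketched below. The pipeline skeleton is assumed rather than taken from this section, and the resource id, collection name, and field names are hypothetical.

```yaml
pipeline:
  - name: "Write to Astra Vector DB"         # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                     # hypothetical topic
    configuration:
      datasource: "AstraVectorDatasource"    # placeholder id, service: 'astra-vector-db'
      collection-name: "documents"           # placeholder; the collection must already exist
      fields:
        # each expression is evaluated against the incoming record
        - name: "id"
          expression: "value.id"
        - name: "text"
          expression: "value.text"
```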

Cassandra (vector-db-sink)

Writes data to Apache Cassandra. All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

Description
Type
Required
Default Value

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'.

string

keyspace

The keyspace of the table to write to.

string

mapping

Comma-separated list of mappings between table columns and record fields, e.g. my_column_id=key, my_column_name=value.name.

string

table-name

The name of the table to write to. The table must already exist.

string

JDBC (vector-db-sink)

Writes data to any JDBC compatible database.

Description
Type
Required
Default Value

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'.

string

fields

Fields of the table to write to.

table-name

The name of the table to write to. The table must already exist.

string

JDBC (vector-db-sink).fields

Description
Type
Required
Default Value

expression

JSTL Expression for computing the field value.

string

name

Field name

string

primary-key

Is this field part of the primary key?

boolean

false
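
To make the role of primary-key concrete, here is a hedged sketch of a JDBC sink step. The pipeline skeleton is an assumption not covered by this section, and the resource id, table name, and column names are hypothetical.

```yaml
pipeline:
  - name: "Write to a JDBC table"        # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                 # hypothetical topic
    configuration:
      datasource: "JdbcDatasource"       # placeholder id, service: 'jdbc'
      table-name: "documents"            # placeholder; the table must already exist
      fields:
        - name: "id"
          expression: "value.id"
          primary-key: true              # this column is part of the primary key
        - name: "text"
          expression: "value.text"       # primary-key defaults to false
```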

Milvus (vector-db-sink)

Writes data to the Milvus/Zilliz service.

Description
Type
Required
Default Value

collection-name

Collection name

string

database-name

Database name

string

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'.

string

fields

Fields definition.

Milvus (vector-db-sink).fields

Description
Type
Required
Default Value

expression

JSTL Expression for computing the field value.

string

name

Field name

string
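
A sketch of a Milvus sink step under the same assumptions as elsewhere: the pipeline skeleton is not described in this section, and the resource id, database, collection, and field names are placeholders.

```yaml
pipeline:
  - name: "Write to Milvus"              # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                 # hypothetical topic
    configuration:
      datasource: "MilvusDatasource"     # placeholder id, service: 'milvus'
      database-name: "default"           # placeholder
      collection-name: "documents"       # placeholder
      fields:
        - name: "id"
          expression: "value.id"
        - name: "vector"
          expression: "value.embeddings" # hypothetical field holding the embedding
```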

OpenSearch (vector-db-sink)

Writes data to OpenSearch or AWS OpenSearch serverless.

Description
Type
Required
Default Value

batch-size

Batch size for bulk operations. Hitting the batch size will trigger a flush.

integer

10

bulk-parameters

OpenSearch bulk URL parameters.

object

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'.

string

fields

Index fields definition.

flush-interval

Flush interval, in milliseconds.

integer

1000

id

JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.

string

OpenSearch (vector-db-sink).bulk-parameters

Description
Type
Required
Default Value

pipeline

The pipeline ID for preprocessing documents. Refer to the OpenSearch documentation for more details.

string

routing

Routes the request to the specified shard. Refer to the OpenSearch documentation for more details.

string

require_alias

Set to true to require that all actions target an index alias rather than an index. Refer to the OpenSearch documentation for more details.

boolean

refresh

Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn’t suffer. Note that AWS OpenSearch supports only false. Refer to the OpenSearch documentation for more details.

string

wait_for_active_shards

Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed. Refer to the OpenSearch documentation for more details.

string

timeout

How long to wait for the request to return. Refer to the OpenSearch documentation for more details.

string

OpenSearch (vector-db-sink).fields

Description
Type
Required
Default Value

expression

JSTL Expression for computing the field value.

string

name

Field name

string
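
The batching and bulk options above could be combined roughly as follows. This is an illustrative sketch: the pipeline skeleton is assumed, and the resource id and field names are hypothetical. Per the refresh description, AWS OpenSearch supports only false, so the wait_for value shown here applies to other deployments.

```yaml
pipeline:
  - name: "Write to OpenSearch"            # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                   # hypothetical topic
    configuration:
      datasource: "OpenSearchDatasource"   # placeholder id, service: 'opensearch'
      batch-size: 10                       # flush after 10 buffered documents (default)
      flush-interval: 1000                 # milliseconds (default)
      bulk-parameters:
        refresh: "wait_for"                # wait for a refresh before returning
      id: "value.id"                       # omit to let OpenSearch generate _id
      fields:
        - name: "content"
          expression: "value.text"
        - name: "embeddings"
          expression: "value.embeddings"
```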

Pinecone (vector-db-sink)

Writes data to the Pinecone service. To add metadata fields, add entries such as vector.metadata.my-field: "value.my-field". The value is a JSTL Expression that computes the actual value.

Description
Type
Required
Default Value

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'.

string

vector.id

JSTL Expression to compute the id.

string

vector.metadata

Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value.

object

vector.namespace

JSTL Expression to compute the namespace.

string

vector.vector

JSTL Expression to compute the vector.

string
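
As an illustration of the dotted vector.* keys, a Pinecone sink step might look like the sketch below. The pipeline skeleton is an assumption, and the resource id and record field names (id, embeddings, genre) are hypothetical.

```yaml
pipeline:
  - name: "Write to Pinecone"              # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                   # hypothetical topic
    configuration:
      datasource: "PineconeDatasource"     # placeholder id, service: 'pinecone'
      vector.id: "value.id"                # expression computing the vector id
      vector.vector: "value.embeddings"    # expression computing the vector
      # one key per metadata field; the value is an expression
      vector.metadata.genre: "value.genre"
```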

Apache Solr (vector-db-sink)

Writes data to the Apache Solr service. The collection-name is configured at the datasource level.

Description
Type
Required
Default Value

commit-within

Value of the Solr commitWithin option, in milliseconds.

integer

1000

datasource

Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'.

string

fields

Fields definition.

Apache Solr (vector-db-sink).fields

Description
Type
Required
Default Value

expression

JSTL Expression for computing the field value.

string

name

Field name

string
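
A hedged sketch of a Solr sink step follows. The pipeline skeleton is assumed, and the resource id and field names are placeholders. No collection name appears here because it is configured on the datasource.

```yaml
pipeline:
  - name: "Write to Solr"                # hypothetical step name
    type: "vector-db-sink"
    input: "input-topic"                 # hypothetical topic
    configuration:
      datasource: "SolrDatasource"       # placeholder id, service: 'solr'
      commit-within: 1000                # default value
      fields:
        - name: "id"
          expression: "value.id"
        - name: "text"
          expression: "value.text"
```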

Web crawler source (webcrawler-source)

Crawl a website and extract the content of the pages.

Description
Type
Required
Default Value

access-key

Configuration for handling the agent status. Access key for the S3 server.

string

minioadmin

allow-non-html-contents

Whether to emit non-HTML documents to the pipeline (e.g. PDF files).

boolean

false

allowed-domains

Domains that the crawler is allowed to access.

array of string

bucketName

Configuration for handling the agent status. The name of the bucket.

string

langstream-source

endpoint

Configuration for handling the agent status. The S3 endpoint.

string

http://minio-endpoint.-not-set:9090

forbidden-paths

Paths that the crawler is not allowed to access.

array of string

handle-cookies

Whether to handle cookies.

boolean

true

handle-robots-file

Whether to read and honor the site's robots.txt file.

boolean

true

http-timeout

Timeout for HTTP requests, in milliseconds.

integer

10000

max-depth

Maximum depth of the crawl.

integer

50

max-error-count

Maximum number of errors allowed before stopping.

integer

5

max-unflushed-pages

Maximum number of unflushed pages before the agent persists the crawl data.

integer

100

max-urls

Maximum number of URLs that can be crawled.

integer

1000

min-time-between-requests

Minimum time between two requests to the same domain, in milliseconds.

integer

500

region

Configuration for handling the agent status. Region for the S3 server.

string

reindex-interval-seconds

Time interval between reindexing of the pages, in seconds.

integer

86400

scan-html-documents

Whether to scan HTML documents for links to other sites.

boolean

true

secret-key

Configuration for handling the agent status. Secret key for the S3 server.

string

minioadmin

seed-urls

The starting URLs for the crawl.

array of string

state-storage

State storage type: "s3" or "disk".

string

s3

user-agent

User agent to use for the requests.

string

Mozilla/5.0 (compatible; LangStream.ai/0.1; +https://langstream.ai)
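
To show how the crawl limits and politeness settings relate, here is a minimal sketch of a crawler step. The pipeline skeleton is an assumption not covered by this section, and the URLs, paths, and topic name are placeholders. It uses disk state storage so that none of the S3 options (endpoint, bucketName, access-key, secret-key, region) are needed.

```yaml
pipeline:
  - name: "Crawl the documentation site"   # hypothetical step name
    type: "webcrawler-source"
    output: "crawled-pages"                # hypothetical topic
    configuration:
      seed-urls: ["https://docs.example.com/"]        # placeholder start page
      allowed-domains: ["https://docs.example.com"]   # placeholder; stay on this site
      forbidden-paths: ["/private"]                   # placeholder path to skip
      max-urls: 1000                       # stop after this many URLs (default)
      max-depth: 50                        # default
      min-time-between-requests: 500       # milliseconds per domain (default)
      http-timeout: 10000                  # milliseconds (default)
      reindex-interval-seconds: 86400      # default
      state-storage: disk                  # instead of the default s3
```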
