Agents
LangStream Version: 0.6.2
Compute chat completions (ai-chat-completions)
Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field.
ai-service
In case of multiple AI services configured, specify the id of the AI service to use.
string
completion-field
Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.
string
composable
Whether this step can be composed with other steps.
boolean
true
frequency-penalty
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
log-field
Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts.
string
logit-bias
Parameter for the completion request. The parameters are passed to the AI Service as is.
object
max-tokens
Parameter for the completion request. The parameters are passed to the AI Service as is.
integer
min-chunks-per-message
Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value.
integer
20
model
The model to use for chat completions. The model must be available in the AI Service.
string
✓
options
Additional options for the model configuration. The structure depends on the model and AI provider.
object
presence-penalty
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
stop
Parameter for the completion request. The parameters are passed to the AI Service as is.
array of string
stream
Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.
boolean
true
stream-response-completion-field
Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.
string
stream-to-topic
Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks, while the complete message is still sent to the output topic.
string
temperature
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
top-p
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
user
Parameter for the completion request. The parameters are passed to the AI Service as is.
string
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Compute chat completions (ai-chat-completions).messages
role
Role of the message. The role is used to identify the speaker in the chat.
string
content
Content of the message. You can use the Mustache syntax.
string
✓
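For illustration, a minimal pipeline step using this agent could look like the sketch below; the step name, topic names and model are placeholders, and the Mustache template assumes a value.question field in the incoming record:

  pipeline:
    - name: "ai-chat-completions"
      type: "ai-chat-completions"
      input: "questions-topic"      # placeholder topic
      output: "answers-topic"       # placeholder topic
      configuration:
        model: "gpt-3.5-turbo"      # must be available in the configured AI Service
        completion-field: "value.answer"
        log-field: "value.prompt-log"
        messages:
          - role: user
            content: "Answer the question: {{{ value.question }}}"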
Compute text completions (ai-text-completions)
Sends the text to the AI Service to compute text completions. The result is stored in the specified field.
ai-service
In case of multiple AI services configured, specify the id of the AI service to use.
string
completion-field
Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.
string
composable
Whether this step can be composed with other steps.
boolean
true
frequency-penalty
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
log-field
Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts.
string
logit-bias
Parameter for the completion request. The parameters are passed to the AI Service as is.
object
logprobs
Logprobs parameter (only valid for OpenAI).
string
logprobs-field
Log probabilities to a field.
string
max-tokens
Parameter for the completion request. The parameters are passed to the AI Service as is.
integer
min-chunks-per-message
Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value.
integer
20
model
The model to use for text completions. The model must be available in the AI Service.
string
✓
options
Additional options for the model configuration. The structure depends on the model and AI provider.
object
presence-penalty
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
prompt
Prompt to use for text completions. You can use the Mustache syntax.
array of string
✓
stop
Parameter for the completion request. The parameters are passed to the AI Service as is.
array of string
stream
Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.
boolean
true
stream-response-completion-field
Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.
string
stream-to-topic
Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks, while the complete message is still sent to the output topic.
string
temperature
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
top-p
Parameter for the completion request. The parameters are passed to the AI Service as is.
number
user
Parameter for the completion request. The parameters are passed to the AI Service as is.
string
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
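A comparable sketch for text completions, again with placeholder topics, model name and prompt field:

  pipeline:
    - name: "ai-text-completions"
      type: "ai-text-completions"
      input: "input-topic"
      output: "output-topic"
      configuration:
        model: "gpt-3.5-turbo-instruct"   # placeholder model name
        completion-field: "value.completion"
        prompt:
          - "Summarize the following text: {{{ value.text }}}"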
Azure Blob Storage Source (azure-blob-storage-source)
Reads data from Azure blobs. There are three supported ways to authenticate:
- SAS token
- Storage account name and key
- Storage account connection string
container
The name of the Azure container to read from.
string
langstream-azure-source
endpoint
Endpoint to connect to. Usually it's https://<storage-account>.blob.core.windows.net.
string
✓
file-extensions
Comma separated list of file extensions to filter by.
string
pdf,docx,html,htm,md,txt
idle-time
Time in seconds to sleep after polling for new files.
integer
5
sas-token
Azure SAS token. If not provided, storage account name and key must be provided.
string
storage-account-connection-string
Azure storage account connection string. If not provided, SAS token must be provided.
string
storage-account-key
Azure storage account key. If not provided, SAS token must be provided.
string
storage-account-name
Azure storage account name. If not provided, SAS token must be provided.
string
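A minimal source sketch, assuming a SAS token stored as a secret; the endpoint, container and topic names are placeholders:

  pipeline:
    - name: "azure-blob-storage-source"
      type: "azure-blob-storage-source"
      output: "documents-topic"
      configuration:
        endpoint: "https://my-account.blob.core.windows.net"
        container: "langstream-azure-source"
        sas-token: "${ secrets.azure.sas-token }"   # alternatively use storage-account-name/key or a connection string
        file-extensions: "pdf,txt"
        idle-time: 5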
Apache Camel Source (camel-source)
Use Apache Camel components as Source.
component-options
Additional parameters to pass to the Camel component, in query string format. The values are automatically encoded.
object
component-uri
The Camel URI of the component to use as Source.
string
✓
key-header
Header to use as key of the record
string
max-buffered-records
Maximum number of records to buffer
integer
100
Cast record to another schema (cast)
Transforms the data to a target compatible schema. Some step operations like cast or compute involve conversions from one type to another. When this happens, the rules are:
- timestamp, date and time related object conversions assume UTC time zone if it is not explicit.
- date and time related object conversions to/from STRING use the RFC3339 format.
- timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).
- date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).
- time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).
composable
Whether this step can be composed with other steps.
boolean
true
part
When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.
string
schema-type
The target schema type.
string
✓
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
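A small sketch of a cast step; the schema type and topics are illustrative:

  pipeline:
    - name: "cast-to-string"
      type: "cast"
      input: "input-topic"
      output: "output-topic"
      configuration:
        schema-type: "string"
        part: "value"    # omit to cast both key and value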
Compute values from the record (compute)
Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.
composable
Whether this step can be composed with other steps.
boolean
true
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Compute values from the record (compute).fields
expression
It is evaluated at runtime and the result of the evaluation is assigned to the field. Refer to the language expression documentation for more information on the expression syntax.
string
✓
name
The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message. In addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.]. Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0.
string
✓
optional
If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression.
boolean
false
type
The type of the computed field. This will translate to the schema type of the new field in the transformed message. The following types are currently supported: STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT. The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used. For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation.
string
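A sketch of a compute step; the field names and expressions are illustrative and reuse the expression language shown in the when examples above:

  pipeline:
    - name: "compute"
      type: "compute"
      input: "input-topic"
      output: "output-topic"
      configuration:
        fields:
          - name: "value.last_upper"
            expression: "value.last.toUpperCase()"
            type: STRING
            optional: false
          - name: "properties.source"
            expression: "'input-topic'"   # message headers are always written as STRING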
Compute embeddings of the record (compute-ai-embeddings)
Compute embeddings of the record. The embeddings are stored in the record under a specific field.
ai-service
In case of multiple AI services configured, specify the id of the AI service to use.
string
arguments
Additional arguments to pass to the AI Service. (HuggingFace only)
object
batch-size
Batch size for submitting the embeddings requests.
integer
10
composable
Whether this step can be composed with other steps.
boolean
true
concurrency
Max number of concurrent requests to the AI Service.
integer
4
embeddings-field
Field where to store the embeddings.
string
✓
flush-interval
Flushing is disabled by default in order to avoid latency spikes. You should enable this feature in the case of background processing.
integer
0
loop-over
Execute the agent over a list of documents
string
model
Model to use for the embeddings. The model must be available in the configured AI Service.
string
text-embedding-ada-002
model-url
URL of the model to use. (HuggingFace only). The default is computed from the model: "djl://ai.djl.huggingface.pytorch/{model}"
string
options
Additional options to pass to the AI Service. (HuggingFace only)
object
text
Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example: text: "{{{ value.field1 }}} {{{ value.field2 }}}"
string
✓
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
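A minimal embeddings step; the topics are placeholders and the text template assumes a value.text field:

  pipeline:
    - name: "compute-ai-embeddings"
      type: "compute-ai-embeddings"
      input: "chunks-topic"
      output: "embeddings-topic"
      configuration:
        model: "text-embedding-ada-002"
        embeddings-field: "value.embeddings"
        text: "{{{ value.text }}}"
        batch-size: 10
        flush-interval: 500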
Dispatch agent (dispatch)
Dispatches messages to different destinations based on conditions.
Dispatch agent (dispatch).routes
destination
Destination of the message.
string
action
Action on the message. Possible values are "dispatch" or "drop".
string
dispatch
when
Condition to activate the route. This is a standard EL expression.
string
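A routing sketch; the topics and the properties.language header are illustrative:

  pipeline:
    - name: "dispatch"
      type: "dispatch"
      input: "input-topic"
      output: "default-topic"
      configuration:
        routes:
          - when: "properties.language == 'en'"
            destination: "english-topic"
          - when: "properties.language == 'fr'"
            destination: "french-topic"
          - when: "properties.language == 'unknown'"
            action: "drop"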
Document to JSON (document-to-json)
Convert raw text document to JSON. The result will be a JSON object with the text content in the specified field.
copy-properties
Whether to copy the message properties/headers in the output message.
boolean
true
text-field
Field name to write the text content to.
string
text
Drop the record (drop)
Drops the record from further processing. Use in conjunction with when to selectively drop records.
composable
Whether this step can be composed with other steps.
boolean
true
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Drop fields (drop-fields)
Drops the record fields.
composable
Whether this step can be composed with other steps.
boolean
true
fields
Fields to drop from the input record.
array of string
✓
part
Part to drop. (value or key)
string
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Flare Controller (flare-controller)
Applies the FLARE pattern to enhance the quality of text completion results.
logprobs-field
The field that contains the logprobs of tokens returned by the ai-text-completion agent.
string
✓
loop-topic
Name of the topic to forward the message in case of requesting more documents.
string
✓
retrieve-documents-field
Name of the field to set in order to request the retrieval of more documents.
string
✓
tokens-field
The field that contains the list of tokens returned by the ai-text-completion agent.
string
✓
Flatten record fields (flatten)
Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.
composable
Whether this step can be composed with other steps.
boolean
true
delimiter
The delimiter to use when concatenating the field names.
string
_
part
When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.
string
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Http Request (http-request)
Agent for enriching data with an HTTP request.
allow-redirects
Whether or not to follow redirects.
boolean
true
body
Body to send with the request. You can use the Mustache syntax to inject value from the context.
string
handle-cookies
Whether or not to handle cookies during the redirects.
boolean
true
headers
Headers to send with the request. You can use the Mustache syntax to inject value from the context.
object
method
Http method to use for the request.
string
GET
output-field
The field that will hold the results; it can be the same as "field" to override it.
string
✓
query-string
Query string to append to the url. You can use the Mustache syntax to inject value from the context. Note that the values will be automatically escaped.
object
url
Url to send the request to. For adding query string parameters, use the `query-string` field.
string
✓
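An enrichment sketch; the URL, secret, header and field names are placeholders:

  pipeline:
    - name: "http-request"
      type: "http-request"
      input: "input-topic"
      output: "output-topic"
      configuration:
        url: "https://api.example.com/v1/lookup"
        method: "GET"
        query-string:
          id: "{{{ value.customer_id }}}"
        headers:
          Authorization: "Bearer ${ secrets.example-api.token }"   # assumes a secret defined in the application
        output-field: "value.api_response"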
Identity function (identity)
Simple agent to move data from the input to the output. Could be used for testing or sample applications.
Invoke LangServe (langserve-invoke)
Agent for invoking LangServe based applications.
allow-redirects
Whether or not to follow redirects.
boolean
true
content-field
Field in the response that will be used as the content of the record.
string
content
debug
Whether to enable debug mode.
boolean
handle-cookies
Whether or not to handle cookies during the redirects.
boolean
true
headers
Headers to send with the request. You can use the Mustache syntax to inject value from the context.
object
method
Http method to use for the request.
string
POST
min-chunks-per-message
Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. The chunks are sent in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value.
integer
20
output-field
The field that will hold the results; it can be the same as "field" to override it.
string
✓
value
stream-response-field
Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field.
string
stream-to-topic
Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks, while the complete message is still sent to the output topic.
string
url
Url to send the request to. For adding query string parameters, use the `query-string` field.
string
✓
Invoke LangServe (langserve-invoke).fields
expression
Expression to compute the value of the field. This is a standard EL expression.
string
✓
name
Name of the field like value.xx, key.xxx, properties.xxx
string
✓
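A sketch of invoking a LangServe application with streaming enabled; the URL, topics and the forwarded field are placeholders:

  pipeline:
    - name: "langserve-invoke"
      type: "langserve-invoke"
      input: "questions-topic"
      output: "answers-topic"
      configuration:
        url: "http://my-langserve-app:8000/chain/stream"   # placeholder LangServe endpoint
        output-field: "value.answer"
        content-field: "content"
        stream-to-topic: "streaming-answers-topic"
        stream-response-field: "value"
        min-chunks-per-message: 10
        fields:
          - name: "value.question"            # illustrative field forwarded to the request
            expression: "value.question"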
Language detector (language-detector)
Detect the language of a message's data and limit further processing based on language codes.
allowedLanguages
Define a list of allowed language codes. If the message language is not in this list, the message is dropped.
array of string
property
The name of the message header to write the language code to.
string
language
Log an event (log-event)
Log a line in the agent logs when a record is received.
message
Template for a log message to print (Mustache).
string
when
Condition to trigger the operation. This is a standard EL expression.
string
true
Log an event (log-event).fields
expression
Expression to compute the value of the field. This is a standard EL expression.
string
✓
name
Name of the field like value.xx, key.xxx, properties.xxx
string
✓
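A sketch of a logging step; the condition, message template and extra field are illustrative:

  pipeline:
    - name: "log-event"
      type: "log-event"
      input: "input-topic"
      output: "output-topic"
      configuration:
        when: "properties.language == 'en'"
        message: "Got record: {{{ value }}}"
        fields:
          - name: "value.language"
            expression: "properties.language"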
Merge key-value format (merge-key-value)
Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.
composable
Whether this step can be composed with other steps.
boolean
true
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Python custom processor (python-function)
Run your own Python processor. All the configuration properties are available in the class init method.
className
Python class name to instantiate. This class must be present in the application's "python" files.
string
✓
Python custom processor (python-processor)
Run your own Python processor. All the configuration properties are available in the class init method.
className
Python class name to instantiate. This class must be present in the application's "python" files.
string
✓
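A sketch of wiring a custom Python processor; the class name and the extra property are placeholders (any extra configuration property is passed to the class init method):

  pipeline:
    - name: "python-processor"
      type: "python-processor"
      input: "input-topic"
      output: "output-topic"
      configuration:
        className: "my_module.MyProcessor"   # must exist in the application's "python" files
        my-custom-property: "some-value"     # placeholder extra property available in init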
Python custom service (python-service)
Run your own Python service. All the configuration properties are available in the class init method.
className
Python class name to instantiate. This class must be present in the application's "python" files.
string
✓
Python custom sink (python-sink)
Run your own Python sink. All the configuration properties are available in the class init method.
className
Python class name to instantiate. This class must be present in the application's "python" files.
string
✓
Python custom source (python-source)
Run your own Python source. All the configuration properties are available in the class init method.
className
Python class name to instantiate. This class must be present in the application's "python" files.
string
✓
Query (query)
Perform a vector search or simple query against a datasource.
composable
Whether this step can be composed with other steps.
boolean
true
datasource
Reference to a datasource id configured in the application.
string
✓
fields
Fields of the record to use as input parameters for the query.
array of string
generated-keys
List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.
array of string
loop-over
Loop over a list of items taken from the record. For instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field".
string
mode
Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database).
string
query
only-first
If true, only the first result of the query is stored in the output field.
boolean
false
output-field
The name of the field to use to store the query result.
string
✓
query
The query to use to extract the data.
string
✓
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Query a vector database (query-vector-db)
Query a vector database using Vector Search capabilities.
composable
Whether this step can be composed with other steps.
boolean
true
datasource
Reference to a datasource id configured in the application.
string
✓
fields
Fields of the record to use as input parameters for the query.
array of string
generated-keys
List of fields to use as generated keys. The generated keys are returned in the output, depending on the database.
array of string
loop-over
Loop over a list of items taken from the record. For instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field".
string
mode
Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database).
string
query
only-first
If true, only the first result of the query is stored in the output field.
boolean
false
output-field
The name of the field to use to store the query result.
string
✓
query
The query to use to extract the data.
string
✓
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
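A query sketch; the datasource id, the SQL (which depends entirely on the target database) and the field names are illustrative:

  pipeline:
    - name: "query-vector-db"
      type: "query-vector-db"
      input: "input-topic"
      output: "output-topic"
      configuration:
        datasource: "JdbcDatasource"   # placeholder datasource id configured in the application
        query: "SELECT text FROM documents ORDER BY cosine_similarity(embeddings, CAST(? as FLOAT ARRAY)) DESC LIMIT 5"
        fields:
          - "value.question_embeddings"
        output-field: "value.related_documents"
        only-first: false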
Re-rank (re-rank)
Agent for re-ranking documents based on a query.
algorithm
Algorithm to use for re-ranking. 'none' or 'MMR'.
string
none
b
Parameter for the BM25 algorithm.
number
0.75
embeddings-field
Result field for the embeddings.
string
field
The field that contains the documents to sort.
string
✓
k1
Parameter for the BM25 algorithm.
number
1.5
lambda
Parameter for MMR algorithm.
number
0.5
max
Maximum number of documents to keep.
integer
100
output-field
The field that will hold the results; it can be the same as "field" to override it.
string
✓
query-embeddings
Field that contains the embeddings of the documents to sort.
string
query-text
Field that already contains the text that has been embedded.
string
text-field
Result field for the text.
string
S3 Source (s3-source)
Reads data from an S3 bucket.
access-key
Access key for the S3 server.
string
minioadmin
bucketName
The name of the bucket to read from.
string
langstream-source
endpoint
The endpoint of the S3 server.
string
http://minio-endpoint.-not-set:9090
file-extensions
Comma separated list of file extensions to filter by.
string
pdf,docx,html,htm,md,txt
idle-time
Time in seconds to sleep after polling for new files.
integer
5
region
Region for the S3 server.
string
secret-key
Secret key for the S3 server.
string
minioadmin
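A minimal S3 source sketch; the endpoint, bucket, region and secrets are placeholders:

  pipeline:
    - name: "s3-source"
      type: "s3-source"
      output: "documents-topic"
      configuration:
        endpoint: "https://s3.eu-west-1.amazonaws.com"
        bucketName: "langstream-source"
        region: "eu-west-1"
        access-key: "${ secrets.s3.access-key }"
        secret-key: "${ secrets.s3.secret-key }"
        file-extensions: "pdf,txt"
        idle-time: 5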
Kafka Connect Sink agent (sink)
Run any Kafka Connect Sink. All the configuration properties are passed to the Kafka Connect Sink.
connector.class
Java main class for the Kafka Sink connector.
string
✓
Kafka Connect Source agent (source)
Run any Kafka Connect Source. All the configuration properties are passed to the Kafka Connect Source.
connector.class
Java main class for the Kafka Source connector.
string
✓
Text extractor (text-extractor)
Extracts text content from different document formats like PDF, JSON, XML, ODF, HTML and many others.
Text normaliser (text-normaliser)
Apply normalisation to the text.
make-lowercase
Whether to make the text lowercase.
boolean
true
trim-spaces
Whether to trim spaces from the text.
boolean
true
Text splitter (text-splitter)
Split message content in chunks.
chunk_overlap
RecursiveCharacterTextSplitter splitter option. Chunk overlap of the previous message. Check out https://github.com/knuddelsgmbh/jtokkit for more details.
integer
100
chunk_size
RecursiveCharacterTextSplitter splitter option. Chunk size of each message. Check out https://github.com/knuddelsgmbh/jtokkit for more details.
integer
200
keep_separator
RecursiveCharacterTextSplitter splitter option. Whether or not to keep separators. Check out https://github.com/knuddelsgmbh/jtokkit for more details.
boolean
false
length_function
RecursiveCharacterTextSplitter splitter option. Options are: r50k_base, p50k_base, p50k_edit and cl100k_base. Check out https://github.com/knuddelsgmbh/jtokkit for more details.
string
cl100k_base
separators
RecursiveCharacterTextSplitter splitter option. The separators to use for splitting. Check out https://github.com/knuddelsgmbh/jtokkit for more details.
array of string
"\n\n", "\n", " ", ""
splitter_type
Splitter implementation to use. Currently supported: RecursiveCharacterTextSplitter.
string
RecursiveCharacterTextSplitter
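A splitter sketch; the chunk sizes shown are illustrative choices, not recommended values:

  pipeline:
    - name: "text-splitter"
      type: "text-splitter"
      input: "documents-topic"
      output: "chunks-topic"
      configuration:
        splitter_type: "RecursiveCharacterTextSplitter"
        chunk_size: 400
        chunk_overlap: 100
        keep_separator: false
        length_function: "cl100k_base"
        separators: ["\n\n", "\n", " ", ""]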
Timer source (timer-source)
Periodically emits records to trigger the execution of pipelines.
period-seconds
Period of the timer in seconds.
integer
60
Timer source (timer-source).fields
expression
Expression to compute the value of the field. This is a standard EL expression.
string
✓
name
Name of the field like value.xx, key.xxx, properties.xxx
string
✓
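A timer sketch; the output topic and computed field are illustrative:

  pipeline:
    - name: "timer-source"
      type: "timer-source"
      output: "ticks-topic"
      configuration:
        period-seconds: 60
        fields:
          - name: "value.tick_type"
            expression: "'scheduled-refresh'"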
Trigger event (trigger-event)
Emits a record on a side destination when a record is received.
continue-processing
Whether to continue processing the record downstream after emitting the event. If the when condition is false, the record is passed downstream anyway. This flag allows you to stop processing system events and trigger a different pipeline.
boolean
true
destination
Destination of the message.
string
✓
when
Condition to trigger the event. This is a standard EL expression.
string
true
Trigger event (trigger-event).fields
expression
Expression to compute the value of the field. This is a standard EL expression.
string
✓
name
Name of the field like value.xx, key.xxx, properties.xxx
string
✓
Unwrap key-value format (unwrap-key-value)
If the record value is in KeyValue format, extracts the KeyValue's key or value and makes it the record value.
composable
Whether this step can be composed with other steps.
boolean
true
unwrapKey
Whether to unwrap the key instead of the value.
boolean
false
when
Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"
string
Astra (vector-db-sink)
Writes data to DataStax Astra service. All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'.
string
✓
keyspace
The keyspace of the table to write to.
string
mapping
Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.
string
✓
table-name
The name of the table to write to. The table must already exist.
string
✓
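A sink sketch for Astra; the datasource id, keyspace, table and mapping refer to hypothetical application resources and record fields:

  pipeline:
    - name: "vector-db-sink"
      type: "vector-db-sink"
      input: "embeddings-topic"
      configuration:
        datasource: "AstraDatasource"   # must be a 'datasource' or 'vector-database' resource with service: 'astra'
        keyspace: "documents"
        table-name: "documents"
        mapping: "id=value.filename, text=value.text, embeddings_vector=value.embeddings"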
Astra Vector DB (vector-db-sink)
Writes data to DataStax Astra Vector DB.
collection-name
The name of the collection to write to. The collection must already exist.
string
✓
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra-vector-db'.
string
✓
Astra Vector DB (vector-db-sink).fields
expression
JSTL Expression for computing the field value.
string
✓
name
Field name
string
✓
Cassandra (vector-db-sink)
Writes data to Apache Cassandra. All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'.
string
✓
keyspace
The keyspace of the table to write to.
string
mapping
Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.
string
✓
table-name
The name of the table to write to. The table must already exist.
string
✓
JDBC (vector-db-sink)
Writes data to any JDBC compatible database.
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'.
string
✓
table-name
The name of the table to write to. The table must already exist.
string
✓
JDBC (vector-db-sink).fields
expression
JSTL Expression for computing the field value.
string
✓
name
Field name
string
✓
primary-key
Is this field part of the primary key?
boolean
false
Milvus (vector-db-sink)
Writes data to the Milvus/Zilliz service.
collection-name
Collection name
string
database-name
Database name
string
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'.
string
✓
Milvus (vector-db-sink).fields
expression
JSTL Expression for computing the field value.
string
✓
name
Field name
string
✓
OpenSearch (vector-db-sink)
Writes data to OpenSearch or AWS OpenSearch serverless.
batch-size
Batch size for bulk operations. Hitting the batch size will trigger a flush.
integer
10
bulk-parameters
OpenSearch bulk URL parameters.
object
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'.
string
✓
flush-interval
Flush interval in milliseconds
integer
1000
id
JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.
string
OpenSearch (vector-db-sink).bulk-parameters
pipeline
The pipeline ID for preprocessing documents. Refer to the OpenSearch documentation for more details.
string
routing
Routes the request to the specified shard. Refer to the OpenSearch documentation for more details.
string
require_alias
Set to true to require that all actions target an index alias rather than an index. Refer to the OpenSearch documentation for more details.
boolean
refresh
Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn’t suffer. Note that AWS OpenSearch supports only false. Refer to the OpenSearch documentation for more details.
string
wait_for_active_shards
Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed. Refer to the OpenSearch documentation for more details.
string
timeout
How long to wait for the request to return. Refer to the OpenSearch documentation for more details.
string
OpenSearch (vector-db-sink).fields
expression
JSTL Expression for computing the field value.
string
✓
name
Field name
string
✓
Pinecone (vector-db-sink)
Writes data to Pinecone service. To add metadata fields you can add vector.metadata.my-field: "value.my-field". The value is a JSTL Expression to compute the actual value.
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'.
string
✓
vector.id
JSTL Expression to compute the id.
string
vector.metadata
Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value.
object
vector.namespace
JSTL Expression to compute the namespace.
string
vector.vector
JSTL Expression to compute the vector.
string
Apache Solr (vector-db-sink)
Writes data to Apache Solr service. The collection-name is configured at datasource level.
commit-within
Solr commitWithin option, in milliseconds.
integer
1000
datasource
Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'.
string
✓
Apache Solr (vector-db-sink).fields
expression
JSTL Expression for computing the field value.
string
✓
name
Field name
string
✓
Web crawler source (webcrawler-source)
Crawl a website and extract the content of the pages.
access-key
Configuration for handling the agent status. Access key for the S3 server.
string
minioadmin
allow-non-html-contents
Whether to emit non-HTML documents to the pipeline (e.g. PDF files).
boolean
false
allowed-domains
Domains that the crawler is allowed to access.
array of string
bucketName
Configuration for handling the agent status. The name of the bucket.
string
langstream-source
endpoint
Configuration for handling the agent status. The S3 endpoint.
string
http://minio-endpoint.-not-set:9090
forbidden-paths
Paths that the crawler is not allowed to access.
array of string
handle-cookies
Whether to handle cookies.
boolean
true
handle-robots-file
Whether to honor the website's robots.txt file.
boolean
true
http-timeout
Timeout for HTTP requests. (in milliseconds)
integer
10000
max-depth
Maximum depth of the crawl.
integer
50
max-error-count
Maximum number of errors allowed before stopping.
integer
5
max-unflushed-pages
Maximum number of unflushed pages before the agent persists the crawl data.
integer
100
max-urls
Maximum number of URLs that can be crawled.
integer
1000
min-time-between-requests
Minimum time between two requests to the same domain. (in milliseconds)
integer
500
region
Configuration for handling the agent status. Region for the S3 server.
string
reindex-interval-seconds
Time interval between reindexing of the pages.
integer
86400
scan-html-documents
Whether to scan HTML documents for links to other sites.
boolean
true
secret-key
Configuration for handling the agent status. Secret key for the S3 server.
string
minioadmin
seed-urls
The starting URLs for the crawl.
array of string
state-storage
State storage configuration. "s3" or "disk"
string
s3
user-agent
User agent to use for the requests.
string
Mozilla/5.0 (compatible; LangStream.ai/0.1; +https://langstream.ai)
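A crawler sketch; the seed URL, allowed domain and output topic are placeholders:

  pipeline:
    - name: "webcrawler-source"
      type: "webcrawler-source"
      output: "crawled-pages-topic"
      configuration:
        seed-urls: ["https://example.com/docs/"]
        allowed-domains: ["https://example.com"]
        max-urls: 1000
        min-time-between-requests: 500
        reindex-interval-seconds: 86400
        state-storage: "disk"   # or "s3" together with the bucket/endpoint/credentials settings above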