compute
The compute agent computes field values based on expressions evaluated at runtime. If the field already exists, it will be overwritten.
Example
Given the input:
With an agent configuration of:
The output would be:
Topics
Input
Output
Configuration
Label | Type | Description |
---|---|---|
fields | object[] (required) | An array of objects describing how to calculate the field values. Refer to the field table for more info. |
Field
Label | Type | Description |
---|---|---|
name | string (required) | The name of the field to be computed. Prefix with “key.” or “value.” to compute the fields in the key or value parts of the message. Example: name: “value.first-name” |
expression | string (required) | An expression in the Expression Language syntax. It is evaluated at runtime and the result is assigned to the field. Do not include mustache brackets; the agent fills in the value correctly. |
type | string (required) | The type of the computed field. This translates to the schema type of the new field in the transformed message. See the field type reference below. |
optional | boolean (optional) | If true, marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. The default value is “true”. |
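The overwrite behaviour and the “key.” / “value.” prefixes can be illustrated with a minimal Python sketch. This is only a model of the semantics described above, not the agent's implementation: the `apply_compute` helper, the dict-based record, and the use of Python callables in place of expression strings are all assumptions made for illustration. The sketch also assumes each expression sees the original record.

```python
from typing import Any

# Hypothetical in-memory record: a dict with "key" and "value" parts.
Record = dict[str, dict[str, Any]]


def apply_compute(record: Record, fields: list[dict[str, Any]]) -> Record:
    """Return a copy of the record with every computed field assigned."""
    result = {part: dict(content) for part, content in record.items()}
    for field in fields:
        # "value.first-name" -> part "value", field name "first-name".
        part, _, field_name = field["name"].partition(".")
        if part not in ("key", "value"):
            raise ValueError(f"unsupported field name: {field['name']}")
        # The callable stands in for evaluating the expression string at runtime.
        # Assigning to an existing field overwrites it.
        result.setdefault(part, {})[field_name] = field["expression"](record)
    return result


record = {"key": {"id": 7}, "value": {"first-name": "ada", "age": 36}}
fields = [
    {"name": "value.first-name", "expression": lambda r: r["value"]["first-name"].upper()},
    {"name": "value.is-adult", "expression": lambda r: r["value"]["age"] >= 18},
]
computed = apply_compute(record, fields)
print(computed["value"])  # {'first-name': 'ADA', 'age': 36, 'is-adult': True}
```

Here `first-name` already exists and is overwritten, while `is-adult` is added as a new field.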
Field type
Type | Description | Expression examples |
---|---|---|
BOOLEAN | true or false | expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'" |
DATE | a date without a time-zone in the RFC3339 format | expression1: "2021-12-03" |
DATETIME | a date-time with an offset from UTC in the RFC3339 format | expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()" |
DOUBLE | represents 64-bit floating point. | expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1" |
FLOAT | represents 32-bit floating point. | expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1" |
INT32 | represents 32-bit integer. | expression1: "2147483647", expression2: "1 + 1" |
INT64 | represents 64-bit integer. | expression1: "9223372036854775807", expression2: "1 + 1" |
TIME | a time without a time-zone in the RFC3339 format | expression1: "20:15:45" |
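As a rough check of the example values in the table, the Python sketch below confirms that the INT32 and INT64 examples are the largest values of their types and that the DATE, TIME and DATETIME examples parse as ISO 8601 / RFC 3339 strings. The `fits` helper is a hypothetical name used only here, and Python's parsers are a stand-in for whatever the agent uses internally.

```python
from datetime import date, datetime, time

# Largest values of the signed integer types; they match the table's examples.
INT32_MAX = 2**31 - 1  # 2147483647
INT64_MAX = 2**63 - 1  # 9223372036854775807


def fits(value: int, bits: int) -> bool:
    """Check that value fits in a signed two's-complement integer of the given width."""
    return -(2 ** (bits - 1)) <= value <= 2 ** (bits - 1) - 1


# The date, time and offset date-time examples from the table.
parsed_date = date.fromisoformat("2021-12-03")
parsed_time = time.fromisoformat("20:15:45")
parsed_datetime = datetime.fromisoformat("2022-10-02T01:02:03+02:00")

print(fits(2147483647, 32), fits(2147483648, 32))  # True False
print(parsed_date.year, parsed_time.hour, parsed_datetime.utcoffset())
```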
Expression Language
To support conditional steps and the compute transform, an expression language is required to evaluate the conditional step `when` or the compute step `expression`. The syntax is EL, which uses dot notation to access field properties or map keys. It supports the following operators and functions:
Operators
The Expression Language supports the following operators:
Arithmetic: +, - (binary), *, / and div, % and mod, - (unary)
Logical: and, &&, or, ||, not, !
Relational: ==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge
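Each relational operator has a symbolic form and a word form with the same meaning (for example, `<=` pairs with `le` and `>=` pairs with `ge`). The Python sketch below models that pairing with a lookup table; the `RELATIONAL` table and `compare` helper are hypothetical names for illustration and are not part of the Expression Language itself.

```python
import operator

# Each relational operator has a symbolic and a word form that mean the same thing.
RELATIONAL = {
    "==": operator.eq, "eq": operator.eq,
    "!=": operator.ne, "ne": operator.ne,
    "<": operator.lt, "lt": operator.lt,
    ">": operator.gt, "gt": operator.gt,
    "<=": operator.le, "le": operator.le,
    ">=": operator.ge, "ge": operator.ge,
}


def compare(left, op, right):
    """Apply a relational operator given in either spelling."""
    return RELATIONAL[op](left, right)


print(compare(1, "le", 1), compare(1, "<=", 1))  # True True
print(compare(2, "lt", 1), compare(3, "ge", 2))  # False True
```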
Functions
Utility methods are available under the `fn` namespace. For example, to get the current timestamp, use `fn:now()`. The Expression Language supports the following functions:
Name (field) | Description |
---|---|
concat(input1, input2) | Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string. |
concat3(input1, input2, input3) | Returns a string concatenation of input1, input2 and input3. If any input is null, it is treated as an empty string. |
contains(input, value) | Returns true if value exists in input. It attempts string conversion on both input and value if either is not a string. If input or value is null, it returns false. Do not use this function on lists. |
coalesce(value, valueIfNull) | Returns value if it is not null, otherwise returns valueIfNull. |
dateadd(input, delta, unit) | Performs date/time arithmetic operations on the input date/time. |
decimalFromNumber(input) | Converts |
decimalFromUnscaled(input, scale) | Converts |
filter(collection, expression) | Returns a new collection containing only the elements of For all methods, if a parameter is not in the right type, a conversion will be done using the rules described in Type conversions. For instance, you can do |
fromJson | Parses input as JSON. |
lowercase(input) | Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
now() | Returns the current epoch millis. |
replace(input,regex,replacement) | Replaces each substring of |
str(input) | Converts |
toString(input) | Converts |
toDouble(input) | Converts the input value to a DOUBLE number, If the input is |
toInt(input) | Converts the input value to an INTEGER number, If the input is |
toListOfFloat(input) | Converts the input value to a list of FLOAT numbers (embeddings vector), If the input is |
emptyList() | Returns a new empty list |
listAdd(list, item) | Returns a new list that contains the contents of a given list plus an item. The input list must be a list and not |
emptyMap() | Returns a new empty map |
mapToListOfStructs(map, fields) | Converts a map to a list of one element that contains only a selection of fields from the map. For instance if your map is |
listToListOfStructs(list, field) | Converts a list of items to a list of maps with one element named after |
uppercase(input) | Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
toJson | Converts |
trim(input) | Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion. |
unpack(input, fieldsList) | Returns a map containing the elements of |
split(input, separatorExpression) | Splits the input into a list of strings, internally using the String.split() function. An empty input corresponds to an empty list. The input is converted to a String using the str() function. |
timestampAdd(input, delta, unit) | Returns a timestamp formed by adding |
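The null-handling rules for a few of the string functions can be modelled in Python as a rough sketch. These are not the agent's implementations: the function bodies are inferred from the descriptions in the table, `None` stands in for null, and Python's `str()` is only an approximation of the string conversion the agent performs (for example, Python renders booleans as `True` rather than `true`).

```python
from typing import Any, Optional


def _text(value: Any) -> str:
    """Convert to text, mapping null (None) to the empty string."""
    return "" if value is None else str(value)


def concat(input1: Any, input2: Any) -> str:
    # A null input contributes nothing to the result.
    return _text(input1) + _text(input2)


def coalesce(value: Any, value_if_null: Any) -> Any:
    # Only null triggers the fallback; 0 or "" are returned as they are.
    return value_if_null if value is None else value


def contains(source: Any, value: Any) -> bool:
    # Null on either side yields false; other inputs are compared as strings.
    if source is None or value is None:
        return False
    return str(value) in str(source)


def lowercase(source: Any) -> Optional[str]:
    # Null stays null; non-strings are converted before lowercasing.
    return None if source is None else str(source).lower()


print(concat("a", None))    # a
print(coalesce(0, 5))       # 0
print(contains(12345, 23))  # True
print(lowercase(None))      # None
```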
Conditional Steps
Each step accepts an optional `when` configuration that is evaluated at step execution time against the current record, that is, the record as seen by the current step in the transform pipeline.
The `when` condition supports the expression language syntax, which provides access to the record attributes as follows:
Name (field) | Description |
---|---|
key: | the key portion of the record in a KeyValue schema. |
value: | the value portion of the record in a KeyValue schema, or the message payload itself. |
messageKey: | the optional key messages are tagged with (aka. Partition Key). |
topicName: | the optional name of the topic which the record originated from (aka. Input Topic). |
destinationTopic: | the name of the topic on which the transformed record will be sent (aka. Output Topic). |
eventTime: | the optional timestamp attached to the record from its source. For example, the original timestamp attached to the Pulsar message. |
properties: | the optional user-defined properties attached to the record. |
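A conditional step can be pictured with the small Python sketch below. It is a model only: `run_step` and `shout_status` are hypothetical helpers, the condition is a Python callable rather than an expression string, and the sketch assumes that a record passes through unchanged when the condition is false.

```python
from typing import Any, Callable, Optional

Record = dict[str, Any]


def run_step(
    record: Record,
    transform: Callable[[Record], Record],
    when: Optional[Callable[[Record], bool]] = None,
) -> Record:
    """Apply transform only if there is no condition or the condition holds."""
    if when is not None and not when(record):
        # Assumption: a skipped step leaves the record untouched.
        return record
    return transform(record)


def shout_status(record: Record) -> Record:
    """Example transform: uppercase the status field in the value part."""
    value = {**record["value"], "status": record["value"]["status"].upper()}
    return {**record, "value": value}


record = {"topicName": "orders", "value": {"status": "paid"}}
applied = run_step(record, shout_status, when=lambda r: r["topicName"] == "orders")
skipped = run_step(record, shout_status, when=lambda r: r["topicName"] == "payments")
print(applied["value"]["status"], skipped["value"]["status"])  # PAID paid
```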
You can use the `.` operator to access top-level or nested properties on a schema-full key or value. For example, `key.keyField1` or `value.valueField1.nestedValueField`.
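Dot notation can be thought of as a walk through nested maps, one name per segment. The Python sketch below shows the idea using the two example paths; the `resolve` helper and the sample record are assumptions made for illustration and say nothing about how the agent resolves paths internally.

```python
from typing import Any


def resolve(path: str, record: dict[str, Any]) -> Any:
    """Follow a dotted path through nested dicts and return the value found."""
    current: Any = record
    for name in path.split("."):
        current = current[name]
    return current


# Hypothetical record with a schema-full key and value.
record = {
    "key": {"keyField1": "k-1"},
    "value": {"valueField1": {"nestedValueField": 42}},
}

print(resolve("key.keyField1", record))                       # k-1
print(resolve("value.valueField1.nestedValueField", record))  # 42
```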
Configuration
Check out the full configuration properties in the API Reference page.