compute
The compute agent computes field values based on expressions evaluated at runtime. If the field already exists, it will be overwritten.
Example
Given the input:
With an agent configuration of:
The output would be:
Topics
Input
Output
Configuration
fields
object[] (required)
An array of objects describing how to calculate the field values. Refer to the field table for more info.
Field
name
string (required)
The name of the field to be computed. Prefix with “key.” or “value.” to compute the fields in the key or value parts of the message.
Example:
name: "value.first-name"
expression
string (required)
Supports the Expression Language syntax. It is evaluated at runtime and the result of the evaluation is assigned to the field. Do not wrap the expression in mustache brackets; the agent fills in the value correctly.
type
string (required)
The type of the computed field. This translates to the schema type of the new field in the transformed message. See the type reference below.
optional
boolean (optional)
If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression.
The default value is true.
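The field options above can be pictured with a small Python model. This is only a sketch of the documented behavior, not the agent's implementation: ComputeField and apply_fields are hypothetical names, a Python callable stands in for the expression string, and evaluating every expression against the original input record is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class ComputeField:
    """Hypothetical stand-in for one entry of the `fields` array."""
    name: str                                    # must start with "key." or "value."
    expression: Callable[[Dict[str, Any]], Any]  # stand-in for an expression string
    type: str                                    # schema type name, e.g. "INT32"
    optional: bool = True                        # documented default


def apply_fields(record: Dict[str, Any], fields: List[ComputeField]) -> Dict[str, Any]:
    """Return a copy of `record` with each computed field set, overwriting existing ones."""
    out = {"key": dict(record.get("key", {})), "value": dict(record.get("value", {}))}
    for f in fields:
        part, _, field_name = f.name.partition(".")
        if part not in ("key", "value") or not field_name:
            raise ValueError(f"field name must start with 'key.' or 'value.': {f.name}")
        # Assumption: expressions see the original input record, not earlier results.
        out[part][field_name] = f.expression(record)
    return out


record = {"key": {"id": 7}, "value": {"age": 36}}
fields = [
    # "value.age" already exists, so it is overwritten.
    ComputeField("value.age", lambda r: r["value"]["age"] + 1, "INT32"),
    # "key.double-id" does not exist yet, so it is added to the key part.
    ComputeField("key.double-id", lambda r: r["key"]["id"] * 2, "INT32", optional=False),
]
result = apply_fields(record, fields)
```

The prefix of name decides whether the key or the value part of the message is modified; the input record itself is left untouched in this sketch.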
Field type
BOOLEAN
true or false
expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'"
DATETIME
a date-time with an offset from UTC in the RFC3339 format
expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()"
DOUBLE
represents 64-bit floating point.
expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1"
FLOAT
represents 32-bit floating point.
expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1"
INT32
represents 32-bit integer.
expression1: "2147483647", expression2: "1 + 1"
INT64
represents 64-bit integer.
expression1: "9223372036854775807", expression2: "1 + 1"
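The first example given for INT32, INT64 and DOUBLE is the largest value each type can hold, and the FLOAT example is close to the largest 32-bit float. The following Python sketch derives those limits; the fits helper is a hypothetical illustration and is not part of the agent.

```python
import struct
import sys

# Largest values of the fixed-width integer types.
INT32_MAX = 2**31 - 1
INT64_MAX = 2**63 - 1
# Largest finite 32-bit float: bit pattern 0x7F7FFFFF read as a big-endian IEEE 754 single.
FLOAT_MAX = struct.unpack(">f", bytes.fromhex("7f7fffff"))[0]
# Largest finite 64-bit float, as reported by the Python runtime.
DOUBLE_MAX = sys.float_info.max


def fits(type_name: str, value) -> bool:
    """Hypothetical helper: does `value` fit the numeric range of `type_name`?"""
    if type_name == "INT32":
        return isinstance(value, int) and -2**31 <= value <= INT32_MAX
    if type_name == "INT64":
        return isinstance(value, int) and -2**63 <= value <= INT64_MAX
    if type_name in ("FLOAT", "DOUBLE"):
        limit = FLOAT_MAX if type_name == "FLOAT" else DOUBLE_MAX
        try:
            return abs(float(value)) <= limit
        except OverflowError:
            # Python integers too large for a 64-bit float cannot fit either type.
            return False
    raise ValueError(f"unsupported type: {type_name}")
```

For example, fits("INT32", 2147483648) is False while fits("INT64", 2147483648) is True, which is why a value that may exceed the 32-bit range needs the INT64 type.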
Expression Language
To support conditional steps and the compute transform, an expression language is required to evaluate the when condition of a conditional step or the expression of a compute step. The syntax is EL, which uses dot notation to access field properties or map keys. It supports the following operators and functions:
Operators
The Expression Language supports the following operators:
Arithmetic: +, - (binary), *, / and div, % and mod, - (unary)
Logical: and, &&, or, ||, not, !
Relational: ==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge.
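Each relational and logical operator has a symbolic and a word spelling that mean the same thing. The Python sketch below pairs them up; it only illustrates the aliasing and does not reproduce the type coercion the Expression Language performs. The names RELATIONAL, LOGICAL, NEGATION and compare are hypothetical.

```python
import operator


def _and(a, b):
    return bool(a) and bool(b)


def _or(a, b):
    return bool(a) or bool(b)


# Both spellings of each operator map to the same Python function.
RELATIONAL = {
    "==": operator.eq, "eq": operator.eq,
    "!=": operator.ne, "ne": operator.ne,
    "<": operator.lt, "lt": operator.lt,
    ">": operator.gt, "gt": operator.gt,
    "<=": operator.le, "le": operator.le,
    ">=": operator.ge, "ge": operator.ge,
}
LOGICAL = {"and": _and, "&&": _and, "or": _or, "||": _or}
NEGATION = {"not": operator.not_, "!": operator.not_}


def compare(left, op, right):
    """Apply the relational operator named by `op`, in either spelling."""
    return RELATIONAL[op](left, right)
```

With this table, compare(1, "le", 2) and compare(1, "<=", 2) give the same result, just as value.a le 2 and value.a <= 2 are interchangeable in an expression.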
Functions
Utility methods are available under the fn namespace. For example, to get the current timestamp, use fn:now(). The Expression Language supports the following functions:
Name (field)
Description
concat(input1, input2)
Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string.
concat3(input1, input2, input3)
Returns a string concatenation of input1, input2 and input3. If any input is null, it is treated as an empty string.
contains(input, value)
Returns true if value exists in input. It attempts string conversion on both input and value if either is not a string. If input or value is null, it returns false. Do not use this function on lists.
coalesce(value, valueIfNull)
Returns value if it is not null, otherwise returns valueIfNull.
dateadd(input, delta, unit)
Performs date/time arithmetic operations on the input date/time. input can be either epoch millis or an RFC3339 format like "2022-10-14T10:15:30+01:00". delta is the amount of unit to add to input. Can be a negative value to perform subtraction. unit is the unit of time to add or subtract. Can be one of [years, months, days, hours, minutes, seconds, millis].
decimalFromNumber(input)
Converts input to a BigDecimal. input is the value of the BigDecimal in DOUBLE or FLOAT. If INTEGER or LONG is provided, an unscaled BigDecimal value will be returned.
decimalFromUnscaled(input, scale)
Converts input to a BigDecimal with the given scale. input is the unscaled value of the BigDecimal. Can be any of STRING, INTEGER, LONG or an array of bytes containing the two's-complement representation in big-endian byte order. scale is the scale of the BigDecimal to create.
filter(collection, expression)
Returns a new collection containing only the elements of collection for which expression is true. The current element is available under the record variable. An example is fn:filter(value.queryResults, "fn:toDouble(record.similarity) >= 0.5").
For all methods, if a parameter is not of the expected type, it is converted using the rules described in Type conversions. For instance, you can do fn:timestampAdd('2022-10-02T01:02:03Z', '42', 'hours'.bytes).
fromJson(input)
Parses input as JSON.
lowercase(input)
Converts a string to lowercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null.
now()
Returns the current epoch millis.
replace(input, regex, replacement)
Replaces each substring of input that matches the regex regular expression with replacement. See Java's replaceAll.
str(input)
Converts input to a string.
toString(input)
Converts input to a string.
toDouble(input)
Converts the input value to a DOUBLE number. If the input is null, it returns null.
toInt(input)
Converts the input value to an INTEGER number. If the input is null, it returns null.
toListOfFloat(input)
Converts the input value to a list of FLOAT numbers (embeddings vector). If the input is null, it returns null. The input must already be a list.
emptyList()
Returns a new empty list.
listAdd(list, item)
Returns a new list that contains the contents of a given list plus an item. The input list must be a list and not null (you can use fn:emptyList() to create an empty list).
emptyMap()
Returns a new empty map.
mapToListOfStructs(map, fields)
Converts a map to a list of one element that contains only a selection of fields from the map. For instance, if your map is {"text":"value","foo":"bar"} and you call mapToListOfStructs(map, 'text'), the result is a list [{"text":"value"}].
listToListOfStructs(list, field)
Converts a list of items to a list of maps with one element named after field. For instance, if your list is ["this is a question"] and you call listToListOfStructs(list, 'text'), the result is a list [{"text":"this is a question"}]. This is very handy when you have to convert a list of texts to a list of structs.
uppercase(input)
Converts a string to uppercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null.
toJson(input)
Converts input to a JSON string.
trim(input)
Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion.
unpack(input, fieldsList)
Returns a map containing the elements of input: for each field in fieldsList there is an entry in the map. If the input is a string, it is converted to a list using the split() function with the ',' separator.
split(input, separatorExpression)
Splits the input into a list of strings; internally this uses the String.split() function. An empty input corresponds to an empty list. The input is converted to a String using the str() function.
timestampAdd(input, delta, unit)
Returns a timestamp formed by adding delta in unit to the input timestamp. input is the timestamp to add to. delta is a long amount of unit to add to input. Can be a negative value to perform subtraction. unit is the string unit of time to add or subtract. Can be one of [years, months, days, hours, minutes, seconds, millis].
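A few of the functions in the table above can be modeled in Python to make their descriptions concrete. These are sketches of the documented behavior only, not the agent's code. The snake_case names are hypothetical, treating fields as a comma-separated string is an assumption, and timestamp_add models only the fixed-length units because years and months need calendar arithmetic.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal


def concat(a, b):
    """Concatenate two strings, treating null (None) as an empty string."""
    return ("" if a is None else str(a)) + ("" if b is None else str(b))


def coalesce(value, value_if_null):
    """Return `value` unless it is null."""
    return value if value is not None else value_if_null


def decimal_from_unscaled(unscaled, scale):
    """Build unscaled * 10**-scale; bytes are read as big-endian two's complement."""
    if isinstance(unscaled, (bytes, bytearray)):
        unscaled = int.from_bytes(unscaled, byteorder="big", signed=True)
    return Decimal(int(unscaled)).scaleb(-scale)


def map_to_list_of_structs(mapping, fields):
    """Keep only the selected fields, wrapped in a one-element list.

    Assumption: `fields` is a comma-separated string of field names.
    """
    wanted = [name.strip() for name in fields.split(",")]
    return [{name: mapping.get(name) for name in wanted}]


def list_to_list_of_structs(items, field):
    """Wrap every item in a map with a single entry named `field`."""
    return [{field: item} for item in items]


# EL unit name -> timedelta keyword (fixed-length units only).
_FIXED_UNITS = {
    "days": "days",
    "hours": "hours",
    "minutes": "minutes",
    "seconds": "seconds",
    "millis": "milliseconds",
}


def timestamp_add(ts, delta, unit):
    """Add `delta` units to `ts`, given as epoch millis or an RFC 3339 string."""
    if isinstance(ts, (int, float)):
        moment = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
    else:
        # Older Python versions do not parse a trailing "Z", so spell out the offset.
        moment = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if unit not in _FIXED_UNITS:
        raise ValueError(f"unit not modeled in this sketch: {unit}")
    # int() mirrors the documented conversion of a string delta such as '42'.
    return moment + timedelta(**{_FIXED_UNITS[unit]: int(delta)})
```

For instance, decimal_from_unscaled(12345, 2) yields Decimal("123.45"), and timestamp_add("2022-10-02T01:02:03Z", "42", "hours") yields 2022-10-03T19:02:03 UTC.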
Conditional Steps
Each step accepts an optional when configuration that is evaluated at step execution time against current records (the current step in the transform pipeline).
The when condition supports the expression language syntax, which provides access to the record attributes as follows:
Name (field)
Description
key:
the key portion of the record in a KeyValue schema.
value:
the value portion of the record in a KeyValue schema, or the message payload itself.
messageKey:
the optional key messages are tagged with (aka. Partition Key).
topicName:
the optional name of the topic which the record originated from (aka. Input Topic).
destinationTopic:
the name of the topic on which the transformed record will be sent (aka. Output Topic).
eventTime:
the optional timestamp attached to the record from its source. For example, the original timestamp attached to the Pulsar message.
properties:
the optional user-defined properties attached to the record.
You can use the . operator to access top-level or nested properties on a schema-full key or value. For example, key.keyField1 or value.valueField1.nestedValueField.
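The dot access and the when condition can be sketched in Python as follows. This is an illustration under stated assumptions rather than the real evaluator: resolve and run_step are hypothetical helpers, a missing property is assumed to resolve to null, and a step whose condition is false is assumed to pass the record through unchanged.

```python
from typing import Any, Callable, Dict, Optional

Record = Dict[str, Any]


def resolve(record: Record, path: str) -> Any:
    """Walk a dotted path such as 'value.valueField1.nestedValueField'."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None  # assumption: a missing property resolves to null
        current = current[part]
    return current


def run_step(
    record: Record,
    step: Callable[[Record], Record],
    when: Optional[Callable[[Record], bool]] = None,
) -> Record:
    """Apply `step` only if the optional `when` predicate holds for `record`."""
    if when is not None and not when(record):
        return record  # assumption: a skipped step leaves the record unchanged
    return step(record)


example = {
    "key": {"keyField1": "k1"},
    "value": {"valueField1": {"nestedValueField": 42}},
    "topicName": "input-topic",
    "properties": {"route": "yes"},
}


def set_destination(r: Record) -> Record:
    """Example step: route the record to a different output topic."""
    return {**r, "destinationTopic": "routed-topic"}


routed = run_step(example, set_destination, when=lambda r: resolve(r, "properties.route") == "yes")
skipped = run_step(example, set_destination, when=lambda r: resolve(r, "key.keyField1") == "other")
```

Here routed carries the new destinationTopic because the condition on properties.route holds, while skipped is the original record because the condition on key.keyField1 does not.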
Configuration
Check out the full configuration properties on the API Reference page.