Python processor

Along with the pre-made agents, you can provide your own processor agent as a Python application.

The Python application must follow a specific directory structure for the agent to run.

Within the “application” directory, create a directory named “python”.

Within that directory, place the .py file containing the class that will be the entry point.

The directory will look something like this:

|- Project directory
    |- application
        |- python
    |- pipeline.yaml
    |- configuration.yaml
|- (optional) secrets.yaml
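
The part of this layout that the agent depends on can be checked locally before deploying. The following is a minimal sketch, not a LangStream tool: the helper name `find_entry_point_files` and the file name `example.py` are hypothetical, and the check only covers the "application/python" directory described above.

```python
from pathlib import Path
import tempfile


def find_entry_point_files(project_dir):
    """Return the names of the .py files in <project_dir>/application/python, sorted."""
    python_dir = Path(project_dir) / "application" / "python"
    if not python_dir.is_dir():
        raise FileNotFoundError(f"expected directory not found: {python_dir}")
    return sorted(p.name for p in python_dir.glob("*.py"))


# Demonstration in a throwaway directory (hypothetical file name).
with tempfile.TemporaryDirectory() as tmp:
    python_dir = Path(tmp) / "application" / "python"
    python_dir.mkdir(parents=True)
    (python_dir / "example.py").write_text("# entry point class goes here\n")
    found = find_entry_point_files(tmp)
    print(found)
```

An empty result means the "python" directory exists but contains no candidate entry point file.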

For more on developing custom Python processor agents, see the Agent Developer Guide.


Example Python class located in ./application/python/:

from langstream import Processor, SimpleRecord

# Example Python processor that appends two exclamation marks to the record value
class Exclamation(Processor):
    def process(self, record):
        return [SimpleRecord(record.value() + "!!")]
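
The processing logic can be exercised without a running pipeline. The sketch below assumes only what the example uses: a record exposing value() and a process method returning a list of records. The `SimpleRecord` class here is a local stand-in written for illustration, not the one from the langstream package.

```python
class SimpleRecord:
    """Local stand-in for langstream.SimpleRecord (assumption: only value() is needed)."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class Exclamation:
    """Same logic as the example processor, without the langstream base class."""

    def process(self, record):
        return [SimpleRecord(record.value() + "!!")]


results = Exclamation().process(SimpleRecord("hello"))
output_values = [r.value() for r in results]
print(output_values)  # ['hello!!']
```

Because process returns a list, a processor written this way can in principle emit zero, one, or several records per input.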

Configure the agent to use the Python class:

- name: "Process using Python"
  type: "python-function"
  input: "input-topic" # optional
  output: "output-topic" # optional
  configuration:
    className: example.Exclamation
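
The className value reads as a dotted "module.Class" path, which suggests that "example" names a module (for instance an example.py file in the python directory) and "Exclamation" names the class inside it. That reading is an assumption; how LangStream actually loads the class is not described here. As a rough sketch, a dotted name of this shape can be resolved in plain Python as follows, where `resolve_class` is a hypothetical helper and a standard-library class is used for the demonstration:

```python
import importlib


def resolve_class(class_name):
    """Split 'module.Class' at the last dot, import the module, return the class."""
    module_name, _, attr = class_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.Class', got {class_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Demonstrated with a standard-library class so the sketch runs anywhere.
resolved = resolve_class("collections.OrderedDict")
print(resolved.__name__)  # OrderedDict
```

A name without a dot is rejected early, which gives a clearer error than a failed import.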

The Python application can optionally take parameters from the application environment. The following example application receives a “config” object in its init method.

from langstream import SimpleRecord
import openai
import json
from openai.embeddings_utils import get_embedding

class Embedding(object):

  def init(self, config):
    print('init', config)
    openai.api_key = config["openaiKey"]

  def process(self, record):
    embedding = get_embedding(record.value(), engine='text-embedding-ada-002')
    result = {"input": str(record.value()), "embedding": embedding}
    new_value = json.dumps(result)
    return [SimpleRecord(value=new_value)]

The config object is a map built from the agent's configuration in pipeline.yaml:

- name: "OpenAI Embeddings"
  type: "python-function"
  input: "input-topic" # optional
  output: "output-topic" # optional
  configuration:
    className: embeddings.Embedding
    openaiKey: "${ }"
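
To illustrate how such a map reaches the class, the sketch below passes a plain dict to an init method and fails early when a required key is missing. It is an illustration under assumptions: `ConfiguredProcessor` is a hypothetical class, the dict is built by hand rather than by LangStream, and the key value is a dummy string.

```python
class ConfiguredProcessor:
    """Hypothetical processor that only records what it receives in init."""

    REQUIRED_KEYS = ("openaiKey",)

    def init(self, config):
        # Fail fast with a clear message instead of a KeyError deep in process().
        missing = [key for key in self.REQUIRED_KEYS if key not in config]
        if missing:
            raise ValueError(f"missing configuration keys: {missing}")
        self.api_key = config["openaiKey"]


# Assumption: the map mirrors the keys written in the agent's YAML entry.
config = {"className": "embeddings.Embedding", "openaiKey": "dummy-key-for-illustration"}
processor = ConfiguredProcessor()
processor.init(config)
print(processor.api_key == config["openaiKey"])  # True
```

Validating the map in init surfaces a misconfigured agent at startup rather than on the first record.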



