
webcrawler-source


The webcrawler-source agent crawls a website and outputs the site's URL and an HTML document. Crawling a website is an ideal first step in a text embeddings pipeline.

This agent keeps the status of the crawl in persistent storage. The storage doesn't hold a copy of the crawled data, only a single JSON file whose name is computed from the name of the agent and the id of the LangStream application.

By default, the agent requires an S3-compatible bucket, defined with the bucketName, endpoint, access-key, secret-key and region properties. Alternatively, the status can be stored on a persistent disk provided by LangStream by setting state-storage: disk.
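
For example, the default S3-backed status storage might be configured roughly as below; this is a sketch, and the bucket name, endpoint, region, and secret references are placeholders, not values taken from this page:

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      # S3-compatible bucket that holds the single crawl-status JSON file
      bucketName: "langstream-crawler-state"
      endpoint: "https://s3.eu-west-1.amazonaws.com"
      access-key: "{{ secrets.s3.access-key }}"
      secret-key: "{{ secrets.s3.secret-key }}"
      region: "eu-west-1"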

Example

Example webcrawler agent in a pipeline:

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

Multiple URLs in pipeline

You can specify multiple seed-urls and allowed-domains.

To add them to your pipeline, use this syntax:

seed-urls:
  - "http://example1.com"
  - "http://example2.com"
allowed-domains:
  - "http://example1.com"
  - "http://example2.com"  

Topics

Input

  • None, it’s a source

Output

  • Structured text (JSON)

  • Implicit topic

Configuration

Check out the full configuration properties in the API Reference page.

Webcrawler Status

Label           Type     Description
pendingUrls     String   Holds the URLs that have been discovered but are yet to be processed.
remainingUrls   String   Holds the URLs that have been discovered but are yet to be processed.
visitedUrls     String   Holds all URLs that have been visited, to prevent cyclic crawling.

Example webcrawler workflow

Using the webcrawler source agent as a starting point, this workflow crawls a website, gets each page's raw HTML, and processes that information into chunks for embedding in a vector database. The complete example code is available in the LangStream repository.

  1. Topics necessary for the application are declared: the chunked embeddings will later be passed to a vector database via the Kafka "chunks-topic".

name: "Crawl a website"
topics:
  - name: "chunks-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

  2. The webcrawler source agent configuration is declared as part of the pipeline. For help finding your credential information, see Secrets. For help configuring the webcrawler-source, see Configuration.

  3. The webcrawler itself uses the Jsoup library to parse HTML in accordance with the WHATWG HTML spec. The webcrawler explores the web starting from the list of seed URLs and follows links within pages to discover more content.

For each seed-url, the webcrawler:

  • Sets up a connection using Jsoup.

  • Catches HTTP status errors and handles retries or skips based on the error code (HttpStatusException for HTTP errors, UnsupportedMimeTypeException for non-HTML content types).

  • If the content is HTML, fetches the links (hrefs) from the page and adds them to the list of URLs to be crawled, provided each href passes the allowed-domains rule set in the pipeline configuration (a filtering sketch follows this list).

  • Gets the page's HTML and creates a document with the site URL as a header and the raw HTML as output.
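
As a sketch of those filtering rules, the crawl scope is narrowed with allowed-domains and, optionally, forbidden-paths. The example pipeline above leaves forbidden-paths empty; the path below is only illustrative:

      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      # skip any discovered URL whose path falls under /changelog (illustrative)
      forbidden-paths: ["/changelog"]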

The webcrawler then passes the document on to the next agent. The rest of the pipeline processes the document as follows:

  - name: "Extract text"
    type: "text-extractor"
  - name: "Normalise text"
    type: "text-normaliser"
    configuration:
      make-lowercase: true
      trim-spaces: true
  - name: "Detect language"
    type: "language-detector"
    configuration:
       allowedLanguages: ["en"]
       property: "language"
  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"
  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
        text-field: text
        copy-properties: true
  - name: "prepare-structure"
    type: "compute"
    configuration:
      fields:
         - name: "value.filename"
           expression: "properties.name"
           type: STRING
         - name: "value.chunk_id"
           expression: "properties.chunk_id"
           type: STRING
         - name: "value.language"
           expression: "properties.language"
           type: STRING
         - name: "value.chunk_num_tokens"
           expression: "properties.chunk_num_tokens"
           type: STRING
  - name: "compute-embeddings"
    id: "step1"
    type: "compute-ai-embeddings"
    output: "chunks-topic"
    configuration:
      model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model
      embeddings-field: "value.embeddings_vector"
      text: "{{ value.text }}"
      batch-size: 10
      flush-interval: 500
  - name: "Write to Astra"
    type: "vector-db-sink"
    input: "chunks-topic"
    resources:
      size: 2
    configuration:
      datasource: "AstraDatasource"
      table-name: "documents"
      keyspace: "documents"
      mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"

  4. The text-extractor agent extracts metadata and text from records using Apache Tika.

  5. The text-normaliser agent forces the text into lower case and removes leading and trailing spaces.

  6. The language-detector agent identifies a record's language. In this case, non-English records are skipped, and English records continue to the next step in the pipeline.

  7. The text-splitter agent splits the document into chunks of text. This is an important part of the vectorization pipeline to understand, because it requires balancing performance against accuracy. chunk_size controls the maximum size of each chunk, and chunk_overlap controls the amount of overlap between consecutive chunks; a little overlap keeps results more consistent. chunk_size defaults to 1000 characters, and chunk_overlap defaults to 200 characters.

  8. The document-to-json agent converts the unstructured data to structured JSON.

  9. The compute agent structures the JSON output into values the final compute step (compute-ai-embeddings) can work with.

Now that the text is processed and structured, the compute-ai-embeddings agent computes embeddings and sends them to the Kafka "chunks-topic".
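
The embeddings model is backed by an LLM resource declared in the application's configuration.yaml. A minimal sketch assuming OpenAI is used; the secret id open-ai and its access-key entry are placeholders:

configuration:
  resources:
    - type: "open-ai-configuration"
      name: "OpenAI configuration"
      configuration:
        access-key: "{{ secrets.open-ai.access-key }}"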

  10. Where to next? If you've got an Astra vector database, use the vector-db-sink agent to sink the vectorized embeddings via the Kafka "chunks-topic" to your database. From there, you can query your vector data, or ask questions with a chatbot. It's up to you!
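
As one possible next step, a downstream pipeline could look up related chunks with the query-vector-db agent. A minimal sketch that reuses the "AstraDatasource", keyspace, table, and column names from the sink above; the question-embeddings field is an assumption:

  - name: "lookup-related-documents"
    type: "query-vector-db"
    configuration:
      datasource: "AstraDatasource"
      query: "SELECT text FROM documents.documents ORDER BY embeddings_vector ANN OF ? LIMIT 5"
      fields:
        - "value.question_embeddings"
      output-field: "value.related_documents"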
