webcrawler-source

The webcrawler-source agent crawls a website and outputs the site's URL and an HTML document. Crawling a website is an ideal first step in a text embeddings pipeline.

This agent keeps the status of the crawl in persistent storage. The storage does not contain a copy of the crawled data, only a single JSON file whose name is computed from the name of the agent and the id of the LangStream application.

By default, the status is stored in an S3-compatible bucket, which must be defined using the bucketName, endpoint, access-key, secret-key, and region properties. Alternatively, the status can be stored on a persistent disk provided by LangStream by setting state-storage: disk.
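As a sketch of the idea, the persisted state amounts to one small JSON document per agent. The naming function below is purely illustrative (the real file name is computed internally by LangStream from the same two values):

```python
import json

def status_file_name(application_id: str, agent_name: str) -> str:
    # Hypothetical naming scheme for illustration only; LangStream
    # computes the actual file name internally from these two values.
    return f"{application_id}-{agent_name}.status.json"

def save_status(status: dict) -> str:
    # The status is a single JSON document, not a copy of the crawl data.
    return json.dumps(status)
```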

Example

Example webcrawler agent in a pipeline:

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

Multiple URLs in pipeline

You can provide multiple seed-urls and allowed-domains.

To add them to your pipeline, use this syntax:

seed-urls:
  - "http://example1.com"
  - "http://example2.com"
allowed-domains:
  - "http://example1.com"
  - "http://example2.com"  

Topics

Input

  • None, it’s a source

Output

  • Structured text (JSON)

  • Implicit topic

Configuration

Check out the full configuration properties on the API Reference page.

Webcrawler Status

| Label | Type | Description |
| --- | --- | --- |
| pendingUrls | String | Holds the URLs that have been discovered but are yet to be processed. |
| remainingUrls | String | Holds the URLs that have been discovered but are yet to be processed. |
| visitedUrls | String | Holds all URLs that have been visited to prevent cyclic crawling. |
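The shape of that status file can be pictured as a single JSON document holding the three lists above. The field names come from the table; the exact schema LangStream writes is internal:

```python
import json

# Illustrative status document using the field names from the table above;
# the exact schema LangStream persists is internal.
status = {
    "pendingUrls": ["https://docs.langstream.ai/agents"],
    "remainingUrls": ["https://docs.langstream.ai/agents"],
    "visitedUrls": ["https://docs.langstream.ai/"],
}

serialized = json.dumps(status)    # what gets written to S3 or disk
restored = json.loads(serialized)  # what the agent reads back on restart
```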

Example webcrawler workflow

Using the webcrawler source agent as a starting point, this workflow will crawl a website, get a page's raw HTML, and process that information into chunks for embedding in a vector database. The complete example code is available in the LangStream repository.

  1. Topics necessary for the application are declared: we will later pass the chunked embeddings to a vector database via the Kafka "chunks-topic".

name: "Crawl a website"
topics:
  - name: "chunks-topic"
    creation-mode: create-if-not-exists
  2. The webcrawler source agent configuration is declared. For help finding your credential information, see Secrets. For help configuring the webcrawler-source, see Configuration.

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

The webcrawler itself uses the Jsoup library to parse HTML according to the WHATWG HTML specification. It explores the web starting from the list of seed URLs and follows links within pages to discover more content.

For each seed-url, the webcrawler:

  • Sets up a connection using Jsoup.

  • Catches HTTP status errors and handles retries or skips based on the error code (HttpStatusException for HTTP errors, UnsupportedMimeTypeException for non-HTML content types).

  • If the content is HTML, extracts the links (hrefs) from the page and adds each one that passes the allowed-domains rule set in the pipeline configuration to the list of URLs to be crawled.

  • Gets the page's HTML and creates a document with the site URL as a header and the raw HTML as output.

The webcrawler then passes the document on to the next agent.
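The crawler itself is written in Java on top of Jsoup, but the link-discovery rule is easy to model. The Python sketch below (stdlib only; the function names are ours, not LangStream's) extracts hrefs from a page and keeps only those that pass the allowed-domains check:

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

class LinkExtractor(HTMLParser):
    """Collects href targets from anchor tags, resolving relative URLs."""
    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(urljoin(self.base_url, value))

def discover_links(base_url, html, allowed_domains):
    # Keep only links whose URL starts with one of the allowed domains,
    # mirroring the allowed-domains rule in the pipeline configuration.
    parser = LinkExtractor(base_url)
    parser.feed(html)
    return [u for u in parser.links
            if any(u.startswith(d) for d in allowed_domains)]

page = '<a href="/agents">Agents</a><a href="https://other.example/x">Off-site</a>'
links = discover_links("https://docs.langstream.ai/", page,
                       ["https://docs.langstream.ai"])
```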

  3. The text-extractor agent extracts metadata and text from records using Apache Tika.

  - name: "Extract text"
    type: "text-extractor"
  4. The text-normaliser agent forces the text into lower case and removes leading and trailing spaces.

  - name: "Normalise text"
    type: "text-normaliser"
    configuration:
      make-lowercase: true
      trim-spaces: true
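A minimal model of what this step does (illustrative Python, not the agent's actual implementation):

```python
def normalise(text: str, make_lowercase: bool = True,
              trim_spaces: bool = True) -> str:
    # Mirrors the two options above: lowercase the text and strip
    # leading/trailing whitespace.
    if make_lowercase:
        text = text.lower()
    if trim_spaces:
        text = text.strip()
    return text
```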
  5. The language-detector agent identifies a record’s language. In this case, non-English records are skipped, and English records continue to the next step in the pipeline.

  - name: "Detect language"
    type: "language-detector"
    configuration:
       allowedLanguages: ["en"]
       property: "language"
  6. The text-splitter agent splits the document into chunks of text. This is an important part of the vectorization pipeline to understand, because it requires balancing performance against accuracy. chunk_size controls the maximum number of characters in each chunk, and chunk_overlap controls the amount of overlap between chunks. A little overlap keeps results more consistent. chunk_size defaults to 1000 characters, and chunk_overlap defaults to 200 characters.

  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"
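To see how chunk_size and chunk_overlap interact, here is a simplified character-based splitter. The real RecursiveCharacterTextSplitter also tries the configured separators before cutting mid-word; this sketch only shows the windowing:

```python
def split_into_chunks(text, chunk_size=1000, chunk_overlap=200):
    # Each chunk is at most chunk_size characters, and each chunk repeats
    # the last chunk_overlap characters of the previous one.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "".join(str(i % 10) for i in range(500))
chunks = split_into_chunks(text, chunk_size=400, chunk_overlap=100)
```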
  7. The document-to-json agent converts the unstructured data to structured JSON.

  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
        text-field: text
        copy-properties: true
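Conceptually, this step produces a JSON record with the document text under text-field, plus the record's metadata properties when copy-properties is true. An illustrative Python model (not the agent's code):

```python
import json

def document_to_json(text, properties, text_field="text",
                     copy_properties=True):
    # Merge the record's metadata properties into the output (when
    # copy-properties is enabled), then add the text under text_field.
    record = dict(properties) if copy_properties else {}
    record[text_field] = text
    return json.dumps(record)
```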
  8. The compute agent structures the JSON output into values the embedding step can work with.

  - name: "prepare-structure"
    type: "compute"
    configuration:
      fields:
         - name: "value.filename"
           expression: "properties.name"
           type: STRING
         - name: "value.chunk_id"
           expression: "properties.chunk_id"
           type: STRING
         - name: "value.language"
           expression: "properties.language"
           type: STRING
         - name: "value.chunk_num_tokens"
           expression: "properties.chunk_num_tokens"
           type: STRING
  9. Now that the text is processed and structured, the compute-ai-embeddings agent computes embeddings and sends them to the Kafka "chunks-topic".

  - name: "compute-embeddings"
    id: "step1"
    type: "compute-ai-embeddings"
    output: "chunks-topic"
    configuration:
      model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model
      embeddings-field: "value.embeddings_vector"
      text: "{{ value.text }}"
      batch-size: 10
      flush-interval: 500
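Here, batch-size groups records before each embeddings call, and flush-interval (in milliseconds) flushes a partial batch after a quiet period. The batching half is easy to sketch (illustrative Python, not the agent's code):

```python
def batch_records(records, batch_size=10):
    # Group records into batches of at most batch_size before each
    # embeddings call; flush-interval would additionally flush a partial
    # batch after the configured quiet period, which a time-based loop
    # would add on top of this.
    return [records[i:i + batch_size]
            for i in range(0, len(records), batch_size)]

batches = batch_records(list(range(25)), batch_size=10)
```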

10. Where to next? If you've got an Astra vector database, use the vector-db-sink agent to sink the vectorized embeddings via the Kafka "chunks-topic" to your database. From there, you can query your vector data, or ask questions with a chatbot. It's up to you!

  - name: "Write to Astra"
    type: "vector-db-sink"
    input: "chunks-topic"
    resources:
      size: 2
    configuration:
      datasource: "AstraDatasource"
      table-name: "documents"
      keyspace: "documents"
      mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"
