
API Source

The api source type polls any HTTP endpoint on a configurable interval, diffs the response against previously stored state in SQLite, and emits only the changes as standard pipeline messages. Any request/response API becomes a real-time event stream — no webhooks, no changes to the upstream system, no event infrastructure required.

Quick Start

Poll an OpenWeatherMap endpoint every 30 seconds and emit changes:
sources:
  - name: weather
    type: api
    topic: weather_london
    api:
      url: "https://api.openweathermap.org/data/2.5/weather?q=London&appid=${OWM_KEY}"
      interval: 30s
      key_path: "id"

Besides the source-level type, name, and topic, the api block needs only three fields: url, interval, and key_path. Everything else has a sensible default.

Config Reference

Source-Level Fields

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | yes | Must be "api" |
| name | string | yes | Unique name for this source |
| topic | string | yes | Topic (table) to write events to |

api Block Fields

Core

| Field | Type | Default | Description |
|---|---|---|---|
| url | string | required | Full URL to poll. Supports ${ENV_VAR} expansion. |
| method | string | GET | HTTP method: GET, POST, or PUT. |
| interval | duration | required | How often to poll (e.g. 30s, 1m, 5m). |
| key_path | gjson path | required | Path within each record to its unique key. |
| response_path | gjson path | "" | Path to the array of records in the response. Empty means the response itself is a top-level array or a single object. |
| change_detection | string | diff | "diff" (field-level comparison) or "hash" (SHA-256 of the payload). |
| detect_deletes | bool | false | Emit a deleted event when a record present in the previous poll is missing from the current one. |
| timeout | duration | 30s | HTTP request timeout, applied to each page. |
| initial_snapshot | bool | true | If false, no events are emitted for the first poll. |
| headers | map | {} | HTTP headers. Values support ${ENV_VAR} expansion. |
| body | string | "" | Request body for POST/PUT. |
| max_consecutive_failures | int | 5 | Number of consecutive failed poll cycles before the source switches to exponential backoff. |
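The table implies a two-step extraction: response_path selects the records, then key_path identifies each one. The Go sketch below illustrates that under stated assumptions: it uses the standard library instead of gjson (so only dotted object paths work), stringifies keys, and shows how comparing key sets between two polls would surface the records that detect_deletes reports. The function names are hypothetical, not LiteJoin's API.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// lookup follows a dotted path such as "id" or "fields.key" through nested
// JSON objects. It stands in for gjson and only understands object keys.
func lookup(v any, path string) (any, bool) {
	if path == "" {
		return v, true
	}
	for _, part := range strings.Split(path, ".") {
		obj, ok := v.(map[string]any)
		if !ok {
			return nil, false
		}
		next, found := obj[part]
		if !found {
			return nil, false
		}
		v = next
	}
	return v, true
}

// extractRecords decodes the body (keeping numbers exact via UseNumber),
// applies response_path, and treats an array as a list of records and any
// other value as a single record.
func extractRecords(body []byte, responsePath string) ([]any, error) {
	dec := json.NewDecoder(bytes.NewReader(body))
	dec.UseNumber()
	var doc any
	if err := dec.Decode(&doc); err != nil {
		return nil, err
	}
	v, ok := lookup(doc, responsePath)
	if !ok {
		return nil, fmt.Errorf("response_path %q not found", responsePath)
	}
	if arr, isArr := v.([]any); isArr {
		return arr, nil
	}
	return []any{v}, nil
}

// indexByKey maps each record's key_path value, as a string, to the record.
func indexByKey(records []any, keyPath string) (map[string]any, error) {
	out := make(map[string]any, len(records))
	for _, r := range records {
		k, ok := lookup(r, keyPath)
		if !ok {
			return nil, fmt.Errorf("key_path %q missing from a record", keyPath)
		}
		out[fmt.Sprint(k)] = r
	}
	return out, nil
}

// deletedKeys returns the keys present in prev but absent from curr, sorted.
func deletedKeys(prev, curr map[string]any) []string {
	var gone []string
	for k := range prev {
		if _, ok := curr[k]; !ok {
			gone = append(gone, k)
		}
	}
	sort.Strings(gone)
	return gone
}

func main() {
	prevRecs, _ := extractRecords([]byte(`{"data":[{"id":"a"},{"id":"b"}]}`), "data")
	currRecs, _ := extractRecords([]byte(`{"data":[{"id":"b"}]}`), "data")
	prev, _ := indexByKey(prevRecs, "id")
	curr, _ := indexByKey(currRecs, "id")
	fmt.Println(deletedKeys(prev, curr)) // [a]
}
```

Decoding with UseNumber matters for keys: a numeric ID such as 2643743 would otherwise become a float64 and print as 2.643743e+06, which makes a poor key.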

Watermark Block

Controls incremental polling — how the source avoids re-fetching data it has already seen.
| Field | Type | Default | Description |
|---|---|---|---|
| strategy | string | none | none, timestamp, cursor, or etag. |
| path | gjson path | "" | Path to extract the watermark value from the response. |
| param | string | "" | Query parameter that carries the watermark on subsequent requests. |
| format | string | RFC3339 | Go time layout used by the timestamp strategy. |
| initial | string | "" | Watermark value that seeds the first run. |
| overlap | duration | 10s | Duration subtracted from the watermark to tolerate clock skew. |

Pagination Block

| Field | Type | Default | Description |
|---|---|---|---|
| strategy | string | none | none, link_header, cursor, or offset. |
| param | string | "" | Query parameter that carries the cursor value. |
| path | gjson path | "" | Path to extract the cursor from the response. |
| has_more_path | gjson path | "" | Path to a boolean indicating whether more pages exist. |
| offset_param | string | offset | Query parameter for the offset. |
| limit_param | string | limit | Query parameter for the page size. |
| limit | int | (none) | Page size. Required for the offset strategy. |
| total_path | gjson path | "" | Path to the total record count. |
| max_pages | int | 100 | Hard cap on pages fetched per poll cycle. |

Watermark Strategies

none: Full scan on every poll, diffed against stored state. Simple and correct for any API. Best for: small result sets and APIs with no filtering support.
watermark:
  strategy: none

Pagination Strategies


Change Detection Modes

diff (default)

Compares each record with its stored version field by field and emits changed records annotated with _change metadata:
{
  "number": 42,
  "state": "closed",
  "_change": {
    "type": "updated",
    "fields": ["state"],
    "previous": { "state": "open" },
    "current": { "state": "closed" }
  }
}
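The _change object above can be produced by a straightforward field-by-field comparison. A Go sketch under assumptions: it compares top-level fields only (nested objects are compared as whole values), it covers only the "updated" case shown in the example, and Change and diffRecord are hypothetical names rather than LiteJoin types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"sort"
)

// Change mirrors the _change object in the example above.
type Change struct {
	Type     string         `json:"type"`
	Fields   []string       `json:"fields"`
	Previous map[string]any `json:"previous"`
	Current  map[string]any `json:"current"`
}

// diffRecord compares two versions of one record field by field (top level
// only) and returns nil when nothing changed.
func diffRecord(prev, curr map[string]any) *Change {
	keys := map[string]bool{}
	for k := range prev {
		keys[k] = true
	}
	for k := range curr {
		keys[k] = true
	}
	c := &Change{Type: "updated", Previous: map[string]any{}, Current: map[string]any{}}
	for k := range keys {
		pv, inPrev := prev[k]
		cv, inCurr := curr[k]
		if inPrev == inCurr && reflect.DeepEqual(pv, cv) {
			continue // unchanged field
		}
		c.Fields = append(c.Fields, k)
		if inPrev {
			c.Previous[k] = pv
		}
		if inCurr {
			c.Current[k] = cv
		}
	}
	if len(c.Fields) == 0 {
		return nil
	}
	sort.Strings(c.Fields) // map iteration order is random
	return c
}

func main() {
	var prev, curr map[string]any
	_ = json.Unmarshal([]byte(`{"number": 42, "state": "open"}`), &prev)
	_ = json.Unmarshal([]byte(`{"number": 42, "state": "closed"}`), &curr)
	out, _ := json.Marshal(diffRecord(prev, curr))
	fmt.Println(string(out))
	// {"type":"updated","fields":["state"],"previous":{"state":"open"},"current":{"state":"closed"}}
}
```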

hash

Computes a SHA-256 hash of each record's full payload and emits the record whenever the hash changes. No field-level metadata is attached. Lower CPU overhead than diff for large payloads.
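In hash mode only a digest per key has to be compared. A Go sketch, assuming the payload is canonicalised before hashing so that key order or whitespace in the response does not register as a change (the section does not say whether LiteJoin does this); recordHash is a hypothetical name.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// recordHash returns the hex SHA-256 of a record's canonical JSON.
// Decoding and re-encoding sorts object keys and drops insignificant
// whitespace; UseNumber keeps numbers exactly as they were written.
func recordHash(raw []byte) (string, error) {
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	var v any
	if err := dec.Decode(&v); err != nil {
		return "", err
	}
	canon, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(canon)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a, _ := recordHash([]byte(`{"id": 1, "state": "open"}`))
	b, _ := recordHash([]byte(`{"state":"open","id":1}`))
	c, _ := recordHash([]byte(`{"id": 1, "state": "closed"}`))
	fmt.Println("reordered keys changed:", a != b) // false
	fmt.Println("state change detected:", a != c)  // true
}
```

Because only the digest is kept, this mode cannot report which fields changed or their previous values.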

Cookbook Examples

GitHub Pull Requests

Poll open PRs with Link header pagination and ETag caching:
sources:
  - name: github_prs
    type: api
    topic: open_prs
    api:
      url: "https://api.github.com/repos/org/repo/pulls?state=open&per_page=100"
      interval: 5m
      key_path: "number"
      detect_deletes: true
      headers:
        Authorization: "Bearer ${GITHUB_TOKEN}"
      watermark:
        strategy: etag
      pagination:
        strategy: link_header
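Both mechanisms in this example are standard HTTP: the next page is the URL marked rel="next" in the Link header, and an ETag from an earlier response is sent back in If-None-Match so the server can reply 304 Not Modified when nothing changed. A stdlib-only Go sketch, assuming LiteJoin's etag strategy uses conditional requests this way; nextLink and conditionalGet are hypothetical helpers, and the parser ignores edge cases such as commas inside URLs.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// nextLink returns the URL marked rel="next" in a Link header such as
// GitHub's, or "" when there is no next page.
func nextLink(header string) string {
	for _, entry := range strings.Split(header, ",") {
		parts := strings.Split(entry, ";")
		target := strings.TrimSpace(parts[0])
		if !strings.HasPrefix(target, "<") || !strings.HasSuffix(target, ">") {
			continue
		}
		for _, param := range parts[1:] {
			if strings.TrimSpace(param) == `rel="next"` {
				return strings.TrimSuffix(strings.TrimPrefix(target, "<"), ">")
			}
		}
	}
	return ""
}

// conditionalGet builds a GET that replays a stored ETag. A 304 Not Modified
// reply to this request means the page is unchanged since the last poll.
func conditionalGet(url, etag string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	if etag != "" {
		req.Header.Set("If-None-Match", etag)
	}
	return req, nil
}

func main() {
	link := `<https://api.github.com/repos/org/repo/pulls?state=open&per_page=100&page=2>; rel="next", ` +
		`<https://api.github.com/repos/org/repo/pulls?state=open&per_page=100&page=3>; rel="last"`
	fmt.Println(nextLink(link))

	req, err := conditionalGet("https://api.github.com/repos/org/repo/pulls?state=open&per_page=100", `"abc123"`)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("If-None-Match"))
}
```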

Stripe Charges

Cursor-based pagination and watermark:
sources:
  - name: stripe_charges
    type: api
    topic: charges
    api:
      url: "https://api.stripe.com/v1/charges?limit=100"
      interval: 10s
      key_path: "id"
      response_path: "data"
      headers:
        Authorization: "Bearer ${STRIPE_SECRET_KEY}"
      watermark:
        strategy: cursor
        path: "data.@last.id"
        param: "starting_after"
      pagination:
        strategy: cursor
        param: "starting_after"
        path: "data.@last.id"
        has_more_path: "has_more"

Jira Issues

Timestamp watermark with offset pagination:
sources:
  - name: jira_issues
    type: api
    topic: jira_issues
    api:
      url: "https://your-org.atlassian.net/rest/api/3/search?jql=project=ENG"
      interval: 2m
      key_path: "id"
      response_path: "issues"
      headers:
        Authorization: "Basic ${JIRA_API_TOKEN}"  # Jira Cloud expects base64 of "email:api_token", not the raw token
      watermark:
        strategy: timestamp
        path: "fields.updated"
        param: "jql"
        format: "2006-01-02T15:04:05.000-0700"
        overlap: 30s
      pagination:
        strategy: offset
        offset_param: "startAt"
        limit_param: "maxResults"
        limit: 50
        total_path: "total"

Operational Guidance

Choosing a Poll Interval

| Use case | Suggested interval |
|---|---|
| Near-real-time tracking | 10s to 30s |
| Operational data (orders, tickets) | 1m to 5m |
| Reference data (users, products) | 5m to 30m |
| Slow-changing config data | 1h or more |

Rate Limiting

LiteJoin handles rate limits and transient HTTP errors automatically:
  • 429 Too Many Requests: retried after the delay given in the Retry-After header
  • X-RateLimit-Remaining of 0: polling pauses until the rate-limit window resets
  • 5xx errors: retried with exponential backoff, up to 3 retries
  • Other 4xx errors: not retried, since they usually indicate a configuration problem

Environment Variables

Header values and URLs support ${ENV_VAR} expansion:
headers:
  Authorization: "Bearer ${STRIPE_SECRET_KEY}"
url: "https://api.example.com/data?api_key=${API_KEY}"
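The ${ENV_VAR} form can be expanded with a short scanner. A Go sketch, assuming only the braced form is recognised and that an unset variable expands to an empty string (the section does not say whether LiteJoin rejects unset variables); expandBraced is hypothetical. Go's os.ExpandEnv would also work, but it additionally rewrites bare $NAME references, which could mangle a literal $ in a URL or request body.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// expandBraced replaces each ${NAME} with lookup(NAME). Text outside the
// braces, including a bare '$', is copied unchanged; an unterminated "${"
// is left as-is.
func expandBraced(s string, lookup func(string) string) string {
	var b strings.Builder
	for {
		i := strings.Index(s, "${")
		if i < 0 {
			b.WriteString(s)
			return b.String()
		}
		j := strings.IndexByte(s[i+2:], '}')
		if j < 0 {
			b.WriteString(s)
			return b.String()
		}
		b.WriteString(s[:i])
		b.WriteString(lookup(s[i+2 : i+2+j]))
		s = s[i+3+j:]
	}
}

func main() {
	os.Setenv("API_KEY", "k123") // demo value only
	fmt.Println(expandBraced("https://api.example.com/data?api_key=${API_KEY}", os.Getenv))
	// https://api.example.com/data?api_key=k123
}
```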

gjson Paths

All path fields use gjson syntax:
| Pattern | Example | What it accesses |
|---|---|---|
| field | id | Top-level field |
| a.b | data.id | Nested field |
| #.field | #.id | Field from every element of an array |
| | data.@last.id | id of the last element in the data array |