Signals import template

Use this page when you want to import a real source into Zentrik and you need more than a single request. It is meant for engineers, operators, and coding agents who want a solid starting point for batching, retries, polling, and source-specific mapping.

This is the heavier-duty companion to Signals API quickstart. It is a good fit when you want to:

  • import many records
  • control request pacing
  • keep source metadata
  • poll for processing status
  • hand the script to an engineer or coding agent and only swap the source-fetch logic

When to use this

Use this template when your source already exposes records through an API or export and you want to load them into Discovery without waiting for a native connector.

Common examples:

  • onboarding call transcripts from a meeting tool
  • support exports from a ticket system
  • survey rows from a CSV or warehouse extract
  • synthesized research notes from another internal system

If you only need one first-success API call, use Signals API quickstart instead.
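For comparison, that single-call flow amounts to one POST. This sketch builds the request using the same endpoint and environment-variable conventions as the template below, without sending anything; the example text and signal type are illustrative:

```python
import json
import os
import urllib.request

def build_create_request(text: str, signal_type: str) -> urllib.request.Request:
    """Build, but do not send, one POST /external/v1/signals request."""
    base = os.environ.get("ZENTRIK_API_BASE", "http://localhost:8080/api").rstrip("/")
    return urllib.request.Request(
        f"{base}/external/v1/signals",
        data=json.dumps({"text": text, "signalType": signal_type}).encode(),
        method="POST",
        headers={
            "Authorization": f"Bearer {os.environ.get('ZENTRIK_API_KEY', '')}",
            "Content-Type": "application/json",
        },
    )

req = build_create_request("Customer asked for CSV export.", "transcript")
# urllib.request.urlopen(req) would submit it and return publicId/jobId.
```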

How the template works

The template does four jobs:

  1. load a JSON array of records
  2. turn each record into a create payload for POST /external/v1/signals
  3. submit each item and capture the returned publicId and jobId
  4. optionally poll each signal until it reaches processed or failed

It also includes:

  • optional readyz checking before a live run
  • retry handling for transport errors, 429s, and temporary 5xx responses
  • a dry run mode that prints request bodies without sending them
  • optional per-item delay for rate limiting

Input shape

The template accepts a JSON array. Each item is either:

  • external_text for most imports
  • raw_signal when you need explicit source routing or a lower-level payload

Use external_text by default. It lets you send one body of text with a signal type and optional metadata. Use raw_signal only when you need direct control over the source descriptor or data payload.
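As a sketch, the two item shapes look like this as Python dicts (field names follow the template's spec_to_create_body mapper; the values are illustrative):

```python
# Default shape: one body of text plus a signal type and optional metadata.
external_text_item = {
    "kind": "external_text",
    "externalId": "client-demo:typed-note-1",  # stable source identifier
    "signalType": "transcript",
    "text": "Customer asked for CSV export of the roadmap.",
    "additionalContext": "Pasted from a sales call note.",
}

# Lower-level shape: explicit source descriptor plus a data payload.
raw_signal_item = {
    "kind": "raw_signal",
    "externalId": "client-demo:support-ticket-1",
    "source": {"type": "manual-text", "external_data": {"signalType": "support_ticket"}},
    "data": {"transcriptText": "SCIM group sync times out after 30 minutes."},
}
```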

Production template

This is the production template you can adapt for a real source import.

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any

def _api_base() -> str:
    return os.environ.get("ZENTRIK_API_BASE", "http://localhost:8080/api").rstrip("/")

def _api_key() -> str:
    key = os.environ.get("ZENTRIK_API_KEY", "").strip()
    if not key:
        print("Set ZENTRIK_API_KEY (workspace External API key).", file=sys.stderr)
        sys.exit(1)
    return key

def _wait_budget() -> float:
    raw = os.environ.get("EVAL_API_WAIT_SECONDS", "").strip()
    if not raw:
        return 120.0
    try:
        return max(0.0, float(raw))
    except ValueError:
        return 120.0

def _should_retry_http(code: int, err_body: str, method: str) -> bool:
    if code in {429, 502, 503, 504}:
        return True
    if method == "GET" and code == 404 and "static/index.html" in err_body:
        return True
    return False

def load_env_file(path: Path) -> None:
    if not path.is_file():
        return
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        val = val.strip().strip("'").strip('"')
        if key and key not in os.environ:
            os.environ[key] = val

def request_json(
    method: str,
    path: str,
    body: dict[str, Any] | None = None,
    *,
    max_wait_seconds: float | None = None,
    timeout: int = 120,
) -> tuple[int, Any]:
    url = f"{_api_base()}{path}"
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={
            "Authorization": f"Bearer {_api_key()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
    wait_seconds = _wait_budget() if max_wait_seconds is None else max_wait_seconds
    deadline = time.time() + wait_seconds
    attempt = 0
    while True:
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode()
                return resp.status, json.loads(raw) if raw.strip() else None
        except urllib.error.HTTPError as exc:
            err_body = exc.read().decode()
            if time.time() < deadline and _should_retry_http(exc.code, err_body, method):
                attempt += 1
                time.sleep(min(5.0, 1.0 + attempt))
                continue
            try:
                parsed = json.loads(err_body)
            except json.JSONDecodeError:
                parsed = err_body
            return exc.code, parsed
        except (urllib.error.URLError, ConnectionResetError, TimeoutError) as exc:
            if time.time() >= deadline:
                return 599, str(exc)
            attempt += 1
            time.sleep(min(5.0, 1.0 + attempt))

def check_readyz() -> bool:
    code, payload = request_json("GET", "/readyz", max_wait_seconds=30, timeout=15)
    if code != 200:
        print(f"GET /readyz -> {code} {payload}", file=sys.stderr)
        return False
    if not isinstance(payload, dict):
        return False
    checks = payload.get("checks") or {}
    return checks.get("database") == "ok" and checks.get("workspaceQueue") == "ok"

def spec_to_create_body(spec: dict[str, Any]) -> dict[str, Any]:
    kind = str(spec.get("kind") or "").strip()
    if kind == "external_text":
        body: dict[str, Any] = {"text": spec["text"], "signalType": spec["signalType"]}
        for key in ("name", "additionalContext", "productId", "productIds", "accountId", "occurredAt", "externalId"):
            if spec.get(key) is not None:
                body[key] = spec[key]
        return body
    if kind == "raw_signal":
        body = {"source": spec["source"]}
        for key in ("name", "data", "externalId", "occurredAt", "integrationId", "context", "insightIds"):
            if spec.get(key) is not None:
                body[key] = spec[key]
        return body
    raise ValueError(f"Unknown kind {kind!r}; use external_text or raw_signal")

def poll_signal(public_id: str, *, poll_seconds: int, interval: float = 6.0) -> tuple[dict[str, Any], bool]:
    deadline = time.time() + poll_seconds
    latest: dict[str, Any] = {}
    while time.time() < deadline:
        code, res = request_json("GET", f"/external/v1/signals/{public_id}")
        if code == 200 and isinstance(res, dict):
            latest = res
            st = res.get("status")
            if st == "processed":
                return res, True
            if st == "failed":
                return res, False
        time.sleep(interval)
    return latest, False

def run_item(spec: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]:
    idx = spec.get("_index", 0)
    body = spec_to_create_body(spec)
    if args.dry_run:
        print(json.dumps({"index": idx, "would_post": body}, indent=2))
        return {"index": idx, "ok": True, "dry_run": True}

    code, created = request_json("POST", "/external/v1/signals", body)
    if code >= 300 or not isinstance(created, dict):
        print(f"[{idx}] create failed: {code} {created}", file=sys.stderr)
        return {"index": idx, "ok": False, "statusCode": code, "error": created}
    public_id = created["publicId"]
    job_id = created["jobId"]
    print(f"[{idx}] created {public_id} jobId={job_id}")

    if args.no_poll:
        return {"index": idx, "ok": True, "publicId": public_id, "jobId": job_id}

    final, success = poll_signal(str(public_id), poll_seconds=args.poll_seconds, interval=args.poll_interval)
    return {"index": idx, "ok": success, "publicId": public_id, "jobId": job_id, "status": final.get("status"), "processing": final.get("processing")}

def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Ingest signals via Zentrik External API.")
    p.add_argument("--input", type=Path, required=True, help="JSON file: array of signal specs.")
    p.add_argument("--env-file", type=Path, default=None, help="Optional .env file to load.")
    p.add_argument("--dry-run", action="store_true", help="Validate and print bodies; no POST.")
    p.add_argument("--skip-readyz", action="store_true", help="Skip GET /readyz.")
    p.add_argument("--no-poll", action="store_true", help="Do not wait for processed or failed.")
    p.add_argument("--poll-seconds", type=int, default=300, help="Max seconds per signal when polling.")
    p.add_argument("--poll-interval", type=float, default=6.0, help="Seconds between GET polls.")
    p.add_argument("--delay-ms", type=int, default=0, help="Pause after each item.")
    return p.parse_args()

def main() -> None:
    args = parse_args()
    if args.env_file:
        load_env_file(args.env_file)

    specs = json.loads(args.input.read_text(encoding="utf-8"))
    for i, spec in enumerate(specs):
        spec["_index"] = i

    if not args.dry_run and not args.skip_readyz and not check_readyz():
        sys.exit(1)

    results = []
    for spec in specs:
        results.append(run_item(spec, args))
        if args.delay_ms > 0:
            time.sleep(args.delay_ms / 1000.0)

    ok_n = sum(1 for r in results if r.get("ok"))
    print(json.dumps({"attempted": len(results), "ok": ok_n, "results": results}, indent=2))
    if ok_n != len(results):
        sys.exit(1)

if __name__ == "__main__":
    main()

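To sanity-check the payload mapping in isolation, the mapper can be exercised on its own. This sketch repeats the external_text branch of the template's spec_to_create_body so it runs standalone:

```python
from typing import Any

def spec_to_create_body(spec: dict[str, Any]) -> dict[str, Any]:
    """external_text branch of the template's mapper, copied for standalone use."""
    body: dict[str, Any] = {"text": spec["text"], "signalType": spec["signalType"]}
    for key in ("name", "additionalContext", "productId", "productIds",
                "accountId", "occurredAt", "externalId"):
        if spec.get(key) is not None:
            body[key] = spec[key]
    return body

body = spec_to_create_body({
    "kind": "external_text",
    "externalId": "client-demo:typed-note-1",
    "signalType": "transcript",
    "text": "Customer asked for dark mode.",
})
# The kind marker is routing-only inside the script and never reaches the API.
assert "kind" not in body
assert body["externalId"] == "client-demo:typed-note-1"
```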
Example input

This is a representative input file for the production template.

[
  {
    "kind": "external_text",
    "externalId": "client-demo:typed-note-1",
    "name": "Customer call — export and dark mode",
    "signalType": "transcript",
    "text": "Customer asked for dark mode and CSV export of the roadmap. They said exports are blocking their QBR deck."
  },
  {
    "kind": "raw_signal",
    "externalId": "client-demo:support-ticket-1",
    "name": "Support ticket — SSO provisioning timeout",
    "source": {
      "type": "manual-text",
      "external_data": {
        "signalType": "support_ticket",
        "analysisProfile": "support_ticket",
        "provider": "client_helpdesk_export",
        "dataset": "march_priority_tickets"
      }
    },
    "data": {
      "transcriptText": "Customer reports that SCIM group sync times out after 30 minutes. Admin has to retry three times before users are provisioned.",
      "additionalContext": "Pasted from the client's support system. Treat statements as a support ticket, not a sales transcript."
    }
  }
]
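Before a live run, it can help to validate an input file offline. This is a minimal preflight sketch that checks only the required keys named above; the template's --dry-run does a fuller version of the same check:

```python
import json

# Required keys per item kind, per the input shape described above.
REQUIRED = {
    "external_text": ("text", "signalType"),
    "raw_signal": ("source",),
}

def preflight(specs: list[dict]) -> list[str]:
    """Return one error string per invalid item; an empty list means safe to dry-run."""
    errors = []
    for i, spec in enumerate(specs):
        kind = spec.get("kind")
        if kind not in REQUIRED:
            errors.append(f"item {i}: unknown kind {kind!r}")
            continue
        for key in REQUIRED[kind]:
            if spec.get(key) is None:
                errors.append(f"item {i}: missing {key}")
    return errors

specs = json.loads('[{"kind": "external_text", "signalType": "transcript"}]')
print(preflight(specs))  # item 0 is missing its text field
```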

Adapting it to your source

When you adapt this template for a real source:

  1. replace the source-fetch step with calls to your source API
  2. keep the Zentrik payload mapping separate and explicit
  3. start with external_text unless you need lower-level routing
  4. run dry run first to inspect the outgoing payloads
  5. send a small batch before you import the whole dataset

For a source like MeetGeek, the usual change is simple: fetch transcripts from the source API, map each transcript to an external_text item with signalType: transcript, preserve the source identifier in externalId, and keep any source note in additionalContext.
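For a transcript source like that, the mapping step can be as small as the sketch below. The record fields (meeting_id, title, transcript_text) are hypothetical; substitute whatever your source API actually returns:

```python
from typing import Any

def transcript_to_spec(record: dict[str, Any]) -> dict[str, Any]:
    """Map one hypothetical source transcript record to an external_text item."""
    return {
        "kind": "external_text",
        "signalType": "transcript",
        "externalId": f"meetgeek:{record['meeting_id']}",  # preserve the source identifier
        "name": record.get("title"),
        "text": record["transcript_text"],
        "additionalContext": "Imported from MeetGeek; treat as a customer call transcript.",
    }

spec = transcript_to_spec({
    "meeting_id": "m-123",
    "title": "QBR prep call",
    "transcript_text": "Customer asked about SSO provisioning.",
})
```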

Next steps

Use the Signals API quickstart alongside this template when you need the single-request flow or first-success setup details.

Troubleshooting

These items are about workflow and expectations in the product, not a broken OAuth client. If something contradicts what you see in your workspace, note your workspace name and the screen, then contact us.

The dry run looks wrong

Do not send the batch yet. Fix the mapping first and rerun with dry run until the payloads match the source records you expect to import.

The batch is slow or unstable

Reduce batch size, add delay between items, and confirm readyz before each run. Keep the first live run small so you can verify the source mapping before you scale up.
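If the instability is rate-limit driven, one approach is to derive --delay-ms from a target request rate instead of guessing. This is a sketch; the sustainable rate for your workspace is an assumption you should measure with a small batch first:

```python
def delay_ms_for_rate(requests_per_second: float) -> int:
    """Per-item pause that keeps a serial import at or below the target rate."""
    if requests_per_second <= 0:
        raise ValueError("rate must be positive")
    return int(1000 / requests_per_second)

print(delay_ms_for_rate(2.0))  # 500 -> pass as --delay-ms 500
```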