Signals import template
Use this page when you want to import a real source into Zentrik and you need more than a single request. It is meant for engineers, operators, and coding agents who want a solid starting point for batching, retries, polling, and source-specific mapping.
This is the heavier-duty companion to Signals API quickstart. It is a good fit when you want to:
- import many records
- control request pacing
- keep source metadata
- poll for processing status
- hand the script to an engineer or coding agent and only swap the source-fetch logic
When to use this
Use this template when your source already exposes records through an API or export and you want to load them into Discovery without waiting for a native connector.
Common examples:
- onboarding call transcripts from a meeting tool
- support exports from a ticket system
- survey rows from a CSV or warehouse extract
- synthesized research notes from another internal system
If you only need one first-success API call, use Signals API quickstart instead.
How the template works
The template does four jobs:
- load a JSON array of records
- turn each record into a create payload for POST /external/v1/signals
- submit each item and capture the returned publicId and jobId
- optionally poll each signal until it reaches processed or failed
It also includes:
- optional readyz checking before a live run
- retry handling for transport errors, 429s, and temporary 5xx responses
- a dry run mode that prints request bodies without sending them
- optional per-item delay for rate limiting
Input shape
The template accepts a JSON array. Each item is either:
- external_text for most imports
- raw_signal when you need explicit source routing or a lower-level payload
Use external_text by default. It lets you send one body of text with a signal type and optional metadata. Use raw_signal only when you need direct control over the source descriptor or data payload.
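As a quick sketch, the two input kinds look like this as Python dicts (the field values here are illustrative; the field names match the template and example input below):

```python
# Illustrative specs; record contents are made up for the example.
external_text_spec = {
    "kind": "external_text",
    "signalType": "transcript",        # required for external_text
    "text": "Customer asked for CSV export.",
    "externalId": "demo:note-1",       # optional source identifier
}

raw_signal_spec = {
    "kind": "raw_signal",
    "source": {"type": "manual-text"},  # required for raw_signal
    "data": {"transcriptText": "SCIM sync times out."},
}
```

An external_text item always carries text plus signalType; a raw_signal item always carries a source descriptor, with everything else optional.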
Production template
This is the production template you can adapt for a real source import.
```python
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any


def _api_base() -> str:
    return os.environ.get("ZENTRIK_API_BASE", "http://localhost:8080/api").rstrip("/")


def _api_key() -> str:
    key = os.environ.get("ZENTRIK_API_KEY", "").strip()
    if not key:
        print("Set ZENTRIK_API_KEY (workspace External API key).", file=sys.stderr)
        sys.exit(1)
    return key


def _wait_budget() -> float:
    raw = os.environ.get("EVAL_API_WAIT_SECONDS", "").strip()
    if not raw:
        return 120.0
    try:
        return max(0.0, float(raw))
    except ValueError:
        return 120.0


def _should_retry_http(code: int, err_body: str, method: str) -> bool:
    if code in {429, 502, 503, 504}:
        return True
    if method == "GET" and code == 404 and "static/index.html" in err_body:
        return True
    return False


def load_env_file(path: Path) -> None:
    if not path.is_file():
        return
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        val = val.strip().strip("'").strip('"')
        if key and key not in os.environ:
            os.environ[key] = val


def request_json(
    method: str,
    path: str,
    body: dict[str, Any] | None = None,
    *,
    max_wait_seconds: float | None = None,
    timeout: int = 120,
) -> tuple[int, Any]:
    url = f"{_api_base()}{path}"
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={
            "Authorization": f"Bearer {_api_key()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
    wait_seconds = _wait_budget() if max_wait_seconds is None else max_wait_seconds
    deadline = time.time() + wait_seconds
    attempt = 0
    while True:
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode()
                return resp.status, json.loads(raw) if raw.strip() else None
        except urllib.error.HTTPError as exc:
            err_body = exc.read().decode()
            if time.time() < deadline and _should_retry_http(exc.code, err_body, method):
                attempt += 1
                time.sleep(min(5.0, 1.0 + attempt))
                continue
            try:
                parsed = json.loads(err_body)
            except json.JSONDecodeError:
                parsed = err_body
            return exc.code, parsed
        except (urllib.error.URLError, ConnectionResetError, TimeoutError) as exc:
            if time.time() >= deadline:
                return 599, str(exc)
            attempt += 1
            time.sleep(min(5.0, 1.0 + attempt))


def check_readyz() -> bool:
    code, payload = request_json("GET", "/readyz", max_wait_seconds=30, timeout=15)
    if code != 200:
        print(f"GET /readyz -> {code} {payload}", file=sys.stderr)
        return False
    if not isinstance(payload, dict):
        return False
    checks = payload.get("checks") or {}
    return checks.get("database") == "ok" and checks.get("workspaceQueue") == "ok"


def spec_to_create_body(spec: dict[str, Any]) -> dict[str, Any]:
    kind = str(spec.get("kind") or "").strip()
    if kind == "external_text":
        body: dict[str, Any] = {"text": spec["text"], "signalType": spec["signalType"]}
        for key in ("name", "additionalContext", "productId", "productIds", "accountId", "occurredAt", "externalId"):
            if spec.get(key) is not None:
                body[key] = spec[key]
        return body
    if kind == "raw_signal":
        body = {"source": spec["source"]}
        for key in ("name", "data", "externalId", "occurredAt", "integrationId", "context", "insightIds"):
            if spec.get(key) is not None:
                body[key] = spec[key]
        return body
    raise ValueError(f"Unknown kind {kind!r}; use external_text or raw_signal")


def poll_signal(public_id: str, *, poll_seconds: int, interval: float = 6.0) -> tuple[dict[str, Any], bool]:
    deadline = time.time() + poll_seconds
    latest: dict[str, Any] = {}
    while time.time() < deadline:
        code, res = request_json("GET", f"/external/v1/signals/{public_id}")
        if code == 200 and isinstance(res, dict):
            latest = res
            st = res.get("status")
            if st == "processed":
                return res, True
            if st == "failed":
                return res, False
        time.sleep(interval)
    return latest, False


def run_item(spec: dict[str, Any], args: argparse.Namespace) -> dict[str, Any]:
    idx = spec.get("_index", 0)
    body = spec_to_create_body(spec)
    if args.dry_run:
        print(json.dumps({"index": idx, "would_post": body}, indent=2))
        return {"index": idx, "ok": True, "dry_run": True}
    code, created = request_json("POST", "/external/v1/signals", body)
    if code not in (200, 201) or not isinstance(created, dict):
        print(f"[{idx}] create failed -> {code} {created}", file=sys.stderr)
        return {"index": idx, "ok": False, "error": created}
    public_id = created["publicId"]
    job_id = created["jobId"]
    print(f"[{idx}] created {public_id} jobId={job_id}")
    if args.no_poll:
        return {"index": idx, "ok": True, "publicId": public_id, "jobId": job_id}
    final, success = poll_signal(str(public_id), poll_seconds=args.poll_seconds, interval=args.poll_interval)
    return {
        "index": idx,
        "ok": success,
        "publicId": public_id,
        "jobId": job_id,
        "status": final.get("status"),
        "processing": final.get("processing"),
    }


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Ingest signals via Zentrik External API.")
    p.add_argument("--input", type=Path, required=True, help="JSON file: array of signal specs.")
    p.add_argument("--env-file", type=Path, default=None, help="Optional .env file to load.")
    p.add_argument("--dry-run", action="store_true", help="Validate and print bodies; no POST.")
    p.add_argument("--skip-readyz", action="store_true", help="Skip GET /readyz.")
    p.add_argument("--no-poll", action="store_true", help="Do not wait for processed or failed.")
    p.add_argument("--poll-seconds", type=int, default=300, help="Max seconds per signal when polling.")
    p.add_argument("--poll-interval", type=float, default=6.0, help="Seconds between GET polls.")
    p.add_argument("--delay-ms", type=int, default=0, help="Pause after each item, in milliseconds.")
    return p.parse_args()


def main() -> None:
    args = parse_args()
    if args.env_file:
        load_env_file(args.env_file)
    specs = json.loads(args.input.read_text(encoding="utf-8"))
    for i, spec in enumerate(specs):
        spec["_index"] = i
    if not args.dry_run and not args.skip_readyz and not check_readyz():
        sys.exit(1)
    results = []
    for spec in specs:
        results.append(run_item(spec, args))
        if args.delay_ms > 0:
            time.sleep(args.delay_ms / 1000.0)
    ok_n = sum(1 for r in results if r.get("ok"))
    print(json.dumps({"attempted": len(results), "ok": ok_n, "results": results}, indent=2))
    if ok_n != len(results):
        sys.exit(1)


if __name__ == "__main__":
    main()
```
Example input
This is a representative input file for the production template.
```json
[
  {
    "kind": "external_text",
    "externalId": "client-demo:typed-note-1",
    "name": "Customer call — export and dark mode",
    "signalType": "transcript",
    "text": "Customer asked for dark mode and CSV export of the roadmap. They said exports are blocking their QBR deck."
  },
  {
    "kind": "raw_signal",
    "externalId": "client-demo:support-ticket-1",
    "name": "Support ticket — SSO provisioning timeout",
    "source": {
      "type": "manual-text",
      "external_data": {
        "signalType": "support_ticket",
        "analysisProfile": "support_ticket",
        "provider": "client_helpdesk_export",
        "dataset": "march_priority_tickets"
      }
    },
    "data": {
      "transcriptText": "Customer reports that SCIM group sync times out after 30 minutes. Admin has to retry three times before users are provisioned.",
      "additionalContext": "Pasted from the client's support system. Treat statements as a support ticket, not a sales transcript."
    }
  }
]
```
Adapting it to your source
When you adapt this template for a real source:
- replace the source-fetch step with calls to your source API
- keep the Zentrik payload mapping separate and explicit
- start with external_text unless you need lower-level routing
- run dry run first to inspect the outgoing payloads
- send a small batch before you import the whole dataset
For a source like MeetGeek, the usual change is simple: fetch transcripts from the source API, map each transcript to an external_text item with signalType: transcript, preserve the source identifier in externalId, and keep any source note in additionalContext.
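That mapping can be sketched as a small function. The record fields here (`id`, `title`, `transcript`) are assumptions about the source's schema; adjust them to whatever your source API actually returns:

```python
def transcript_to_spec(record: dict) -> dict:
    # Map one hypothetical source transcript record to an external_text spec.
    return {
        "kind": "external_text",
        "signalType": "transcript",
        "externalId": f"meetgeek:{record['id']}",  # preserve the source identifier
        "name": record.get("title"),
        "text": record["transcript"],
        "additionalContext": "Imported from MeetGeek transcript export.",
    }

record = {"id": "m-101", "title": "Weekly sync", "transcript": "Customer asked for CSV export."}
spec = transcript_to_spec(record)
```

Keeping this mapping in its own function makes it easy to swap the source-fetch step without touching the submit and poll logic.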
Next steps
Use these pages alongside the template:
- Signals API quickstart for the first successful single-record run
- Signals API reference for endpoint details
- Import evidence into Discovery for the non-technical product guide
Troubleshooting
These items are about workflow and expectations in the product, not about authentication or connector failures. If something contradicts what you see in your workspace, note your workspace name and the screen you were on, then contact us.
The dry run looks wrong
Do not send the batch yet. Fix the mapping first and rerun with dry run until the payloads match the source records you expect to import.
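One way to catch mapping problems before even a dry run is a small validator over the input file. This sketch checks only the fields the template requires for each kind; the function name and error wording are illustrative:

```python
def validate_specs(specs: list[dict]) -> list[str]:
    # Return one error string per spec that is missing required fields.
    errors = []
    for i, spec in enumerate(specs):
        kind = spec.get("kind")
        if kind == "external_text":
            missing = [k for k in ("text", "signalType") if not spec.get(k)]
        elif kind == "raw_signal":
            missing = [k for k in ("source",) if not spec.get(k)]
        else:
            missing = ["kind"]
        if missing:
            errors.append(f"item {i}: missing {', '.join(missing)}")
    return errors

errors = validate_specs([{"kind": "external_text", "text": "hi"}])
# One error: the item has text but no signalType.
```

Run it over the whole input file before the dry run so every malformed item is reported at once instead of failing one POST at a time.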
The batch is slow or unstable
Reduce batch size, add delay between items, and confirm readyz before each run. Keep the first live run small so you can verify the source mapping before you scale up.