# Tasks
A Task is the smallest reusable unit of investigation in Analysi. Concretely it is a saved Cy script plus the metadata needed to call it, schedule it, validate it, and embed it in a Workflow.
Source: `models/task.py:27`.
## Anatomy
The `tasks` table stores the script and configuration; identity and lifecycle live on the parent Component row (`Task.component_id` is the FK to `components.id`, which is also the ID workflows use to reference the Task).
| Column | Purpose | Notes |
|---|---|---|
| `script` | The Cy source code | Optional — Tasks with `function = "summarization"`, `"reasoning"`, etc. may delegate to a built-in handler instead of authoring Cy |
| `directive` | Free-text instruction passed to LLM-style functions | |
| `function` | Categorization of what the Task does | One of `summarization`, `data_conversion`, `extraction`, `reasoning`, `planning`, `visualization`, `search` (`constants.py:241`) |
| `scope` | Where the Task fits in the alert pipeline | `input` / `processing` / `output` (`constants.py:253`) — default `processing` |
| `mode` | `saved` (callable by FQN/ID, reusable) or `ad_hoc` (one-shot, no parent Task row) | `constants.py:261` |
| `data_samples` | JSONB list of example inputs | Used by validation and type-propagation tooling |
| `llm_config` | LLM provider/model/parameters as JSONB | Consumed by `llm_run` and friends inside the script |
| `schedule` | Optional cron expression | Picked up by the scheduler and dispatched to the Alerts Worker |
| `integration_id` | Optional link to a specific Integration | Used when the Task is bound to one connector |
| `origin_type` | `system` / `user` / `pack` | Where the Task came from — `pack` rows arrive via `analysi packs install` |
| `managed_resource_key` | Marker for system-managed Tasks | E.g. `alert_ingestion`, `health_check` (`constants.py:43`) |
A Task's IO contract is expressed by the schemas on the Component row plus its data_samples; type checking and propagation use those when the Task is dropped into a workflow (services/type_propagation/propagator.py).
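To make the role of `data_samples` concrete, here is a hedged sketch of what a value might look like and how tooling could use it. The field names inside each sample are invented for illustration — they are not mandated by the schema:

```python
# Hypothetical data_samples value: a JSONB list of example inputs.
# The keys inside each sample (alert_id, severity, source) are
# illustrative only, not taken from the real schema.
data_samples = [
    {"alert_id": "a-123", "severity": "high", "source": "edr"},
    {"alert_id": "a-456", "severity": "low", "source": "idp"},
]

# Type-propagation tooling could, for example, derive a rough input
# shape from the keys every sample has in common:
shared_keys = set(data_samples[0]) & set(data_samples[1])
print(sorted(shared_keys))  # ['alert_id', 'severity', 'source']
```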
## How a Task runs
Every execution — whether scheduled, ad-hoc, or as part of a workflow — produces a `TaskRun` row (`models/task_run.py:23`). The execution is enqueued as the ARQ job `analysi.jobs.task_run_job.execute_task_run` and consumed by the Alerts Worker (`alert_analysis/worker.py:67`). The worker resolves the script (from the saved Task or, for ad-hoc runs, from `TaskRun.cy_script`), constructs a Cy interpreter (see Cy in Analysi), and runs it.
```mermaid
flowchart LR
    classDef src fill:#fef3c7,stroke:#b45309,color:#78350f
    classDef svc fill:#86efac,stroke:#15803d,color:#052e16
    classDef data fill:#dbeafe,stroke:#1d4ed8,color:#1e3a8a

    Sched["Scheduler<br>(cron schedules table)"]:::src
    API["API<br>/tasks/{id}/execute<br>/tasks/execute (ad-hoc)"]:::src
    WF["Workflow Executor<br>task node"]:::src
    Q["ARQ queue<br>execute_task_run"]:::svc
    AW["Alerts Worker<br>TaskExecutionService"]:::svc
    TR[/"TaskRun row<br>status: pending → running →<br>completed | failed | paused"/]:::data

    Sched --> Q
    API --> Q
    WF --> Q
    Q --> AW --> TR
```
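The worker's script-resolution step can be sketched roughly as follows. This is a minimal sketch with invented dataclasses — the real `TaskExecutionService` handles far more (auth, validation, checkpoints) — but it captures the saved-vs-ad-hoc branch described above:

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class Task:
    id: str
    script: str  # saved Cy source

@dataclass
class TaskRun:
    id: str
    task_id: Optional[str]    # None (NULL) for ad-hoc runs
    cy_script: Optional[str]  # populated only for ad-hoc runs

def resolve_script(run: TaskRun, load_task: Callable[[str], Task]) -> str:
    """Return the Cy source to execute: the saved Task's script, or,
    for ad-hoc runs (task_id is None), the script on the run itself."""
    if run.task_id is None:
        if not run.cy_script:
            raise ValueError("ad-hoc run without cy_script")
        return run.cy_script
    return load_task(run.task_id).script

# Usage: one saved run, one ad-hoc run.
saved = Task(id="t1", script="out = summarize(input)")
assert resolve_script(TaskRun(id="r1", task_id="t1", cy_script=None),
                      lambda tid: saved) == saved.script
assert resolve_script(TaskRun(id="r2", task_id=None, cy_script="out = 1 + 1"),
                      lambda tid: saved) == "out = 1 + 1"
```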
### Three callers, one execution path
- **Scheduled.** A `Schedule` row with `target_type=task` and `next_run_at` is polled by the integrations worker; when due, a `JobRun` is created and the task_run job is enqueued. (Terminology — Schedule.)
- **Ad-hoc.** API or MCP clients can POST a Cy script (and optional `input_data`) to the execution endpoint. The TaskRun row carries the script in `cy_script` and has `task_id = NULL`. The MCP `execute_cy_script_adhoc` tool (`mcp/tools/cy_tools.py:405`) is the same path with MCP auth.
- **Workflow node.** A Task node in a workflow creates its TaskRun via `TaskRunService.create_execution`, with `workflow_run_id` and `workflow_node_instance_id` populated (`services/workflow_execution.py:294`). The Task script's `input` variable is the upstream envelope's `result` field — see Workflows.
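For the ad-hoc path, the request body presumably carries the script and optional input. A hedged sketch of assembling such a payload — the key names here mirror the `TaskRun` columns described above (`cy_script`, `input_data`) but are assumptions, not taken from an API reference:

```python
import json

# Hypothetical request body for the ad-hoc execution endpoint
# (/tasks/execute). Key names are assumptions based on the TaskRun
# columns described in this page, not a documented API contract.
payload = {
    "cy_script": "out = extract_iocs(input)",
    "input_data": {"raw": "203.0.113.7 contacted evil.example"},
}

body = json.dumps(payload)
decoded = json.loads(body)
assert decoded["cy_script"].startswith("out =")
assert "input_data" in decoded
```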
## TaskRun lifecycle
Statuses are defined by `TaskConstants.Status` (`constants.py:27`):

| Status | Meaning |
|---|---|
| `pending` | Created but not yet picked up by a worker |
| `running` | Worker has the run; default state once execution begins (model default is `running` — `task_run.py:48`) |
| `completed` | Script returned successfully |
| `failed` | Script raised, or output validation failed |
| `paused` | Cy interpreter hit a `hi_latency` tool — see Human-in-the-loop |
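The lifecycle implied by the table can be modeled as a small transition map. This is a sketch — the allowed transitions are inferred from the statuses above and the pause/resume behavior, not from an explicit state machine in the source:

```python
# Inferred TaskRun status transitions. 'paused' can return to
# 'running' on resume; terminal states have no outgoing edges.
TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed", "paused"},
    "paused": {"running"},  # resumed via the human:responded event
    "completed": set(),
    "failed": set(),
}

def can_transition(src: str, dst: str) -> bool:
    return dst in TRANSITIONS.get(src, set())

assert can_transition("pending", "running")
assert can_transition("paused", "running")
assert not can_transition("completed", "running")
```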
When a TaskRun pauses, the pause propagates to any parent `WorkflowNodeInstance` and `AlertAnalysis` (`chat/skills/hitl.md`). Resume is driven by the `human:responded` control event and replays completed steps from a memoized checkpoint.
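A memoized replay of this kind can be sketched as follows — illustrative only, since the real checkpointing lives inside the Cy interpreter:

```python
calls = []

def step(name, value):
    """Build a fake step that records when it actually executes."""
    def fn():
        calls.append(name)
        return value
    return fn

def run_steps(steps, checkpoint):
    """Run steps in order; steps already present in the checkpoint
    are replayed from their memoized result instead of re-executing."""
    results = {}
    for name, fn in steps:
        if name in checkpoint:
            results[name] = checkpoint[name]   # replayed
        else:
            results[name] = checkpoint[name] = fn()  # executed, memoized
    return results

steps = [("fetch", step("fetch", 1)), ("enrich", step("enrich", 2))]
checkpoint = {}
run_steps(steps, checkpoint)   # first run executes both steps
assert calls == ["fetch", "enrich"]

# After a pause/resume, completed steps replay from the checkpoint:
calls.clear()
run_steps(steps, checkpoint)
assert calls == []             # nothing re-executed
```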
## Storage of inputs and outputs
TaskRun stores inputs and outputs out-of-band so the row stays small and partitioning works:
- `input_type`/`input_location`: `inline` (small payloads stored directly in `input_location`), `s3` (object key in MinIO), or `file`
- `output_type`/`output_location`: same convention
- The same scheme is used by `WorkflowRun` and `WorkflowNodeInstance` (`models/workflow_execution.py`)

The `task_runs` table is partitioned by `created_at` (daily); default retention is 90 days, configurable via Helm.
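Resolving a run's payload under this convention might look like the following sketch, with a plain dict standing in for the MinIO object store (the real resolution lives in the services layer; the `file` type is omitted here):

```python
def load_payload(kind: str, location: str, object_store: dict) -> str:
    """Resolve a TaskRun input/output per the inline/s3 convention:
    'inline' keeps the payload in the location column itself;
    's3' stores an object key to fetch from MinIO (stubbed here)."""
    if kind == "inline":
        return location
    if kind == "s3":
        return object_store[location]  # stand-in for a MinIO GET
    raise ValueError(f"unsupported storage type: {kind}")

# Usage: one small inline payload, one offloaded to the object store.
store = {"task-runs/r1/input.json": '{"alerts": []}'}
assert load_payload("inline", '{"x": 1}', store) == '{"x": 1}'
assert load_payload("s3", "task-runs/r1/input.json", store) == '{"alerts": []}'
```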
## Where to go next
- Author a Task: see the Cy language tutorial and Cy in Analysi for the registered tool surface.
- Compose Tasks into a workflow: Workflows.
- Field reference: Terminology — Investigation primitives.