A Ductile Cookbook: Wiring an Email Security Pipeline

ductile, pipelines, plugins, integration-gateway, prompt-injection, automation, agentic

A worked example of building a sequential plugin chain in Ductile — the YAML DSL, the plugin contract, and the gotchas that only surface once an event actually flows. The running example is the email-security pipeline I run in front of an agentic AI’s inbox: a sanitiser, a regex prefilter, two prompt-injection classifiers, and a fusion step that decides process or quarantine. The patterns generalise to any pipeline of cooperating plugins. (Companion posts on the security architecture and the scoring logic are in progress.)

What Ductile Gives You

Ductile is a local, YAML-configured integration gateway — a runtime for chaining small plugins into event-driven pipelines. Three concepts are enough to read this piece:

  • Plugin — a self-contained executable that reads a JSON request from stdin and writes a JSON response to stdout. Plugins are spawned per-command. They can be Python, Go, bash, anything that can read stdin.
  • Pipeline — a YAML-defined sequence of steps. Each step references a plugin and runs in order. Steps communicate by emitting events (response payload) and by baggage (durable named context that accumulates as the chain runs).
  • Event — the message that triggers a pipeline. The first step receives the trigger event; each subsequent step receives the previous step’s emit.

That’s it. There’s also a hooks system, a webhook server, a config integrity layer, and a job inspection API, but for a pipeline cookbook the three concepts above are the working set.
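To make the plugin concept concrete, here is a minimal sketch of driving a plugin by hand, outside the gateway. The run_plugin helper is hypothetical and not part of Ductile; it assumes only what is stated above: one process per command, one JSON request on stdin, one JSON response on stdout.

```python
import json
import subprocess


def run_plugin(argv, request, timeout_s=30):
    """Spawn a plugin once, send one JSON request, parse one JSON response.

    Hypothetical test helper, not part of Ductile. `argv` is the plugin's
    command line, for example ["./run.py"].
    """
    proc = subprocess.run(
        argv,
        input=json.dumps(request),   # the request goes to the plugin's stdin
        capture_output=True,
        text=True,
        timeout=timeout_s,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"plugin exited {proc.returncode}: {proc.stderr.strip()}")
    return json.loads(proc.stdout)   # the response comes back on stdout
```

Something like run_plugin(["./run.py"], {"command": "health", "config": {}}) is a quick way to exercise a plugin before wiring it into a pipeline.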

The Pipeline DSL

This is the actual email-pipeline-handle pipeline at the time of writing, edited only for clarity:

pipelines:
  - name: email-pipeline-handle
    on: gmail.new_full_message
    steps:
      # input: gmail.new_full_message (from email_pipeline_fetch)
      - id: triage
        uses: email_pipeline_triage
        baggage:
          mail.message_id:        payload.message_id
          mail.thread_id:         payload.thread_id
          mail.raw_message_json:  payload.raw_message_json

      # input: email.triaged (from triage)
      - id: sanitise
        uses: email_pipeline_sanitise
        baggage:
          sender.address:      payload.address
          sender.trust_level:  payload.trust_level

      # input: email.sanitised (from sanitise)
      - id: regex
        uses: email_pipeline_promptarmor
        baggage:
          mail.parts:         payload.parts
          mail.mime_summary:  payload.mime_summary

      # input: email.scored.regex (from regex)
      - id: promptguard
        uses: email_pipeline_promptguard
        baggage:
          scorer.regex.verdict:       payload.verdict
          scorer.regex.max_weight:    payload.max_weight
          scorer.regex.categories:    payload.categories
          scorer.regex.block_reason:  payload.block_reason

      # input: email.scored.promptguard (from promptguard)
      - id: sentinel
        uses: email_pipeline_sentinel
        baggage:
          scorer.promptguard.verdict:   payload.verdict
          scorer.promptguard.score:     payload.score
          scorer.promptguard.decision:  payload.decision
          scorer.promptguard.reason:    payload.reason

      # input: email.scored.sentinel (from sentinel)
      - id: veto
        uses: email_pipeline_veto
        baggage:
          scorer.sentinel.verdict:   payload.verdict
          scorer.sentinel.score:     payload.score
          scorer.sentinel.decision:  payload.decision
          scorer.sentinel.reason:    payload.reason

A few things to notice:

Each step’s baggage: block claims fields from the immediately preceding step’s emit, not from the original trigger. Triage receives gmail.new_full_message and claims fields that exist on it (message_id, thread_id, raw_message_json). Sanitise receives email.triaged (triage’s output) and claims fields that exist on that (address, trust_level). The pattern continues.

The fact namespace is shared across the chain. Once mail.message_id is in baggage from triage, every downstream step can read it via context.mail.message_id. Baggage is durable for the lifetime of the event chain. Inherited paths are immutable — no step further down can rewrite an upstream fact.
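The claim-and-accumulate behaviour can be modelled in a few lines of Python. This is a toy model of the rules described here, not Ductile's implementation: apply_claims and get_path are hypothetical names, the event shape ({"payload": {...}}) is an assumption, and the sample values are made up.

```python
import copy


def get_path(obj, dotted):
    """Resolve a dotted path such as 'payload.message_id' against nested dicts."""
    for key in dotted.split("."):
        obj = obj[key]
    return obj


def apply_claims(context, event, claims):
    """Return a new context with each claimed source path stored under its durable name.

    Raises if a durable name is already set, mirroring the rule that
    inherited facts cannot be rewritten further down the chain.
    """
    updated = copy.deepcopy(context)
    for name, source in claims.items():
        *parents, leaf = name.split(".")
        node = updated
        for key in parents:
            node = node.setdefault(key, {})
        if leaf in node:
            raise ValueError(f"{name} is already set upstream and is immutable")
        node[leaf] = get_path(event, source)
    return updated


# Triage claims from the trigger event; sanitise claims from triage's emit.
trigger = {"payload": {"message_id": "m1", "thread_id": "t1"}}
ctx = apply_claims({}, trigger, {
    "mail.message_id": "payload.message_id",
    "mail.thread_id": "payload.thread_id",
})
triaged = {"payload": {"address": "a@example.com", "trust_level": "unknown"}}
ctx = apply_claims(ctx, triaged, {
    "sender.address": "payload.address",
    "sender.trust_level": "payload.trust_level",
})
```

After both steps, ctx holds the mail.* facts from triage alongside the sender.* facts from sanitise, and a later attempt to claim mail.message_id again raises.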

Every fact is named explicitly. Each baggage line is one source path mapped to one durable name. (Ductile does have a from:/namespace: bulk-import form; this pipeline doesn’t use it — a namespace: is required for bulk imports anyway, so it’s not much terser.) A reader looking at this pipeline can see exactly what flows into context without chasing producing-plugin internals. It’s verbose; it’s also forever-readable.

The Action Pipeline (with a Gate)

The first pipeline ends with veto emitting email.process_decision. A second pipeline picks up that event and passes it to the agent handler only when the decision is process:

- name: email-pipeline-act
  on: email.process_decision
  if:
    path: payload.decision
    op: eq
    value: process
  steps:
    - id: handle
      uses: email_handler

The if: block is the gate. It’s a structured predicate (path / op / value, with composite all / any / not) evaluated against the event’s payload, plus any upstream durable context, after the trigger name matches and before any step runs. A false result skips the pipeline’s dispatch entirely, so a quarantine decision never spawns the handler.
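As a rough illustration of how such a predicate could be evaluated, here is a small Python sketch. It is an assumption-laden model, not Ductile's evaluator: only the eq operator shown above is handled, and the nesting shape used for all / any / not (a list for all and any, a single predicate for not) is a guess.

```python
def get_path(obj, dotted):
    """Resolve a dotted path; return None when any segment is missing."""
    for key in dotted.split("."):
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj


def evaluate(pred, scope):
    """Evaluate a structured predicate against a scope such as {'payload': {...}}."""
    if "all" in pred:
        return all(evaluate(p, scope) for p in pred["all"])
    if "any" in pred:
        return any(evaluate(p, scope) for p in pred["any"])
    if "not" in pred:
        return not evaluate(pred["not"], scope)
    if pred["op"] == "eq":
        return get_path(scope, pred["path"]) == pred["value"]
    raise ValueError(f"unsupported op {pred['op']!r}")


gate = {"path": "payload.decision", "op": "eq", "value": "process"}
run_handler = evaluate(gate, {"payload": {"decision": "process"}})
skip_handler = evaluate(gate, {"payload": {"decision": "quarantine"}})
```

Here run_handler is True and skip_handler is False, which is the whole job of the gate: the handler step is only reached for process decisions.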

The Plugin Contract

Each plugin is a single executable. Here’s a minimum viable scorer plugin in Python:

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11,<3.13"
# ///
"""Minimal scorer plugin shape.

Reads context.mail.parts from baggage, calls a scoring service,
emits email.scored.<name> with verdict and score.
"""
import json, sys, urllib.request

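EVENT_TYPE = "email.scored.example"
DEDUPE_PREFIX = "example-scored:msg:"

def score_parts(parts, config):
    # Hypothetical scoring call: the request and response shapes are assumptions.
    # POSTs {"parts": [...]} to config["endpoint_url"] and expects {"score": <float>}.
    # Assumes endpoint_url is set in the plugin config.
    body = json.dumps({"parts": parts}).encode("utf-8")
    req = urllib.request.Request(
        config["endpoint_url"], data=body,
        headers={"Content-Type": "application/json"},
    )
    timeout = float(config.get("request_timeout_s", 10))
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return float(json.load(resp)["score"])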

def cmd_handle(config, context):
    mail = context.get("mail") or {}
    msg_id = mail.get("message_id")
    parts = mail.get("parts") or []
    if not msg_id or not parts:
        return {"status": "error", "error": "missing baggage", "retry": False}

    score = score_parts(parts, config)
    verdict = "block" if score >= 0.5 else "pass"
    return {
        "status": "ok",
        "result": f"scored {msg_id}: {verdict} ({score:.3f})",  # required when status=ok
        "events": [{
            "type": EVENT_TYPE,
            "payload": {
                "message_id": msg_id,
                "verdict": verdict,
                "score": score,
            },
            "dedupe_key": f"{DEDUPE_PREFIX}{msg_id}",
        }],
    }

def cmd_health(config):
    return {"status": "ok", "result": "healthy"}

def main():
    req = json.load(sys.stdin)
    cmd = req.get("command")
    cfg = req.get("config", {})
    ctx = req.get("context", {})
    if cmd == "handle":  out = cmd_handle(cfg, ctx)
    elif cmd == "health": out = cmd_health(cfg)
    else:                 out = {"status": "error", "error": f"unknown command {cmd!r}"}
    json.dump(out, sys.stdout); sys.stdout.write("\n")

if __name__ == "__main__":
    main()

The corresponding manifest:

manifest_spec: ductile.plugin
manifest_version: 1
name: email_pipeline_example
version: 0.1.0
protocol: 2
entrypoint: run.py
description: "Stage X — example scorer plugin."
concurrency_safe: true
commands:
  - name: handle
    type: write
    description: "Score sanitised parts; emits email.scored.example."
    idempotent: true
    retry_safe: true
    values:
      consume:
        - payload.message_id
        - payload.parts
      emit:
        - event: email.scored.example
          values:
            - payload.message_id
            - payload.verdict
            - payload.score
  - name: health
    type: read
    description: "Plugin self-check."
    idempotent: true
    retry_safe: true
config_keys:
  required: []
  optional:
    - endpoint_url
    - request_timeout_s

A few observations on the contract:

The response envelope is {status, result?, error?, retry?, events?, state_updates?, logs?}. status is ok or error. On ok, a short human-readable result string is required — it’s what surfaces in ductile job inspect and the watch UI. On error, error is required and retry is a fact about the failure (false = retrying the same request can’t help), not a policy instruction — core owns the actual retry decision. Exiting 78 (EX_CONFIG) marks a permanent configuration failure regardless of retry.
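A small linter makes these envelope rules easy to check in a plugin's own tests. The sketch below is hypothetical (validate_response is not a Ductile API), and flagging unknown keys is my own strictness, not something the gateway is known to enforce.

```python
ALLOWED_KEYS = {"status", "result", "error", "retry", "events", "state_updates", "logs"}


def validate_response(resp):
    """Return a list of problems; an empty list means the envelope looks well-formed."""
    problems = []
    unknown = set(resp) - ALLOWED_KEYS
    if unknown:
        problems.append(f"unknown keys: {sorted(unknown)}")
    status = resp.get("status")
    if status == "ok":
        # A short human-readable result string is required on success.
        if not isinstance(resp.get("result"), str) or not resp["result"]:
            problems.append("status=ok requires a non-empty result string")
    elif status == "error":
        if not resp.get("error"):
            problems.append("status=error requires error")
        if "retry" in resp and not isinstance(resp["retry"], bool):
            problems.append("retry must be a boolean")
    else:
        problems.append(f"status must be 'ok' or 'error', got {status!r}")
    return problems
```

Running it over every response a plugin can produce catches the easy mistake of returning status ok without a result.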

Plugins read from context and event. context is the durable baggage assembled by upstream baggage: claims; event is the triggering event envelope from the previous step (present only on handle). For data that flowed through several steps, read context; for data that comes straight from the prior step, either works.
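One way to encode that rule of thumb is a helper that prefers durable context and falls back to the triggering event. This is a sketch under assumptions: read_fact is a hypothetical name, and the request keys "event" and "payload" are my guess at where the event envelope and its payload sit in the request.

```python
def read_fact(request, context_path, payload_key=None):
    """Prefer durable context; optionally fall back to the triggering event's payload."""
    node = request.get("context") or {}
    for key in context_path.split("."):
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            break
    if node is not None:
        return node
    if payload_key is not None:
        # Assumed location of the previous step's emit inside the request.
        payload = (request.get("event") or {}).get("payload") or {}
        return payload.get(payload_key)
    return None


req = {
    "command": "handle",
    "context": {"mail": {"message_id": "m1"}},
    "event": {"payload": {"verdict": "pass"}},
}
msg_id = read_fact(req, "mail.message_id")                   # flowed through several steps
verdict = read_fact(req, "scorer.regex.verdict", "verdict")  # straight from the prior step
```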

Hard-fail (retry: false, or exit 78) on missing baggage. If a scorer can’t find context.mail.parts, retrying won’t conjure it, so fail hard and let the operator notice. Reporting a data-shape error as retryable invites retry loops that can never succeed.
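The distinction between failures that a retry can and cannot fix might be sketched like this. The helper names are hypothetical, and the mapping from exception types to retry values is one reasonable choice, not a Ductile rule.

```python
import sys
import urllib.error

EX_CONFIG = 78  # sysexits.h code for a configuration error


def failure_response(exc):
    """Map an exception to an error envelope; retry states a fact about the failure."""
    if isinstance(exc, (KeyError, TypeError, ValueError)):
        # Data-shape problem: the same request will fail the same way.
        return {"status": "error", "error": f"bad input: {exc}", "retry": False}
    if isinstance(exc, (urllib.error.URLError, TimeoutError)):
        # Transient transport problem: a later attempt may succeed.
        return {"status": "error", "error": f"scoring service unavailable: {exc}", "retry": True}
    return {"status": "error", "error": f"unexpected: {exc!r}", "retry": False}


def require_config(config, key):
    """Exit 78 when a needed config value is absent; no retry can fix that."""
    if not config.get(key):
        print(f"missing config key {key!r}", file=sys.stderr)
        sys.exit(EX_CONFIG)
    return config[key]
```

Wrapping the body of a handle command in a try/except that returns failure_response(exc) keeps the retry fact honest without scattering the decision across the plugin.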

Deduplication keys are the plugin’s responsibility. Set dedupe_key on each emitted event. Ductile uses it to suppress duplicate downstream dispatch within the dedupe window (currently ~2 hours). Convention: <plugin-short-name>:msg:<message_id>.
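To see what a dedupe window buys you, here is a toy in-memory model. It is not how Ductile stores or checks keys; DedupeWindow is a hypothetical class, and the two-hour default simply mirrors the approximate window mentioned above.

```python
import time


class DedupeWindow:
    """Toy model of dedupe-key suppression; not Ductile's implementation."""

    def __init__(self, window_s=2 * 60 * 60, clock=time.monotonic):
        self.window_s = window_s
        self.clock = clock          # injectable so tests can control time
        self._seen = {}             # dedupe_key -> time it was first accepted

    def should_dispatch(self, dedupe_key):
        now = self.clock()
        first = self._seen.get(dedupe_key)
        if first is not None and now - first < self.window_s:
            return False            # duplicate inside the window: suppress
        self._seen[dedupe_key] = now
        return True
```

Because the key is derived from the message ID, a plugin that is re-run for the same message emits the same key, and the second downstream dispatch is suppressed until the window has passed.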

The plugin’s version is its contract version. Bump the major component only when downstream consumers must change, not on internal refactors. (manifest_version: 1 and protocol: 2 are Ductile’s manifest-schema and wire-protocol versions; you don’t choose those.)

The Workflow

After any config change to ~/.config/ductile/:

ductile config check    # validate YAML, manifest references, integrity
ductile config lock     # re-compute checksums, authorise current state
ductile system reload   # SIGHUP the running gateway

Three things to know:

Config integrity is real, and tiered. Ductile keeps a .checksums file with BLAKE3 hashes of every authorised config and plugin file. Edit a file without re-running config lock and the mismatch is detected on the next load. What happens then depends on the file: a mismatch on a high-security file (tokens.yaml, webhooks.yaml, scopes/*.json) is a hard fail — the gateway refuses to start. A mismatch on an operational file (config.yaml, pipelines.yaml, plugins/*.yaml, …) is a logged warning by default; set strict_mode: true to make those hard-fail too. Either way the drift between “what’s on disk” and “what was authorised” is surfaced — that’s the point.
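The tiering logic is simple enough to sketch. The code below is a model of the behaviour described, not Ductile's code: drift_action and file_digest are hypothetical names, and because BLAKE3 is not in the Python standard library the digest uses hashlib.blake2b purely as a stand-in.

```python
import fnmatch
import hashlib

# High-security files named above; everything else is treated as operational.
HIGH_SECURITY = ["tokens.yaml", "webhooks.yaml", "scopes/*.json"]


def file_digest(data: bytes) -> str:
    # Stand-in hash: Ductile uses BLAKE3, which the standard library lacks.
    return hashlib.blake2b(data).hexdigest()


def drift_action(rel_path, recorded, current, strict_mode=False):
    """Return 'ok', 'warn', or 'fail' for one file's recorded vs current digest."""
    if recorded == current:
        return "ok"
    high = any(fnmatch.fnmatch(rel_path, pat) for pat in HIGH_SECURITY)
    return "fail" if high or strict_mode else "warn"


authorised = file_digest(b"pipelines: []\n")
edited = file_digest(b"pipelines: [changed]\n")
```

With these digests, drift_action("pipelines.yaml", authorised, edited) yields a warning by default and a failure under strict_mode=True, while the same drift on tokens.yaml fails either way.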

Scheduled jobs survive reload. Plugins with schedules: keep their cadence — and their next-run time — across reloads. A plugin’s durable checkpoint state (the snapshot it returns in state_updates, recorded append-only as plugin_facts) persists across reloads too.

Job lineage is queryable. Every plugin invocation has a job ID. ductile job inspect <id> --json shows the transition history, the baggage, the config-snapshot summaries (at enqueue and at start), workspace artifacts, and the parent/child job chain. When a pipeline ran but didn’t do what you expected, this is the first place to look.

What I’d Reach For Next

Two patterns that sit outside this pipeline but are worth knowing about for adjacent designs:

Parallel scorers via split:. The scorers in this pipeline run sequentially because sequential is what worked for me first. Conceptually they’re independent — each reads mail.parts from baggage and writes a disjoint scorer.<name>.* namespace. A split: block (a step’s parallel fan-out, as opposed to steps: sequential nesting) would compress the wall-clock from “sum of all scorer latencies” to “max of any scorer latency”. The cost is one architectural decision: how the join merges baggage back from the parallel branches.
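The latency argument and the join question can both be seen in a small Python model. Nothing here is Ductile's split: implementation; the scorers are fakes that just sleep, and merge_disjoint encodes one possible join policy (reject overlapping facts) as an assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def merge_disjoint(branches):
    """Join branch outputs, refusing to merge if two branches wrote the same fact."""
    merged = {}
    for facts in branches:
        clash = merged.keys() & facts.keys()
        if clash:
            raise ValueError(f"branches wrote the same facts: {sorted(clash)}")
        merged.update(facts)
    return merged


def run_parallel(scorers, parts):
    """Run every scorer on the same parts concurrently, then join the results."""
    with ThreadPoolExecutor(max_workers=len(scorers)) as pool:
        futures = [pool.submit(fn, parts) for fn in scorers]
        return merge_disjoint([f.result() for f in futures])


def fake_scorer(name, delay_s):
    """Build a stand-in scorer that sleeps, then writes its own namespace."""
    def score(parts):
        time.sleep(delay_s)
        return {f"scorer.{name}.verdict": "pass"}
    return score


start = time.perf_counter()
facts = run_parallel(
    [fake_scorer("promptguard", 0.2), fake_scorer("sentinel", 0.3)],
    ["hello"],
)
elapsed = time.perf_counter() - start  # tracks the slowest branch, not the sum
```

Because each scorer writes only its own scorer.<name>.* keys, the join is trivial; the design question only gets hard if two branches ever need to write the same fact.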

Hook pipelines. The job-failure hooks I do use:

- name: job-failure-notify
  on-hook: job.failed
  steps:
    - id: notify
      uses: discord_notify

on-hook: triggers on lifecycle signals — job.completed, job.failed, job.timed_out — rather than plugin emits, and is mutually exclusive with on:. Useful for operational notifications without coupling them to your business pipelines.


Built on Ductile, a small, YAML-configured integration gateway for personal automation.