Wave 47 — Dim T (predictive service) migration PR — STATE¶
- Date: 2026-05-12 (Wave 47 Phase 2 永遠ループ tick#6)
- Dim: T — predictive service (pull -> push) per
feedback_predictive_service_design - Branch:
feat/jpcite_2026_05_12_wave47_dim_t_migration - Worktree:
/tmp/jpcite-w47-dim-t-mig(lane claim:/tmp/jpcite-w47-dim-t-mig.lane) - Base:
origin/main@6141128b3 - PR: filled at push time
Purpose¶
Storage substrate + daily ETL for the Dim T "predictive service" surface
— the pull-to-push transition required by feedback_predictive_service_design.
Three watch sources are unified into a single predictive subscription table + a single per-fire audit log:
houjin— watch a 法人番号; predictive fires fromam_amendment_diffrows whoseentity_idmatches.program— watch aprograms.unified_id; predictive fires when anam_amendment_diffrow lands AND the program deadline window is still actionable.amendment— watch alaws.law_id; predictive fires from anyam_amendment_diffrow whoseentity_idis a sub-article under the watched law.
24h notification window (configurable 1..168h per subscription).
Stale pending alerts are flipped to expired by a TTL purge in the
same daily ETL pass.
LLM-0 by construction (per feedback_no_operator_llm_api +
feedback_predictive_service_design): we only RANK + ROUTE rows that
already exist in am_amendment_diff. No Anthropic / OpenAI SDK is
imported anywhere in the new code. Natural-language summarisation is
done by the customer's OWN agent on their side. Tests guard the
invariant (test_no_llm_token_in_predictive_etl,
test_no_llm_import_in_migration).
Files (3 new + 2 manifest edits)¶
| Path | LOC | Role |
|---|---|---|
scripts/migrations/280_predictive_service.sql |
122 | schema (subscription + alert log + view) |
scripts/migrations/280_predictive_service_rollback.sql |
24 | rollback (drops only Dim T surface) |
scripts/etl/build_predictive_watch_v2.py |
219 | daily 3-type scan + 24h TTL purge |
tests/test_dim_t_predictive.py |
359 | 18 cases (mig + ETL + 3 watch type + TTL + dedup + LLM-0 guard) |
scripts/migrations/jpcite_boot_manifest.txt |
+20 | register 280 |
scripts/migrations/autonomath_boot_manifest.txt |
+20 | register 280 mirror |
Schema (migration 280)¶
am_predictive_watch_subscription(PK=watch_idINTEGER AUTOINCREMENT)subscriber_token_hashsha256 hex (CHECK length=64; raw token never stored)watch_typeENUM CHECK IN ('houjin','program','amendment')watch_targetopaque per type (CHECK length 1..128)thresholdREAL DEFAULT 0.0 (CHECK >= 0.0; 0.0 = always fire)notify_window_hoursINTEGER DEFAULT 24 (CHECK 1..168; predictive push window)statusENUM CHECK IN ('active','paused','cancelled')created_at,updated_at,last_fired_at(NULL until first fire)- Indexes:
idx_am_predictive_watch_targeton (watch_type, watch_target, status) — ETL daily scan hot pathidx_am_predictive_watch_subscriberon (subscriber_token_hash, status) — "list my subscriptions"uq_am_predictive_watch_activeUNIQUE on (subscriber_token_hash, watch_type, watch_target) WHERE status='active' — dedup
am_predictive_alert_log(PK=alert_idINTEGER AUTOINCREMENT)watch_idFK ->am_predictive_watch_subscription(watch_id)fired_at(NOT NULL, defaultnow)source_diff_id(nullable — for program-window-only fires)payloadJSON (CHECK length 2..65536)delivery_statusENUM CHECK IN ('pending','delivered','failed','expired') — onlydeliveredbills ¥3/reqdelivered_at(CHECK >=fired_at)- Indexes:
idx_am_predictive_alert_watchon (watch_id, fired_at DESC)idx_am_predictive_alert_statuson (delivery_status, fired_at)idx_am_predictive_alert_pending_agepartial on (fired_at) WHERE delivery_status='pending' — TTL scan hot pathuq_am_predictive_alert_deduppartial UNIQUE on (watch_id, source_diff_id) WHERE source_diff_id IS NOT NULL — prevents double-fire
v_predictive_watch_activehelper view (status='active' only, ordered by watch_type+target — driven by ETL collector)
ETL contract (build_predictive_watch_v2.py)¶
Single pass evaluates all 3 watch types via a JOIN against
am_amendment_diff (gracefully no-op when the upstream table is
absent). For each candidate (watch_id, source_diff_id) tuple NOT
already in the alert log, INSERT one row with delivery_status='pending'
and a structural JSON payload (watch_type, watch_target,
source_diff_id, detected_at). Side-pass flips stale pending rows
older than notify_window_hours to expired.
--dry-runplans only (returns counts, writes nothing).--since-hours Nlets the cron backfill (default 24h).- Final stdout line is JSON
{"dim":"T","wave":47,"dry_run":bool,"queued":int,"expired":int,"by_type":{...}}.
Tests (18 cases, all green)¶
tests/test_dim_t_predictive.py::test_mig_280_applies_clean PASSED
tests/test_dim_t_predictive.py::test_mig_280_is_idempotent PASSED
tests/test_dim_t_predictive.py::test_mig_280_rollback_drops_all PASSED
tests/test_dim_t_predictive.py::test_check_watch_type_enum PASSED
tests/test_dim_t_predictive.py::test_check_token_hash_length PASSED
tests/test_dim_t_predictive.py::test_check_threshold_non_negative PASSED
tests/test_dim_t_predictive.py::test_check_notify_window_range PASSED
tests/test_dim_t_predictive.py::test_check_delivered_at_after_fired_at PASSED
tests/test_dim_t_predictive.py::test_three_watch_types_fire_via_etl PASSED
tests/test_dim_t_predictive.py::test_etl_dry_run_writes_nothing PASSED
tests/test_dim_t_predictive.py::test_ttl_purge_marks_stale_pending_expired PASSED
tests/test_dim_t_predictive.py::test_dedup_prevents_double_fire PASSED
tests/test_dim_t_predictive.py::test_unique_active_partial_index PASSED
tests/test_dim_t_predictive.py::test_manifest_jpcite_lists_280 PASSED
tests/test_dim_t_predictive.py::test_manifest_autonomath_lists_280 PASSED
tests/test_dim_t_predictive.py::test_no_llm_token_in_predictive_etl PASSED
tests/test_dim_t_predictive.py::test_no_llm_import_in_migration PASSED
tests/test_dim_t_predictive.py::test_no_legacy_brand_in_new_files PASSED
========================== 18 passed in 1.53s ==========================
Coverage by case bundle:
- Migration applies cleanly + idempotent re-apply.
- Rollback drops every artefact.
- CHECK constraints (watch_type enum, token_hash length=64, threshold>=0, notify_window 1..168, delivered_at>=fired_at, status enum).
- Three watch types (houjin/program/amendment) fire via ETL in one pass.
- Dry-run plans without writing.
- 24h TTL purge flips stale
pending->expired. - Dedup partial unique index prevents double-fire on (watch_id, source_diff_id).
- Unique-active partial index allows re-subscription after cancellation.
- Boot manifest registration (jpcite + autonomath mirror).
- LLM-0 verify (zero
anthropic/openai/google.generativeaitokens in code). - No legacy brand (
税務会計AI/zeimu-kaikei.ai) in new files.
Constraints satisfied¶
- ✅ Migration 280 is pure additive (CREATE TABLE/INDEX/VIEW IF NOT EXISTS); no UPDATE/DELETE of existing rows.
- ✅ Idempotent on every boot (Fly entrypoint.sh §4 safe).
- ✅ Both boot manifests register 280 (jpcite + autonomath mirror).
- ✅ Rollback is dev-only and only drops Dim T surface — customer_watches (mig 088) is untouched.
- ✅ LLM-0 by construction — verified by
test_no_llm_token_in_predictive_etl+test_no_llm_import_in_migration. - ✅ Brand discipline — no legacy
税務会計AI/zeimu-kaikei.aireferences. - ✅ ¥3/req billing posture preserved (only
deliveredrows trigger Stripe usage_record on the dispatcher side; this ETL only enqueues). - ✅ No existing cron / workflow / source file was overwritten (only 2 boot manifests appended).
- ✅ Main worktree untouched (work done in
/tmp/jpcite-w47-dim-t-mig). - ✅ No
rm/mv(banner+index style organisation perfeedback_destruction_free_organization). - ✅ Lane claim atomic via
mkdir /tmp/jpcite-w47-dim-t-mig.lane.
Post-PR¶
- Wire
scripts/cron/dispatch_predictive_alerts.py(runtime layer) — popspendingrows, POSTs to subscriber webhooks, flips todelivered/failed. Out of scope here (this PR is the storage layer + ETL). - Wire REST
GET/POST/DELETE /v1/me/predictive/watches/...(customer-facing CRUD). Out of scope. - Wire MCP tool
predictive.subscribe/predictive.list. Out of scope. - Schedule
build_predictive_watch_v2.pyin.github/workflows/predictive-watch-daily.yml. Out of scope (this PR keeps the daily cron wiring separate so the storage layer can land first).