0) Assumptions inherited from Phase J

  • Auth: prod uses AUTH_MODE=jwks with OAuth2 client-credentials to obtain a Bearer token; dev may use none or static (HS256).
  • Request ID: every call MUST set/propagate X-Request-ID.
  • Idempotency: all POST/PUT MUST set Idempotency-Key (24h TTL), conflict → 409, replay → Idempotency-Status: replayed.
  • API base: in-cluster DNS (e.g., http://portal-api.<ns>.svc.cluster.local:8000).
  • OpenAPI: endpoints and payloads are as defined in the Phase A spec.

1) Topology & Job mapping (use existing CronJob templates)

Create/keep one CronJob per stage, matching current skeleton names:

| Stage | CronJob (existing) | Endpoint(s) called | Notes |
| --- | --- | --- | --- |
| Extract (View) | cj-extract-view.yaml | GET /portal/view_common → batch POST /extract/view_common | Filter: JA exists & EN empty; targets: [ai_purpose, help]; mode: upsert_if_changed. |
| Extract (Field) | cj-extract-field.yaml | GET /portal/field → batch POST /extract/field | Group by model; fields with JA label present & EN missing; mode: upsert_if_changed. |
| Translate | cj-translate-run.yaml | POST /translate/run | limit tunable (default 200); entities [field, view_common]. |
| Package | cj-chroma-package.yaml | POST /chroma/package | Defaults per schema; lang: ja; chunk by limit (default 500). |
| Upsert | cj-chroma-upsert.yaml | POST /chroma/upsert | Prod: dry_run=false; dev: dry_run=true allowed; limit tunable (default 1000). |

Rationale: keep each concern isolated; rely on schedules with offsets and idempotency for safe chaining.


2) Schedules & offsets (dev/prod defaults; make env-tunable)

  • Dev (low cadence, easy debugging):
    • Extract-View: */30 * * * * (:00, :30)
    • Extract-Field: 2,32 * * * * (+2 min offset)
    • Translate: 5,35 * * * * (+5 min)
    • Package: 10,40 * * * * (+10 min)
    • Upsert: 15,45 * * * * (+15 min)
  • Prod (faster feedback, still spaced):
    • Extract-View: */15 * * * * (:00, :15, :30, :45)
    • Extract-Field: 2,17,32,47 * * * * (+2 min offset)
    • Translate: 4,19,34,49 * * * * (+4 min)
    • Package: 6,21,36,51 * * * * (+6 min)
    • Upsert: 8,23,38,53 * * * * (+8 min)

Why offsets? Avoid overlap without introducing hard dependencies. If a job overruns, concurrencyPolicy: Forbid prevents double-running.


3) K8s CronJob policy (all jobs)

  • concurrencyPolicy: Forbid
  • backoffLimit: 1 (K8s-level retries); inside the job, also do HTTP retries (see §6).
  • startingDeadlineSeconds: 300
  • successfulJobsHistoryLimit: 1, failedJobsHistoryLimit: 1
  • activeDeadlineSeconds: 10–15 minutes (per stage; see §6 timeouts)
  • Pod resources (baseline): requests: cpu 50m, mem 64Mi; limits: cpu 200m, mem 256Mi.

4) Authentication for Cron

  • Prod: OAuth2 Client Credentials against your IdP.
    • Env: CRON_OAUTH_TOKEN_URL, CRON_CLIENT_ID, CRON_CLIENT_SECRET, CRON_AUDIENCE (and optional CRON_SCOPE).
    • Each job fetches a JWT before calling the API; cache per run.
  • Dev: either no auth (AUTH_MODE=none) or an HS256 static token.
    • Env: CRON_STATIC_JWT (only for dev if AUTH_MODE=static).
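
For reference, a minimal sketch of the client-credentials exchange (token URL, audience, and scope depend on your IdP; the runner's _ensure_token below does the same asynchronously):

import httpx

def fetch_token(token_url: str, client_id: str, client_secret: str,
                audience: str | None = None, scope: str | None = None) -> str:
    # Standard OAuth2 client-credentials grant; audience/scope only if your IdP requires them.
    data = {"grant_type": "client_credentials"}
    if audience:
        data["audience"] = audience
    if scope:
        data["scope"] = scope
    r = httpx.post(token_url, data=data, auth=(client_id, client_secret), timeout=10.0)
    r.raise_for_status()
    return r.json()["access_token"]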

5) Idempotency-Key scheme (deterministic per window)

  • Header: Idempotency-Key: <stage>-<window start, yyyymmddThhmm UTC>-<sha256(payload|path|audience)[:16]>
  • Window minutes (env): CRON_WINDOW_MINUTES (default 15).
    Example: extract-view-20251003T1015-4f2a1c0d9e7b8c12
  • Request-ID: generate once per job run, reuse across stage calls: X-Request-ID: <UUIDv4>.
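
Illustrative sketch of the key construction (the runner's make_idempotency_key below implements the same scheme):

import hashlib
import json
import time
import uuid

def idempotency_key(stage: str, payload: dict, path: str, audience: str, window_min: int = 15) -> str:
    t = time.gmtime()                                        # UTC
    window = time.strftime(f"%Y%m%dT%H{(t.tm_min // window_min) * window_min:02d}", t)
    digest = hashlib.sha256(
        (json.dumps(payload, sort_keys=True, separators=(",", ":")) + "|" + path + "|" + audience).encode()
    ).hexdigest()[:16]
    return f"{stage}-{window}-{digest}"                      # e.g. extract-view-20251003T1015-4f2a...

request_id = str(uuid.uuid4())                               # one X-Request-ID per job run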

6) HTTP behavior (timeouts & retries inside the job)

  • Each API call: connect timeout 2s, read timeout 45s (upsert may need 90s).
  • Retries: up to 3 attempts with exponential backoff (2s, 4s, 8s); only retry on 5xx/429/timeouts.
  • Treat 409 from Idempotency as success (already done).
  • Log all attempts as JSON (see §9).
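
In httpx terms (as the runner below configures its clients), these timeouts map to:

import httpx

default_timeout = httpx.Timeout(45.0, connect=2.0)   # read/write/pool 45s, connect 2s
upsert_timeout = httpx.Timeout(90.0, connect=2.0)    # upsert calls may take longer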

7) Candidate selection & batching rules

  • Extract-View:
    • List with GET /portal/view_common?limit=500&cursor=....
    • Select those where (help_ja_text exists AND help_en_text is empty) OR (ai_purpose JA exists AND EN missing).
    • Batch action_xmlids into chunks of 100, then POST /extract/view_common with: {"action_xmlids":[...],"targets":["ai_purpose","help"],"mode":"upsert_if_changed"}
  • Extract-Field:
    • List with GET /portal/field?limit=500&cursor=....
    • Group by model; for each model, pick fields where JA label exists and EN missing.
    • For each model batch of up to 200 fields: {"model":"sale.order","fields":["partner_id","amount_total"],"mode":"upsert_if_changed"}
  • Translate:
    • Single call per run: {"limit": ${TRANSLATE_LIMIT:=200}, "entities":["field","view_common"]}
  • Package:
    • Single call per run: {"entities":["field","view_common"], "lang":"ja", "limit": ${PACKAGE_LIMIT:=500}}
  • Upsert:
    • Single call per run: {"limit": ${UPSERT_LIMIT:=1000}, "dry_run": ${UPSERT_DRY_RUN:=false}}
  • All POSTs carry Idempotency-Key and X-Request-ID.

Note: These selection rules use list endpoints and batch into the required shapes (because /extract/* requires explicit lists). This keeps the pipeline API-driven with no DB coupling inside jobs.
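
For example, a page of 250 extract-view candidates yields three POST payloads (100 + 100 + 50); a sketch of the batching, using the same chunking the runner's chunked() helper performs (the action xmlids here are made up):

def chunked(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

action_xmlids = [f"module.action_{i}" for i in range(250)]   # hypothetical candidates
payloads = [
    {"action_xmlids": batch, "targets": ["ai_purpose", "help"], "mode": "upsert_if_changed"}
    for batch in chunked(action_xmlids, 100)                  # BATCH_VIEW=100
]
print([len(p["action_xmlids"]) for p in payloads])            # [100, 100, 50]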


8) Configuration (env) to standardize across all CronJobs

  • API: API_BASE, API_TIMEOUT_CONNECT=2, API_TIMEOUT_READ=45 (upsert may override)
  • Auth (prod): CRON_OAUTH_TOKEN_URL, CRON_CLIENT_ID, CRON_CLIENT_SECRET, CRON_AUDIENCE, CRON_SCOPE
  • Auth (dev): CRON_STATIC_JWT
  • Batching: BATCH_VIEW=100, BATCH_FIELD=200
  • Limits: TRANSLATE_LIMIT=200, PACKAGE_LIMIT=500, UPSERT_LIMIT=1000, UPSERT_DRY_RUN=false
  • Windowing: CRON_WINDOW_MINUTES=15
  • Logging: LOG_LEVEL=INFO

9) Observability & logging (stdout JSON)

  • Each job logs a single summary JSON per step: { "stage":"extract-view", "request_id":"<uuid>", "attempt":1, "idempotency_key":"...", "http_status":200, "metrics":{"picked":123,"inserted":120,"updated":3,"skipped":0,"failed":0}, "duration_ms": 3456 }
  • Include errors[] if the API returns details (e.g., from upsert/package/translate responses).
  • In prod, alert if failed > 0 or the job exits non-zero.
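
One way to wire the prod alert condition, assuming summaries are shipped as one JSON object per line (note: the runner below prints the counters at the top level rather than nested under metrics, so this sketch accepts both shapes):

import json
import sys

failed_stages = []
for line in sys.stdin:                       # e.g. pipe `kubectl logs job/<name>` into this script
    line = line.strip()
    if not line:
        continue
    try:
        rec = json.loads(line)
    except json.JSONDecodeError:
        continue                             # ignore non-JSON log lines
    metrics = rec.get("metrics", rec)        # nested (spec shape) or flat (runner shape)
    if int(metrics.get("failed", 0)) > 0:
        failed_stages.append(rec.get("stage", "unknown"))

if failed_stages:
    print(f"ALERT: failures in stages: {', '.join(failed_stages)}")
    sys.exit(1)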

10) Network & security

  • Use in-cluster Service DNS; apply NetworkPolicy to only allow CronJob Pods → API Service.
  • Store OAuth client credentials / static tokens in a dedicated Secret (e.g., secret-cron-auth).
  • Run as non-root; read-only root filesystem if possible.

11) Safety & kill switches

  • To pause the pipeline quickly: kubectl patch cronjob <name> -p '{"spec":{"suspend":true}}'.
  • Dry-run switch for upsert in dev: UPSERT_DRY_RUN=true.
  • If backlog occurs, adjust schedules or increase TRANSLATE_LIMIT/PACKAGE_LIMIT/UPSERT_LIMIT gradually.

12) DoD (acceptance for Phase K)

  • With Cron enabled, items naturally progress queued → upserted without manual intervention.
  • On induced failures (e.g., temporary DB/Chroma outage), K8s + in-job retries handle recovery; no duplicates due to Idempotency-Key.
  • Audit trail present in logs (per step summary with request_id); API side audit from Phase J is visible.
  • No overlapping executions (verified by concurrencyPolicy: Forbid and schedule offsets).

New files — Runner (API-driven batch)

jobs/cron-runner/Dockerfile

# Minimal Python runner image
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

COPY main.py /app/main.py

# Default entrypoint; CronJobs pass the stage as an argument
CMD ["python", "/app/main.py"]

jobs/cron-runner/requirements.txt

httpx==0.27.0
pydantic==2.8.2
python-dateutil==2.9.0.post0

jobs/cron-runner/main.py

"""
Universal Cron runner for the API pipeline:
  extract-view -> extract-field -> translate -> package -> upsert

- Auth:
    * Prod: OAuth2 Client Credentials (token URL + client_id/secret [+ audience/scope])
    * Dev: optional static JWT (or no auth)
- Headers: X-Request-ID (single UUID per run), Idempotency-Key (per POST)
- Retries: 3 attempts, exponential backoff on 5xx/429/timeouts
- Timeouts: connect=2s, read=45s (upsert may use 90s)
- Pagination: cursor-based (limit + next_cursor)
- Logging: prints single JSON summary per action to stdout
"""

from __future__ import annotations
import argparse
import asyncio
import base64
import hashlib
import json
import os
import sys
import time
import uuid
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Iterable, List, Optional

import httpx
from dateutil import tz
from pydantic import BaseModel

UTC = tz.tzutc()

# ---------- Env helpers ----------

def env_str(name: str, default: Optional[str] = None) -> Optional[str]:
    v = os.getenv(name)
    return v if v is not None and v != "" else default

def env_int(name: str, default: int) -> int:
    try:
        return int(os.getenv(name, str(default)))
    except Exception:
        return default

def env_bool(name: str, default: bool) -> bool:
    v = os.getenv(name)
    if v is None:
        return default
    return v.lower() in ("1", "true", "yes", "on")

# ---------- Config ----------

class RunnerConfig(BaseModel):
    api_base: str = env_str("API_BASE", "http://portal-api:8000")
    timeout_connect: float = float(env_int("API_TIMEOUT_CONNECT", 2))
    timeout_read: float = float(env_int("API_TIMEOUT_READ", 45))
    timeout_read_upsert: float = float(env_int("API_TIMEOUT_READ_UPSERT", 90))
    retries: int = env_int("API_RETRIES", 3)

    # batching & limits
    batch_view: int = env_int("BATCH_VIEW", 100)
    batch_field: int = env_int("BATCH_FIELD", 200)
    translate_limit: int = env_int("TRANSLATE_LIMIT", 200)
    package_limit: int = env_int("PACKAGE_LIMIT", 500)
    upsert_limit: int = env_int("UPSERT_LIMIT", 1000)
    upsert_dry_run: bool = env_bool("UPSERT_DRY_RUN", False)

    # idempotency
    window_minutes: int = env_int("CRON_WINDOW_MINUTES", 15)

    # auth
    oauth_token_url: Optional[str] = env_str("CRON_OAUTH_TOKEN_URL")
    client_id: Optional[str] = env_str("CRON_CLIENT_ID")
    client_secret: Optional[str] = env_str("CRON_CLIENT_SECRET")
    audience: Optional[str] = env_str("CRON_AUDIENCE")
    scope: Optional[str] = env_str("CRON_SCOPE")
    static_jwt: Optional[str] = env_str("CRON_STATIC_JWT")

    log_level: str = env_str("LOG_LEVEL", "INFO")

# ---------- Utilities ----------

def chunked(seq: List[Any], size: int) -> Iterable[List[Any]]:
    for i in range(0, len(seq), size):
        yield seq[i : i + size]

def now_epoch_ms() -> int:
    return int(time.time() * 1000)

def floor_window_str(minutes: int) -> str:
    t = time.gmtime()  # UTC
    # floor to nearest window
    floored_min = (t.tm_min // minutes) * minutes
    return time.strftime(f"%Y%m%dT%H{floored_min:02d}", t)

def make_idempotency_key(stage: str, payload: Dict[str, Any], path: str, audience: Optional[str], window_min: int) -> str:
    window = floor_window_str(window_min)
    m = hashlib.sha256()
    m.update(json.dumps(payload, sort_keys=True, ensure_ascii=False, separators=(",", ":")).encode("utf-8"))
    m.update(b"|")
    m.update(path.encode("utf-8"))
    m.update(b"|")
    if audience:
        m.update(audience.encode("utf-8"))
    digest = m.hexdigest()[:16]
    return f"{stage}-{window}-{digest}"

# ---------- HTTP client ----------

@dataclass
class ApiAuth:
    token: Optional[str] = None

class ApiClient:
    def __init__(self, cfg: RunnerConfig, request_id: str):
        self.cfg = cfg
        self.request_id = request_id
        self.auth = ApiAuth()

    async def _ensure_token(self):
        if self.auth.token:
            return
        # OAuth client credentials
        if self.cfg.oauth_token_url and self.cfg.client_id and self.cfg.client_secret:
            data = {"grant_type": "client_credentials"}
            if self.cfg.audience:
                data["audience"] = self.cfg.audience
            if self.cfg.scope:
                data["scope"] = self.cfg.scope
            async with httpx.AsyncClient(timeout=self.cfg.timeout_read) as client:
                r = await client.post(self.cfg.oauth_token_url, data=data, auth=(self.cfg.client_id, self.cfg.client_secret))
                r.raise_for_status()
                tok = r.json().get("access_token")
                if not tok:
                    raise RuntimeError("OAuth token response missing access_token")
                self.auth.token = tok
        elif self.cfg.static_jwt:
            self.auth.token = self.cfg.static_jwt
        else:
            # No auth
            self.auth.token = None

    def _headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]:
        h = {"X-Request-ID": self.request_id, "Accept": "application/json"}
        if self.auth.token:
            h["Authorization"] = f"Bearer {self.auth.token}"
        if idempotency_key:
            h["Idempotency-Key"] = idempotency_key
        return h

    async def _retry(self, func, *args, **kwargs) -> httpx.Response:
        attempt = 0
        last_exc: Optional[Exception] = None
        while attempt < self.cfg.retries:
            try:
                r = await func(*args, **kwargs)
                # httpx does not raise on HTTP error statuses by itself, so classify here:
                # retry only transient statuses (429/5xx); return everything else to the caller
                # (including 409, which callers treat as an idempotent replay).
                if r.status_code in (429, 500, 502, 503, 504):
                    last_exc = httpx.HTTPStatusError(
                        f"retryable status {r.status_code}", request=r.request, response=r
                    )
                else:
                    return r
            except (httpx.TimeoutException, httpx.NetworkError) as e:
                last_exc = e
            attempt += 1
            await asyncio.sleep(2 ** attempt)  # 2s, 4s, 8s
        if last_exc:
            raise last_exc
        raise RuntimeError("request failed without exception")

    async def get_paginated(self, path: str, limit: int = 500, params: Optional[Dict[str, Any]] = None) -> AsyncIterator[Dict[str, Any]]:
        await self._ensure_token()
        url = f"{self.cfg.api_base}{path}"
        cursor = None
        p = dict(params or {})
        p["limit"] = limit
        async with httpx.AsyncClient(timeout=httpx.Timeout(self.cfg.timeout_read, connect=self.cfg.timeout_connect)) as client:
            while True:
                if cursor:
                    p["cursor"] = cursor
                r = await self._retry(
                    client.get, url, headers=self._headers(), params=p
                )
                r.raise_for_status()
                data = r.json()
                items = data.get("items", [])
                for it in items:
                    yield it
                cursor = data.get("next_cursor")
                if not cursor:
                    break

    async def post_json(self, path: str, payload: Dict[str, Any], stage: str, timeout_read: Optional[float] = None) -> httpx.Response:
        await self._ensure_token()
        url = f"{self.cfg.api_base}{path}"
        idem = make_idempotency_key(stage, payload, path, self.cfg.audience, self.cfg.window_minutes)
        read_to = timeout_read if timeout_read is not None else self.cfg.timeout_read
        async with httpx.AsyncClient(timeout=httpx.Timeout(read_to, connect=self.cfg.timeout_connect)) as client:
            r = await self._retry(
                client.post, url, headers=self._headers(idem), json=payload
            )
            # 409 is acceptable (already processed)
            if r.status_code == 409:
                return r
            r.raise_for_status()
            return r

# ---------- Stage logic ----------

def label_has_ja_missing_en(label_i18n: Optional[Dict[str, Any]]) -> bool:
    if not isinstance(label_i18n, dict):
        return False
    ja = label_i18n.get("ja") or label_i18n.get("ja_JP")
    en = label_i18n.get("en") or label_i18n.get("en_US")
    return bool(ja) and not bool(en)

async def run_extract_view(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
    t0 = now_epoch_ms()
    picked = 0
    inserted = 0
    updated = 0
    skipped = 0
    failed = 0
    action_ids: List[str] = []

    # List candidates
    async for item in client.get_paginated("/portal/view_common", limit=500):
        # Conditions:
        # - help_ja_text exists and help_en_text is empty
        # - OR ai_purpose (ja) exists and (en) missing
        help_ja = item.get("help_ja_text")
        help_en = item.get("help_en_text")
        ai_ja = item.get("ai_purpose")
        ai_i18n = item.get("ai_purpose_i18n") or {}
        ai_en = ai_i18n.get("en") or ai_i18n.get("en_US")
        cond = (help_ja and not help_en) or (ai_ja and not ai_en)
        if cond:
            ax = item.get("action_xmlid")
            if ax:
                action_ids.append(ax)

    picked = len(action_ids)
    for chunk in chunked(action_ids, cfg.batch_view):
        payload = {
            "action_xmlids": chunk,
            "targets": ["ai_purpose", "help"],
            "mode": "upsert_if_changed",
        }
        try:
            r = await client.post_json("/extract/view_common", payload, stage="extract-view")
            # Accept both 200 and 409
            if r.status_code == 409:
                continue
            data = r.json()
            inserted += int(data.get("inserted", 0))
            updated += int(data.get("updated", 0))
            skipped += int(data.get("skipped_no_ja", 0)) + int(data.get("skipped_has_en", 0)) + int(data.get("skipped_not_found", 0))
        except Exception:
            failed += 1

    return {
        "stage": "extract-view",
        "picked": picked,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "failed": failed,
        "duration_ms": now_epoch_ms() - t0,
    }

async def run_extract_field(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
    t0 = now_epoch_ms()
    picked = 0
    inserted = 0
    updated = 0
    skipped = 0
    failed = 0

    by_model: Dict[str, List[str]] = {}
    async for f in client.get_paginated("/portal/field", limit=500):
        li18n = f.get("label_i18n")
        if label_has_ja_missing_en(li18n):
            model = f.get("model") or ""
            field_name = f.get("field_name")
            if model and field_name:
                by_model.setdefault(model, []).append(field_name)

    for model, fields in by_model.items():
        picked += len(fields)
        for chunk in chunked(fields, cfg.batch_field):
            payload = {"model": model, "fields": chunk, "mode": "upsert_if_changed"}
            try:
                r = await client.post_json("/extract/field", payload, stage="extract-field")
                if r.status_code == 409:
                    continue
                data = r.json()
                inserted += int(data.get("inserted", 0))
                updated += int(data.get("updated", 0))
                skipped += int(data.get("skipped_no_ja", 0)) + int(data.get("skipped_has_en", 0)) + int(data.get("skipped_not_found", 0))
            except Exception:
                failed += 1

    return {
        "stage": "extract-field",
        "picked": picked,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "failed": failed,
        "duration_ms": now_epoch_ms() - t0,
    }

async def run_translate(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
    t0 = now_epoch_ms()
    failed = 0
    translated = 0
    picked = 0
    payload = {"limit": cfg.translate_limit, "entities": ["field", "view_common"]}
    try:
        r = await client.post_json("/translate/run", payload, stage="translate")
        if r.status_code != 409:
            data = r.json()
            picked = int(data.get("picked", 0))
            translated = int(data.get("translated", 0))
            failed = int(data.get("failed", 0))
    except Exception:
        failed = 1
    return {
        "stage": "translate",
        "picked": picked,
        "translated": translated,
        "failed": failed,
        "duration_ms": now_epoch_ms() - t0,
    }

async def run_package(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
    t0 = now_epoch_ms()
    queued = 0
    skipped = 0
    failed = 0
    payload = {
        "entities": ["field", "view_common"],
        "lang": "ja",
        "limit": cfg.package_limit,
    }
    try:
        r = await client.post_json("/chroma/package", payload, stage="package")
        if r.status_code != 409:
            data = r.json()
            queued = int(data.get("queued", 0))
            skipped = int(data.get("skipped_no_change", 0))
            failed = int(data.get("failed", 0))
    except Exception:
        failed = 1
    return {
        "stage": "package",
        "queued": queued,
        "skipped": skipped,
        "failed": failed,
        "duration_ms": now_epoch_ms() - t0,
    }

async def run_upsert(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
    t0 = now_epoch_ms()
    processed = 0
    upserted = 0
    skipped = 0
    failed = 0
    payload = {"limit": cfg.upsert_limit, "dry_run": cfg.upsert_dry_run}
    try:
        r = await client.post_json("/chroma/upsert", payload, stage="upsert", timeout_read=cfg.timeout_read_upsert)
        if r.status_code != 409:
            data = r.json()
            processed = int(data.get("processed", 0))
            upserted = int(data.get("upserted", 0))
            skipped = int(data.get("skipped", 0))
            failed = int(data.get("failed", 0))
    except Exception:
        failed = 1
    return {
        "stage": "upsert",
        "processed": processed,
        "upserted": upserted,
        "skipped": skipped,
        "failed": failed,
        "duration_ms": now_epoch_ms() - t0,
    }

# ---------- CLI ----------

async def main_async(stage: str) -> int:
    cfg = RunnerConfig()
    request_id = str(uuid.uuid4())
    client = ApiClient(cfg, request_id)

    if stage == "extract-view":
        res = await run_extract_view(client, cfg)
    elif stage == "extract-field":
        res = await run_extract_field(client, cfg)
    elif stage == "translate":
        res = await run_translate(client, cfg)
    elif stage == "package":
        res = await run_package(client, cfg)
    elif stage == "upsert":
        res = await run_upsert(client, cfg)
    else:
        print(json.dumps({"error": f"unknown stage '{stage}'"}))
        return 2

    # Standard summary record
    res["request_id"] = request_id
    print(json.dumps(res, ensure_ascii=False))
    # Per-step failures surface in the summary (failed > 0) while the exit code stays 0;
    # only an unknown stage exits non-zero. Tighten this if a failed step should fail the Job.
    return 0

def parse_args(argv: List[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Cron runner for API pipeline")
    p.add_argument("stage", choices=["extract-view", "extract-field", "translate", "package", "upsert"], help="pipeline stage")
    return p.parse_args(argv)

def main():
    args = parse_args(sys.argv[1:])
    code = asyncio.run(main_async(args.stage))
    sys.exit(code)

if __name__ == "__main__":
    main()

New files — Kubernetes (base)

k8s/base/cronjobs/sa-cron-runner.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: cron-runner
  labels:
    app.kubernetes.io/name: cron-runner

Add this ServiceAccount to the resources: list in your base kustomization.yaml.


Overlay patches — Development

k8s/overlays/dev/patch-cron-schedules.yaml

# Offsets for 30-min cadence
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  schedule: "*/30 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  schedule: "2,32 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  schedule: "5,35 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  schedule: "10,40 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  schedule: "15,45 * * * *"

k8s/overlays/dev/patch-cron-policy.yaml

# Common policy & SA; applied per CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never

k8s/overlays/dev/patch-cron-image.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:dev

k8s/overlays/dev/patch-cron-env.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: API_TIMEOUT_CONNECT
                  value: "2"
                - name: API_TIMEOUT_READ
                  value: "45"
                - name: CRON_WINDOW_MINUTES
                  value: "30"
                - name: BATCH_VIEW
                  value: "100"
                - name: LOG_LEVEL
                  value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: API_TIMEOUT_CONNECT
                  value: "2"
                - name: API_TIMEOUT_READ
                  value: "45"
                - name: CRON_WINDOW_MINUTES
                  value: "30"
                - name: BATCH_FIELD
                  value: "200"
                - name: LOG_LEVEL
                  value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: TRANSLATE_LIMIT
                  value: "200"
                - name: LOG_LEVEL
                  value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: PACKAGE_LIMIT
                  value: "500"
                - name: LOG_LEVEL
                  value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: UPSERT_LIMIT
                  value: "1000"
                - name: UPSERT_DRY_RUN
                  value: "false"
                - name: API_TIMEOUT_READ_UPSERT
                  value: "90"
                - name: LOG_LEVEL
                  value: "INFO"

k8s/overlays/dev/patch-cron-args.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["extract-view"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["extract-field"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["translate"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["package"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["upsert"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }

Overlay patches — Production

k8s/overlays/prod/patch-cron-schedules.yaml

# Offsets for 15-min cadence
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  schedule: "*/15 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  schedule: "2,17,32,47 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  schedule: "4,19,34,49 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  schedule: "6,21,36,51 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  schedule: "8,23,38,53 * * * *"

k8s/overlays/prod/patch-cron-policy.yaml

# Same as dev; included here for clarity/independence
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  concurrencyPolicy: Forbid
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 900
      template:
        spec:
          serviceAccountName: cron-runner
          restartPolicy: Never

k8s/overlays/prod/patch-cron-image.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              image: ghcr.io/your-org/cron-runner:${GIT_SHA}

k8s/overlays/prod/patch-cron-env.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: API_TIMEOUT_CONNECT
                  value: "2"
                - name: API_TIMEOUT_READ
                  value: "45"
                - name: CRON_WINDOW_MINUTES
                  value: "15"
                - name: BATCH_VIEW
                  value: "100"
                - name: LOG_LEVEL
                  value: "INFO"
                # OAuth (from secret)
                - name: CRON_OAUTH_TOKEN_URL
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
                - name: CRON_CLIENT_ID
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
                - name: CRON_CLIENT_SECRET
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
                - name: CRON_AUDIENCE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
                - name: CRON_SCOPE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: API_TIMEOUT_CONNECT
                  value: "2"
                - name: API_TIMEOUT_READ
                  value: "45"
                - name: CRON_WINDOW_MINUTES
                  value: "15"
                - name: BATCH_FIELD
                  value: "200"
                - name: LOG_LEVEL
                  value: "INFO"
                - name: CRON_OAUTH_TOKEN_URL
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
                - name: CRON_CLIENT_ID
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
                - name: CRON_CLIENT_SECRET
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
                - name: CRON_AUDIENCE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
                - name: CRON_SCOPE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: TRANSLATE_LIMIT
                  value: "200"
                - name: LOG_LEVEL
                  value: "INFO"
                - name: CRON_OAUTH_TOKEN_URL
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
                - name: CRON_CLIENT_ID
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
                - name: CRON_CLIENT_SECRET
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
                - name: CRON_AUDIENCE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
                - name: CRON_SCOPE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: PACKAGE_LIMIT
                  value: "500"
                - name: LOG_LEVEL
                  value: "INFO"
                - name: CRON_OAUTH_TOKEN_URL
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
                - name: CRON_CLIENT_ID
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
                - name: CRON_CLIENT_SECRET
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
                - name: CRON_AUDIENCE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
                - name: CRON_SCOPE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              env:
                - name: API_BASE
                  value: "http://portal-api:8000"
                - name: UPSERT_LIMIT
                  value: "1000"
                - name: UPSERT_DRY_RUN
                  value: "false"
                - name: API_TIMEOUT_READ_UPSERT
                  value: "90"
                - name: LOG_LEVEL
                  value: "INFO"
                - name: CRON_OAUTH_TOKEN_URL
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
                - name: CRON_CLIENT_ID
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
                - name: CRON_CLIENT_SECRET
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
                - name: CRON_AUDIENCE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
                - name: CRON_SCOPE
                  valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }

k8s/overlays/prod/patch-cron-args.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-view
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["extract-view"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-extract-field
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["extract-field"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-translate-run
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["translate"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-package
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["package"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cj-chroma-upsert
spec:
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: runner
              command: ["python", "/app/main.py"]
              args: ["upsert"]
              resources:
                requests: { cpu: 50m, memory: 64Mi }
                limits: { cpu: 200m, memory: 256Mi }

k8s/overlays/prod/secret-cron-auth.yaml (if you don’t already have one)

apiVersion: v1
kind: Secret
metadata:
  name: secret-cron-auth
type: Opaque
stringData:
  CRON_OAUTH_TOKEN_URL: "https://your-idp.example.com/oauth/token"
  CRON_CLIENT_ID: "xxxxxxxxxxxxxxxx"
  CRON_CLIENT_SECRET: "xxxxxxxxxxxxxxxx"
  CRON_AUDIENCE: "https://portal-api.example.com"
  CRON_SCOPE: "api.read api.write"

Notes (wiring)

  • Add k8s/base/cronjobs/sa-cron-runner.yaml to base kustomization.yaml resources.
  • Add these patch files to overlays kustomization.yaml (dev/prod respectively).
  • Ensure the base CronJobs are already included as resources (as in your current tree).

1) Test matrix (what we verify)

| Area | Goal | Method | Expected |
| --- | --- | --- | --- |
| Extract-View | Picks only items with JA present & EN missing (help or ai_purpose) | Seed minimal rows, run extract-view | picked = targeted items; inserted/updated > 0; no duplicates on re-run (idempotency). |
| Extract-Field | Picks only fields with JA present & EN missing | Seed fields across 1–2 models, run extract-field | Same as above; fields with EN present are skipped. |
| Translate | Moves pending → translated, respects limit | Run translate | translated increases up to limit; failures reported. |
| Package | Creates portal_chroma_doc rows with status queued | Run package | queued increases; docs have doc_id = natural_key::lang. |
| Upsert | Moves queued → upserted (idempotent) | Run upsert | processed ≈ upserted; subsequent run upserts 0 or only new items. |
| Idempotency | No double effects within window | Re-run the same stage within 15–30 min | 200/409 with Idempotency-Status: replayed (or API returns 409); counters unchanged. |
| Retries | Backoff on transient errors | Inject 500/timeout once | Succeeds after retry (<= 3 attempts); job exits 0; summary logs attempts. |
| Auth & Headers | Propagate Request-ID, Bearer token | Inspect API logs | Correlated X-Request-ID present; 401/403 if token invalid. |

2) Sample data (SQL) — minimal but realistic

Run these in your dev database before tests. Adjust table/column names only if they differ.

-- Models (optional helper for model_table)
INSERT INTO public.portal_model (model, model_table, label_i18n, notes, created_at, updated_at)
VALUES
  ('sale.order', 'sale_order', '{"ja":"受注","en":null}', null, now(), now())
ON CONFLICT DO NOTHING;

-- Fields: one target (EN missing), one control (EN present)
INSERT INTO public.portal_fields (model_id, model, model_table, field_name, ttype, label_i18n, notes, origin, created_at, updated_at)
VALUES
  ( (SELECT id FROM public.portal_model WHERE model='sale.order'),
    'sale.order', 'sale_order', 'partner_id', 'many2one',
    '{"ja":"取引先","en":null}', null, 'odoo', now(), now()
  ),
  ( (SELECT id FROM public.portal_model WHERE model='sale.order'),
    'sale.order', 'sale_order', 'amount_total', 'monetary',
    '{"ja":"合計","en":"Total"}', null, 'odoo', now(), now()
  )
ON CONFLICT DO NOTHING;

-- ViewCommon: one with help_ja present & help_en missing; one control already translated
INSERT INTO public.portal_view_common
  (action_xmlid, action_name, model, model_label, model_table, view_types, primary_view_type,
   help_ja_text, help_en_text, ai_purpose, ai_purpose_i18n, created_at, updated_at)
VALUES
  ('sale.action_orders', 'Orders', 'sale.order', '受注', 'sale_order',
    ARRAY['list','form'], 'list',
    '受注一覧の説明(日本語)', null,
    'この画面の目的(日本語)', '{"en":null}',
    now(), now()
  ),
  ('sale.action_orders_report', 'Order Reports', 'sale.order', '受注', 'sale_order',
    ARRAY['list'], 'list',
    '説明(日本語)', 'Description (EN done)',
    '目的(日本語)', '{"en":"Purpose (done)"}',
    now(), now()
  )
ON CONFLICT DO NOTHING;

-- Translate & portal_chroma_doc are initially empty; the pipeline will populate them.
DELETE FROM public.translate;
DELETE FROM public.portal_chroma_doc;

What’s targeted by the runner:

  • Fields: sale.order:partner_id (JA present, EN missing) → should be extracted.
  • Fields (control): sale.order:amount_total (EN already present) → should be skipped.
  • ViewCommon: sale.action_orders (help_ja present & help_en missing; also ai_purpose JA present, EN missing) → should be extracted.
  • ViewCommon (control): sale.action_orders_report (EN already present for help/purpose) → should be skipped.

3) How to run (local/dev)

3.1 Runner (local)

From jobs/cron-runner/:

# Build runner image locally (optional; or use python directly)
docker build -t cron-runner:dev .

# Or run with local Python (requires API reachable at API_BASE)
pip install -r requirements.txt
export API_BASE="http://localhost:8000"           # or in-cluster URL if running in K8s
export CRON_WINDOW_MINUTES=30
export LOG_LEVEL=INFO

# Dev auth (Phase J: AUTH_MODE=none) or a static JWT if you use AUTH_MODE=static:
# export CRON_STATIC_JWT="eyJhbGciOi..."

python main.py extract-view
python main.py extract-field
python main.py translate
python main.py package
python main.py upsert

Each command prints a single JSON summary (one line). Save them if you want to diff across runs.
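
For example, a throwaway helper to diff the counters of two saved summary lines (the file names are hypothetical):

import json

def load_summary(path: str) -> dict:
    with open(path) as f:
        return json.loads(f.read().strip())   # one JSON object per file

a = load_summary("run1-extract-view.json")    # first run
b = load_summary("run2-extract-view.json")    # re-run within the idempotency window
for key in ("picked", "inserted", "updated", "skipped", "failed"):
    print(f"{key}: {a.get(key, 0)} -> {b.get(key, 0)}")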

3.2 Kubernetes (dev overlay)

To accelerate tests, temporarily change schedules to every minute (offsets still apply) and apply dev overlay:

# (optional) patch schedules for fast test cycles
# cj-extract-view: "*/1 * * * *", cj-extract-field: "*/1 * * * *" (at :02), etc.
kubectl apply -k k8s/overlays/dev

# Observe
kubectl get cronjobs
kubectl get jobs --watch
kubectl logs job/<job-name> -c runner

4) Expected results (based on the sample data)

After running once in order (extract-view → extract-field → translate → package → upsert):

  1. Extract-View (sale.action_orders)
    • picked ≥ 1 (exactly 1 from sample)
    • inserted + updated ≥ 1
    • skipped may be 0
    • failed = 0
    • Translate table now has 1 row for the view_common target (status pending)
  2. Extract-Field (sale.order:partner_id)
    • picked ≥ 1 (exactly 1 from sample; amount_total is skipped)
    • inserted + updated ≥ 1
    • skipped ≥ 0 (the EN-present control should count as skipped, if accounted)
    • failed = 0
    • Translate table now has 2 pending rows total (1 from view_common + 1 from field)
  3. Translate
    • picked ≥ 1 (up to TRANSLATE_LIMIT, default 200)
    • translated = 2 (for our two pending items)
    • failed = 0
    • Translate rows move from pending → translated with translated_label/translated_purpose populated (exact content depends on the provider and your configuration)
  4. Package
    • queued ≥ 2 (one doc for field; one for view_common; language = ja)
    • skipped_no_change = 0 on first run
    • failed = 0
    • portal_chroma_doc contains 2 docs with status='queued' and unique doc_id = natural_key::lang
  5. Upsert
    • processed ≥ 2
    • upserted ≥ 2
    • skipped = 0 on first run
    • failed = 0
    • portal_chroma_doc rows move to status='upserted'
  6. Re-run the same stages within the window (e.g., 15–30 minutes)
    • extract-view / extract-field:
      • API may return 409 (idempotency) or small counts with skipped_* due to “no JA”/“EN present”.
      • Runner treats 409 as success; totals do not double.
    • translate: no new pending items, so translated ≈ 0
    • package: no changes → queued small or 0; skipped_no_change may increase
    • upsert: upserted ≈ 0

Side checks

  • API audit logs should show one X-Request-ID per runner execution, and Idempotency-Key on POSTs (409 or replayed behavior indicated by your middleware headers).
  • /status/summary should reflect translate.translated and chroma_doc.upserted counts consistent with the above.

5) Failure-injection & retry scenarios

Case F1 — transient 500 on first POST to /extract/view_common

  • Simulate by temporarily forcing the API to return HTTP 500 once.
  • Runner behavior: retries (2s → 4s → 8s) up to 3 attempts; eventually 200/409.
  • Expected: job exits 0; the JSON summary shows failed=0 when the retry succeeds within the attempt budget, with inserted/updated as normal.

Case F2 — timeout on /chroma/upsert

  • Simulate a slow response; runner uses read timeout 90s for upsert.
  • Expected: a single timeout is retried; if all retries fail, the job still prints a summary with failed > 0 and exits 0 under the default policy (tighten the exit-code handling if you want the Job itself to fail).

Case F3 — auth error (401/403)

  • Provide an invalid token / audience.
  • Expected: API returns 401/403; runner does not retry these; job prints JSON with minimal metrics and you’ll see 401/403 in API logs with the same X-Request-ID.

6) Unit tests (runner) — examples with pytest + respx

Add this under jobs/cron-runner/tests/test_runner.py. It mocks HTTP calls and verifies idempotency key & retry logic.

import uuid

import httpx
import pytest
import respx

# Run pytest from jobs/cron-runner/ (or put it on PYTHONPATH) so main.py is importable.
from main import RunnerConfig, ApiClient, make_idempotency_key

def test_idempotency_key_deterministic():
    stage = "extract-view"
    payload = {"action_xmlids": ["sale.action_orders"], "targets": ["help"], "mode": "upsert_if_changed"}
    path = "/extract/view_common"
    audience = "https://portal-api.example.com"

    # Two keys computed within the same window must match.
    # (Freeze time.gmtime with monkeypatch if the test could straddle a window boundary.)
    k1 = make_idempotency_key(stage, payload, path, audience, 15)
    k2 = make_idempotency_key(stage, payload, path, audience, 15)
    assert k1 == k2
    assert k1.startswith(stage)

@pytest.mark.asyncio
async def test_retry_on_500():
    cfg = RunnerConfig(api_base="http://api", retries=3)
    client = ApiClient(cfg, request_id=str(uuid.uuid4()))

    with respx.mock(assert_all_called=True) as rsx:
        # side_effect yields the responses in order: first attempt 500, retry succeeds with 200.
        rsx.post("http://api/extract/view_common").side_effect = [
            httpx.Response(500, json={"error": "boom"}),
            httpx.Response(200, json={"inserted": 1, "updated": 0, "skipped_no_ja": 0,
                                      "skipped_has_en": 0, "skipped_not_found": 0}),
        ]

        payload = {"action_xmlids": ["sale.action_orders"], "targets": ["help"], "mode": "upsert_if_changed"}
        r = await client.post_json("/extract/view_common", payload, stage="extract-view")
        assert r.status_code == 200

How to run:

pip install pytest pytest-asyncio respx
pytest -q jobs/cron-runner/tests

7) Integration tests (against the real dev API)

  1. Seed the sample SQL (Section 2).
  2. Run the stages locally:
export API_BASE="http://localhost:8000"
python jobs/cron-runner/main.py extract-view
python jobs/cron-runner/main.py extract-field
python jobs/cron-runner/main.py translate
python jobs/cron-runner/main.py package
python jobs/cron-runner/main.py upsert
  1. Verify with API or DB:
    • GET /status/summary shows translate.translated >= 2, chroma_doc.upserted >= 2.
    • DB: SELECT status FROM public.portal_chroma_doc; → all upserted.
  2. Re-run the same stages within 15–30 minutes:
    • Expect near-zero deltas; API may reply 409 or replay; no duplicates.

8) K8s end-to-end (dev overlay)

  • Apply dev overlay (or a dedicated test overlay with every-minute schedules).
  • Watch jobs for 2–3 cycles and collect stdout summaries.
  • Confirm no overlaps (thanks to concurrencyPolicy: Forbid), and the pipeline naturally advances queued → upserted.

Final notes

  • Treat the runner summaries as ground truth for job-level SLOs; ingest them into your logging/observability (e.g., Loki/ELK).
  • Keep idempotency windows aligned across jobs (default 15 min) to prevent duplicate effects.
  • If your translation adapter throttles, reduce TRANSLATE_LIMIT and/or slow the cadence.
  • For production, use real OAuth client credentials and confirm the JWKS/issuer/audience checks from Phase J.
