0) Assumptions inherited from Phase J
- Auth: prod uses `AUTH_MODE=jwks` with OAuth2 client credentials to obtain a Bearer token; dev may use `none` or `static` (HS256).
- Request ID: every call MUST set/propagate `X-Request-ID`.
- Idempotency: all POST/PUT MUST set `Idempotency-Key` (24h TTL); conflict → 409, replay → `Idempotency-Status: replayed`.
- API base: in-cluster DNS (e.g., `http://portal-api.<ns>.svc.cluster.local:8000`).
- OpenAPI: endpoints and payloads are as defined in the Phase A spec.
1) Topology & Job mapping (use existing CronJob templates)
Create/keep one CronJob per stage, matching current skeleton names:
| Stage | CronJob (existing) | Endpoint(s) called | Notes |
|---|---|---|---|
| Extract (View) | `cj-extract-view.yaml` | `GET /portal/view_common` → batch `POST /extract/view_common` | Filter: JA exists & EN empty; targets: `[ai_purpose, help]`; mode: `upsert_if_changed`. |
| Extract (Field) | `cj-extract-field.yaml` | `GET /portal/field` → batch `POST /extract/field` | Group by model; fields with JA label present & EN missing; mode: `upsert_if_changed`. |
| Translate | `cj-translate-run.yaml` | `POST /translate/run` | `limit` tunable (default 200); entities `[field, view_common]`. |
| Package | `cj-chroma-package.yaml` | `POST /chroma/package` | Defaults per schema; `lang: ja`; chunk by `limit` (default 500). |
| Upsert | `cj-chroma-upsert.yaml` | `POST /chroma/upsert` | Prod: `dry_run=false`; dev: `dry_run=true` allowed; `limit` tunable (default 1000). |
Rationale: keep each concern isolated; rely on schedules with offsets and idempotency for safe chaining.
2) Schedules & offsets (dev/prod defaults; make env-tunable)
- Dev (30-minute cadence, easy debugging):
  - Extract-View: `*/30 * * * *` (:00/:30)
  - Extract-Field: `2,32 * * * *` (:02/:32)
  - Translate: `5,35 * * * *` (:05/:35)
  - Package: `10,40 * * * *` (:10/:40)
  - Upsert: `15,45 * * * *` (:15/:45)
- Prod (15-minute cadence, faster feedback, still spaced):
  - Extract-View: `*/15 * * * *` (:00)
  - Extract-Field: `2,17,32,47 * * * *` (:02)
  - Translate: `4,19,34,49 * * * *` (:04)
  - Package: `6,21,36,51 * * * *` (:06)
  - Upsert: `8,23,38,53 * * * *` (:08)

Why offsets? They avoid overlap without introducing hard dependencies. If a job overruns anyway, `concurrencyPolicy: Forbid` prevents double-running.
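The offset schedules used in the overlay patches later in this phase follow one rule: shift every firing minute of the cadence by the stage's offset. A small illustrative sketch (the function name is ours, not part of the runner) that derives the crontab minute expressions:

```python
def offset_schedule(cadence_min: int, offset_min: int) -> str:
    """Crontab expression firing every `cadence_min` minutes, shifted by
    `offset_min` (e.g. cadence 15, offset 2 -> "2,17,32,47 * * * *")."""
    minutes = sorted((offset_min + k * cadence_min) % 60
                     for k in range(60 // cadence_min))
    return ",".join(str(m) for m in minutes) + " * * * *"

# Prod (15-min cadence) stage offsets from the list above
prod = {name: offset_schedule(15, off)
        for name, off in [("extract-view", 0), ("extract-field", 2),
                          ("translate", 4), ("package", 6), ("upsert", 8)]}
```

Note that `offset_schedule(15, 0)` yields `0,15,30,45 * * * *`, which is equivalent to the `*/15 * * * *` shorthand used for Extract-View.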
3) K8s CronJob policy (all jobs)
- `concurrencyPolicy: Forbid`
- `backoffLimit: 1` (K8s-level retries); inside the job, also do HTTP retries (see §6)
- `startingDeadlineSeconds: 300`
- `successfulJobsHistoryLimit: 1`, `failedJobsHistoryLimit: 1`
- `activeDeadlineSeconds`: 10–15 minutes per stage (see §6 timeouts)
- Pod resources (baseline): `requests`: cpu 50m, mem 64Mi; `limits`: cpu 200m, mem 256Mi
4) Authentication for Cron
- Prod: OAuth2 Client Credentials against your IdP.
  - Env: `CRON_OAUTH_TOKEN_URL`, `CRON_CLIENT_ID`, `CRON_CLIENT_SECRET`, `CRON_AUDIENCE` (and optional `CRON_SCOPE`).
  - Each job fetches a JWT before calling the API; the token is cached per run.
- Dev: either no auth (`AUTH_MODE=none`) or an HS256 static token.
  - Env: `CRON_STATIC_JWT` (dev only, with `AUTH_MODE=static`).
5) Idempotency-Key scheme (deterministic per window)
- Header: `Idempotency-Key: <stage>-<yyyymmddThhmm window>-<sha256(payload|path|audience)[:16]>`
- Window minutes (env): `CRON_WINDOW_MINUTES` (default 15).
- Example: `extract-view-20251003T1015-4f2a1c0d9e7b8c12`
- Request-ID: generate once per job run and reuse across all stage calls: `X-Request-ID: <UUIDv4>`.
6) HTTP behavior (timeouts & retries inside the job)
- Each API call: connect timeout 2s, read timeout 45s (upsert may need 90s).
- Retries: up to 3 attempts with exponential backoff (2s, 4s, 8s); only retry on 5xx/429/timeouts.
- Treat 409 from Idempotency as success (already done).
- Log all attempts as JSON (see §9).
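The rules above reduce to a three-way status decision plus a fixed backoff table; a minimal sketch (names are illustrative, not part of the runner):

```python
RETRYABLE = {429, 500, 502, 503, 504}

def classify(status_code: int) -> str:
    """Map an HTTP status to the pipeline's handling: 2xx succeed, 409
    (idempotent replay/conflict) counts as already done, 429/5xx are
    retried with backoff, anything else fails fast."""
    if 200 <= status_code < 300 or status_code == 409:
        return "success"
    if status_code in RETRYABLE:
        return "retry"
    return "fail"

# Exponential backoff before attempts 2, 3, 4
BACKOFF_S = [2 ** (a + 1) for a in range(3)]  # [2, 4, 8]
```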
7) Candidate selection & batching rules
- Extract-View:
  - List with `GET /portal/view_common?limit=500&cursor=...`.
  - Select items where (`help_ja_text` exists AND `help_en_text` is empty) OR (the JA `ai_purpose` exists AND its EN counterpart is missing).
  - Batch `action_xmlids` into chunks of 100 → `POST /extract/view_common` with:
    `{"action_xmlids":[...],"targets":["ai_purpose","help"],"mode":"upsert_if_changed"}`
- Extract-Field:
  - List with `GET /portal/field?limit=500&cursor=...`.
  - Group by `model`; for each model, pick fields where the JA label exists and the EN label is missing.
  - For each model, batch up to 200 fields:
    `{"model":"sale.order","fields":["partner_id","amount_total"],"mode":"upsert_if_changed"}`
- Translate: single call per run:
  `{"limit": ${TRANSLATE_LIMIT:=200}, "entities":["field","view_common"]}`
- Package: single call per run:
  `{"entities":["field","view_common"], "lang":"ja", "limit": ${PACKAGE_LIMIT:=500}}`
- Upsert: single call per run:
  `{"limit": ${UPSERT_LIMIT:=1000}, "dry_run": ${UPSERT_DRY_RUN:=false}}`
- All POSTs carry `Idempotency-Key` and `X-Request-ID`.

Note: these selection rules use the list endpoints and batch the results into the required request shapes (because `/extract/*` requires explicit lists). This keeps the pipeline API-driven, with no DB coupling inside the jobs.
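The Extract-View selection and chunking rules can be sketched end to end (helper names are ours; batch size parameterized as in `BATCH_VIEW`):

```python
def view_candidates(items: list[dict]) -> list[str]:
    """action_xmlids where help has JA but no EN, or ai_purpose has JA but no EN."""
    out = []
    for it in items:
        i18n = it.get("ai_purpose_i18n") or {}
        ai_en = i18n.get("en") or i18n.get("en_US")
        help_cond = it.get("help_ja_text") and not it.get("help_en_text")
        ai_cond = it.get("ai_purpose") and not ai_en
        if (help_cond or ai_cond) and it.get("action_xmlid"):
            out.append(it["action_xmlid"])
    return out

def extract_view_payloads(action_xmlids: list[str], batch: int = 100) -> list[dict]:
    """One POST /extract/view_common body per chunk of `batch` ids."""
    return [{"action_xmlids": action_xmlids[i:i + batch],
             "targets": ["ai_purpose", "help"],
             "mode": "upsert_if_changed"}
            for i in range(0, len(action_xmlids), batch)]
```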
8) Configuration (env) to standardize across all CronJobs
- API: `API_BASE`, `API_TIMEOUT_CONNECT=2`, `API_TIMEOUT_READ=45` (upsert may override via `API_TIMEOUT_READ_UPSERT=90`)
- Auth (prod): `CRON_OAUTH_TOKEN_URL`, `CRON_CLIENT_ID`, `CRON_CLIENT_SECRET`, `CRON_AUDIENCE`, `CRON_SCOPE`
- Auth (dev): `CRON_STATIC_JWT`
- Batching: `BATCH_VIEW=100`, `BATCH_FIELD=200`
- Limits: `TRANSLATE_LIMIT=200`, `PACKAGE_LIMIT=500`, `UPSERT_LIMIT=1000`, `UPSERT_DRY_RUN=false`
- Windowing: `CRON_WINDOW_MINUTES=15`
- Logging: `LOG_LEVEL=INFO`
9) Observability & logging (stdout JSON)
- Each job logs a single summary JSON per step:
  `{ "stage":"extract-view", "request_id":"<uuid>", "attempt":1, "idempotency_key":"...", "http_status":200, "metrics":{"picked":123,"inserted":120,"updated":3,"skipped":0,"failed":0}, "duration_ms": 3456 }`
- Include `errors[]` if the API returns details (e.g., from the upsert/package/translate responses).
- In prod, alert if `failed > 0` or the job exits non-zero.
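A minimal emitter for that summary line (illustrative helper, not part of the runner; field names as in the example above):

```python
import json
import time

def log_summary(stage: str, request_id: str, idempotency_key: str,
                http_status: int, metrics: dict, started_ms: int,
                attempt: int = 1) -> str:
    """Serialize one per-step summary record; jobs print this single
    JSON line to stdout for log scraping and alerting."""
    record = {
        "stage": stage,
        "request_id": request_id,
        "attempt": attempt,
        "idempotency_key": idempotency_key,
        "http_status": http_status,
        "metrics": metrics,
        "duration_ms": int(time.time() * 1000) - started_ms,
    }
    return json.dumps(record, ensure_ascii=False)
```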
10) Network & security
- Use in-cluster Service DNS; apply NetworkPolicy to only allow CronJob Pods → API Service.
- Store OAuth client credentials / static tokens in a dedicated Secret (e.g., `secret-cron-auth`).
- Run as non-root; use a read-only root filesystem where possible.
11) Safety & kill switches
- To pause the pipeline quickly: `kubectl patch cronjob <name> -p '{"spec":{"suspend":true}}'`.
- Dry-run switch for upsert in dev: `UPSERT_DRY_RUN=true`.
- If a backlog builds up, adjust schedules or increase `TRANSLATE_LIMIT`/`PACKAGE_LIMIT`/`UPSERT_LIMIT` gradually.
12) DoD (acceptance for Phase K)
- With Cron enabled, items progress queued → upserted without manual intervention.
- On induced failures (e.g., a temporary DB/Chroma outage), K8s-level plus in-job retries recover; `Idempotency-Key` prevents duplicates.
- Audit trail present in logs (per-step summary with `request_id`); the API-side audit from Phase J is visible.
- No overlapping executions (verified by `concurrencyPolicy: Forbid` and schedule offsets).
New files — Runner (API-driven batch)
jobs/cron-runner/Dockerfile
# Minimal Python runner image
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY main.py /app/main.py
# Default entrypoint; CronJobs pass the stage as an argument
CMD ["python", "/app/main.py"]
jobs/cron-runner/requirements.txt
httpx==0.27.0
pydantic==2.8.2
python-dateutil==2.9.0.post0
jobs/cron-runner/main.py
"""
Universal Cron runner for the API pipeline:
extract-view -> extract-field -> translate -> package -> upsert
- Auth:
* Prod: OAuth2 Client Credentials (token URL + client_id/secret [+ audience/scope])
* Dev: optional static JWT (or no auth)
- Headers: X-Request-ID (single UUID per run), Idempotency-Key (per POST)
- Retries: 3 attempts, exponential backoff on 5xx/429/timeouts
- Timeouts: connect=2s, read=45s (upsert may use 90s)
- Pagination: cursor-based (limit + next_cursor)
- Logging: prints single JSON summary per action to stdout
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
import os
import sys
import time
import uuid
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Iterable, List, Optional
import httpx
from dateutil import tz
from pydantic import BaseModel
UTC = tz.tzutc()
# ---------- Env helpers ----------
def env_str(name: str, default: Optional[str] = None) -> Optional[str]:
v = os.getenv(name)
return v if v is not None and v != "" else default
def env_int(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except Exception:
return default
def env_bool(name: str, default: bool) -> bool:
v = os.getenv(name)
if v is None:
return default
return v.lower() in ("1", "true", "yes", "on")
# ---------- Config ----------
class RunnerConfig(BaseModel):
api_base: str = env_str("API_BASE", "http://portal-api:8000")
timeout_connect: float = float(env_int("API_TIMEOUT_CONNECT", 2))
timeout_read: float = float(env_int("API_TIMEOUT_READ", 45))
timeout_read_upsert: float = float(env_int("API_TIMEOUT_READ_UPSERT", 90))
retries: int = env_int("API_RETRIES", 3)
# batching & limits
batch_view: int = env_int("BATCH_VIEW", 100)
batch_field: int = env_int("BATCH_FIELD", 200)
translate_limit: int = env_int("TRANSLATE_LIMIT", 200)
package_limit: int = env_int("PACKAGE_LIMIT", 500)
upsert_limit: int = env_int("UPSERT_LIMIT", 1000)
upsert_dry_run: bool = env_bool("UPSERT_DRY_RUN", False)
# idempotency
window_minutes: int = env_int("CRON_WINDOW_MINUTES", 15)
# auth
oauth_token_url: Optional[str] = env_str("CRON_OAUTH_TOKEN_URL")
client_id: Optional[str] = env_str("CRON_CLIENT_ID")
client_secret: Optional[str] = env_str("CRON_CLIENT_SECRET")
audience: Optional[str] = env_str("CRON_AUDIENCE")
scope: Optional[str] = env_str("CRON_SCOPE")
static_jwt: Optional[str] = env_str("CRON_STATIC_JWT")
log_level: str = env_str("LOG_LEVEL", "INFO")
# ---------- Utilities ----------
def chunked(seq: List[Any], size: int) -> Iterable[List[Any]]:
for i in range(0, len(seq), size):
yield seq[i : i + size]
def now_epoch_ms() -> int:
return int(time.time() * 1000)
def floor_window_str(minutes: int) -> str:
t = time.gmtime() # UTC
# floor to nearest window
floored_min = (t.tm_min // minutes) * minutes
return time.strftime(f"%Y%m%dT%H{floored_min:02d}", t)
def make_idempotency_key(stage: str, payload: Dict[str, Any], path: str, audience: Optional[str], window_min: int) -> str:
window = floor_window_str(window_min)
m = hashlib.sha256()
m.update(json.dumps(payload, sort_keys=True, ensure_ascii=False, separators=(",", ":")).encode("utf-8"))
m.update(b"|")
m.update(path.encode("utf-8"))
m.update(b"|")
if audience:
m.update(audience.encode("utf-8"))
digest = m.hexdigest()[:16]
return f"{stage}-{window}-{digest}"
# ---------- HTTP client ----------
@dataclass
class ApiAuth:
token: Optional[str] = None
class ApiClient:
def __init__(self, cfg: RunnerConfig, request_id: str):
self.cfg = cfg
self.request_id = request_id
self.auth = ApiAuth()
async def _ensure_token(self):
if self.auth.token:
return
# OAuth client credentials
if self.cfg.oauth_token_url and self.cfg.client_id and self.cfg.client_secret:
data = {"grant_type": "client_credentials"}
if self.cfg.audience:
data["audience"] = self.cfg.audience
if self.cfg.scope:
data["scope"] = self.cfg.scope
async with httpx.AsyncClient(timeout=self.cfg.timeout_read) as client:
r = await client.post(self.cfg.oauth_token_url, data=data, auth=(self.cfg.client_id, self.cfg.client_secret))
r.raise_for_status()
tok = r.json().get("access_token")
if not tok:
raise RuntimeError("OAuth token response missing access_token")
self.auth.token = tok
elif self.cfg.static_jwt:
self.auth.token = self.cfg.static_jwt
else:
# No auth
self.auth.token = None
def _headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]:
h = {"X-Request-ID": self.request_id, "Accept": "application/json"}
if self.auth.token:
h["Authorization"] = f"Bearer {self.auth.token}"
if idempotency_key:
h["Idempotency-Key"] = idempotency_key
return h
    async def _retry(self, func, *args, **kwargs) -> httpx.Response:
        attempt = 0
        last_exc: Optional[Exception] = None
        while attempt < self.cfg.retries:
            try:
                r = await func(*args, **kwargs)
                # httpx does not raise on HTTP error codes by itself; surface
                # retryable statuses (429/5xx) as exceptions so they re-enter
                # the loop. All other codes (including 409) are returned to
                # the caller, which decides how to handle them.
                if r.status_code in (429, 500, 502, 503, 504):
                    r.raise_for_status()
                return r
            except httpx.HTTPStatusError as e:
                last_exc = e
            except (httpx.TimeoutException, httpx.NetworkError) as e:
                last_exc = e
            attempt += 1
            await asyncio.sleep(2 ** attempt)  # 2s, 4s, 8s
        if last_exc:
            raise last_exc
        raise RuntimeError("request failed without exception")
    async def get_paginated(self, path: str, limit: int = 500, params: Optional[Dict[str, Any]] = None) -> AsyncIterator[Dict[str, Any]]:
await self._ensure_token()
url = f"{self.cfg.api_base}{path}"
cursor = None
p = dict(params or {})
p["limit"] = limit
async with httpx.AsyncClient(timeout=httpx.Timeout(self.cfg.timeout_read, connect=self.cfg.timeout_connect)) as client:
while True:
if cursor:
p["cursor"] = cursor
r = await self._retry(
client.get, url, headers=self._headers(), params=p
)
r.raise_for_status()
data = r.json()
items = data.get("items", [])
for it in items:
yield it
cursor = data.get("next_cursor")
if not cursor:
break
async def post_json(self, path: str, payload: Dict[str, Any], stage: str, timeout_read: Optional[float] = None) -> httpx.Response:
await self._ensure_token()
url = f"{self.cfg.api_base}{path}"
idem = make_idempotency_key(stage, payload, path, self.cfg.audience, self.cfg.window_minutes)
read_to = timeout_read if timeout_read is not None else self.cfg.timeout_read
async with httpx.AsyncClient(timeout=httpx.Timeout(read_to, connect=self.cfg.timeout_connect)) as client:
r = await self._retry(
client.post, url, headers=self._headers(idem), json=payload
)
# 409 is acceptable (already processed)
if r.status_code == 409:
return r
r.raise_for_status()
return r
# ---------- Stage logic ----------
def label_has_ja_missing_en(label_i18n: Optional[Dict[str, Any]]) -> bool:
if not isinstance(label_i18n, dict):
return False
ja = label_i18n.get("ja") or label_i18n.get("ja_JP")
en = label_i18n.get("en") or label_i18n.get("en_US")
return bool(ja) and not bool(en)
async def run_extract_view(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
t0 = now_epoch_ms()
picked = 0
inserted = 0
updated = 0
skipped = 0
failed = 0
action_ids: List[str] = []
# List candidates
async for item in client.get_paginated("/portal/view_common", limit=500):
# Conditions:
# - help_ja_text exists and help_en_text is empty
# - OR ai_purpose (ja) exists and (en) missing
help_ja = item.get("help_ja_text")
help_en = item.get("help_en_text")
ai_ja = item.get("ai_purpose")
ai_i18n = item.get("ai_purpose_i18n") or {}
ai_en = ai_i18n.get("en") or ai_i18n.get("en_US")
cond = (help_ja and not help_en) or (ai_ja and not ai_en)
if cond:
ax = item.get("action_xmlid")
if ax:
action_ids.append(ax)
picked = len(action_ids)
for chunk in chunked(action_ids, cfg.batch_view):
payload = {
"action_xmlids": chunk,
"targets": ["ai_purpose", "help"],
"mode": "upsert_if_changed",
}
try:
r = await client.post_json("/extract/view_common", payload, stage="extract-view")
# Accept both 200 and 409
if r.status_code == 409:
continue
data = r.json()
inserted += int(data.get("inserted", 0))
updated += int(data.get("updated", 0))
skipped += int(data.get("skipped_no_ja", 0)) + int(data.get("skipped_has_en", 0)) + int(data.get("skipped_not_found", 0))
except Exception:
failed += 1
return {
"stage": "extract-view",
"picked": picked,
"inserted": inserted,
"updated": updated,
"skipped": skipped,
"failed": failed,
"duration_ms": now_epoch_ms() - t0,
}
async def run_extract_field(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
t0 = now_epoch_ms()
picked = 0
inserted = 0
updated = 0
skipped = 0
failed = 0
by_model: Dict[str, List[str]] = {}
async for f in client.get_paginated("/portal/field", limit=500):
li18n = f.get("label_i18n")
if label_has_ja_missing_en(li18n):
model = f.get("model") or ""
field_name = f.get("field_name")
if model and field_name:
by_model.setdefault(model, []).append(field_name)
for model, fields in by_model.items():
picked += len(fields)
for chunk in chunked(fields, cfg.batch_field):
payload = {"model": model, "fields": chunk, "mode": "upsert_if_changed"}
try:
r = await client.post_json("/extract/field", payload, stage="extract-field")
if r.status_code == 409:
continue
data = r.json()
inserted += int(data.get("inserted", 0))
updated += int(data.get("updated", 0))
skipped += int(data.get("skipped_no_ja", 0)) + int(data.get("skipped_has_en", 0)) + int(data.get("skipped_not_found", 0))
except Exception:
failed += 1
return {
"stage": "extract-field",
"picked": picked,
"inserted": inserted,
"updated": updated,
"skipped": skipped,
"failed": failed,
"duration_ms": now_epoch_ms() - t0,
}
async def run_translate(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
t0 = now_epoch_ms()
failed = 0
translated = 0
picked = 0
payload = {"limit": cfg.translate_limit, "entities": ["field", "view_common"]}
try:
r = await client.post_json("/translate/run", payload, stage="translate")
if r.status_code != 409:
data = r.json()
picked = int(data.get("picked", 0))
translated = int(data.get("translated", 0))
failed = int(data.get("failed", 0))
except Exception:
failed = 1
return {
"stage": "translate",
"picked": picked,
"translated": translated,
"failed": failed,
"duration_ms": now_epoch_ms() - t0,
}
async def run_package(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
t0 = now_epoch_ms()
queued = 0
skipped = 0
failed = 0
payload = {
"entities": ["field", "view_common"],
"lang": "ja",
"limit": cfg.package_limit,
}
try:
r = await client.post_json("/chroma/package", payload, stage="package")
if r.status_code != 409:
data = r.json()
queued = int(data.get("queued", 0))
skipped = int(data.get("skipped_no_change", 0))
failed = int(data.get("failed", 0))
except Exception:
failed = 1
return {
"stage": "package",
"queued": queued,
"skipped": skipped,
"failed": failed,
"duration_ms": now_epoch_ms() - t0,
}
async def run_upsert(client: ApiClient, cfg: RunnerConfig) -> Dict[str, Any]:
t0 = now_epoch_ms()
processed = 0
upserted = 0
skipped = 0
failed = 0
payload = {"limit": cfg.upsert_limit, "dry_run": cfg.upsert_dry_run}
try:
r = await client.post_json("/chroma/upsert", payload, stage="upsert", timeout_read=cfg.timeout_read_upsert)
if r.status_code != 409:
data = r.json()
processed = int(data.get("processed", 0))
upserted = int(data.get("upserted", 0))
skipped = int(data.get("skipped", 0))
failed = int(data.get("failed", 0))
except Exception:
failed = 1
return {
"stage": "upsert",
"processed": processed,
"upserted": upserted,
"skipped": skipped,
"failed": failed,
"duration_ms": now_epoch_ms() - t0,
}
# ---------- CLI ----------
async def main_async(stage: str) -> int:
cfg = RunnerConfig()
request_id = str(uuid.uuid4())
client = ApiClient(cfg, request_id)
if stage == "extract-view":
res = await run_extract_view(client, cfg)
elif stage == "extract-field":
res = await run_extract_field(client, cfg)
elif stage == "translate":
res = await run_translate(client, cfg)
elif stage == "package":
res = await run_package(client, cfg)
elif stage == "upsert":
res = await run_upsert(client, cfg)
else:
print(json.dumps({"error": f"unknown stage '{stage}'"}))
return 2
# Standard summary record
res["request_id"] = request_id
print(json.dumps(res, ensure_ascii=False))
    # Exit non-zero only on hard failures (unhandled exceptions). failed > 0 in the
    # summary is a "soft" failure: exit 0 and let alerting key off the JSON (see §9).
return 0
def parse_args(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(description="Cron runner for API pipeline")
p.add_argument("stage", choices=["extract-view", "extract-field", "translate", "package", "upsert"], help="pipeline stage")
return p.parse_args(argv)
def main():
args = parse_args(sys.argv[1:])
code = asyncio.run(main_async(args.stage))
sys.exit(code)
if __name__ == "__main__":
main()
New files — Kubernetes (base)
k8s/base/cronjobs/sa-cron-runner.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: cron-runner
labels:
app.kubernetes.io/name: cron-runner
Add this ServiceAccount to your base `kustomization.yaml` under `resources:`.
Overlay patches — Development
k8s/overlays/dev/patch-cron-schedules.yaml
# Offsets for 30-min cadence
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
schedule: "*/30 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
schedule: "2,32 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
schedule: "5,35 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
schedule: "10,40 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
schedule: "15,45 * * * *"
k8s/overlays/dev/patch-cron-policy.yaml
# Common policy & SA; applied per CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
k8s/overlays/dev/patch-cron-image.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:dev
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:dev
k8s/overlays/dev/patch-cron-env.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: API_TIMEOUT_CONNECT
value: "2"
- name: API_TIMEOUT_READ
value: "45"
- name: CRON_WINDOW_MINUTES
value: "30"
- name: BATCH_VIEW
value: "100"
- name: LOG_LEVEL
value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: API_TIMEOUT_CONNECT
value: "2"
- name: API_TIMEOUT_READ
value: "45"
- name: CRON_WINDOW_MINUTES
value: "30"
- name: BATCH_FIELD
value: "200"
- name: LOG_LEVEL
value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: TRANSLATE_LIMIT
value: "200"
- name: LOG_LEVEL
value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: PACKAGE_LIMIT
value: "500"
- name: LOG_LEVEL
value: "INFO"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: UPSERT_LIMIT
value: "1000"
- name: UPSERT_DRY_RUN
value: "false"
- name: API_TIMEOUT_READ_UPSERT
value: "90"
- name: LOG_LEVEL
value: "INFO"
k8s/overlays/dev/patch-cron-args.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["extract-view"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["extract-field"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["translate"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["package"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["upsert"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
Overlay patches — Production
k8s/overlays/prod/patch-cron-schedules.yaml
# Offsets for 15-min cadence
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
schedule: "*/15 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
schedule: "2,17,32,47 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
schedule: "4,19,34,49 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
schedule: "6,21,36,51 * * * *"
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
schedule: "8,23,38,53 * * * *"
k8s/overlays/prod/patch-cron-policy.yaml
# Same as dev; included here for clarity/independence
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
concurrencyPolicy: Forbid
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
backoffLimit: 1
activeDeadlineSeconds: 900
template:
spec:
serviceAccountName: cron-runner
restartPolicy: Never
k8s/overlays/prod/patch-cron-image.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:${GIT_SHA}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
image: ghcr.io/your-org/cron-runner:${GIT_SHA}
k8s/overlays/prod/patch-cron-env.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: API_TIMEOUT_CONNECT
value: "2"
- name: API_TIMEOUT_READ
value: "45"
- name: CRON_WINDOW_MINUTES
value: "15"
- name: BATCH_VIEW
value: "100"
- name: LOG_LEVEL
value: "INFO"
# OAuth (from secret)
- name: CRON_OAUTH_TOKEN_URL
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
- name: CRON_CLIENT_ID
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
- name: CRON_CLIENT_SECRET
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
- name: CRON_AUDIENCE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
- name: CRON_SCOPE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: API_TIMEOUT_CONNECT
value: "2"
- name: API_TIMEOUT_READ
value: "45"
- name: CRON_WINDOW_MINUTES
value: "15"
- name: BATCH_FIELD
value: "200"
- name: LOG_LEVEL
value: "INFO"
- name: CRON_OAUTH_TOKEN_URL
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
- name: CRON_CLIENT_ID
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
- name: CRON_CLIENT_SECRET
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
- name: CRON_AUDIENCE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
- name: CRON_SCOPE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: TRANSLATE_LIMIT
value: "200"
- name: LOG_LEVEL
value: "INFO"
- name: CRON_OAUTH_TOKEN_URL
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
- name: CRON_CLIENT_ID
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
- name: CRON_CLIENT_SECRET
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
- name: CRON_AUDIENCE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
- name: CRON_SCOPE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: PACKAGE_LIMIT
value: "500"
- name: LOG_LEVEL
value: "INFO"
- name: CRON_OAUTH_TOKEN_URL
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
- name: CRON_CLIENT_ID
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
- name: CRON_CLIENT_SECRET
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
- name: CRON_AUDIENCE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
- name: CRON_SCOPE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
env:
- name: API_BASE
value: "http://portal-api:8000"
- name: UPSERT_LIMIT
value: "1000"
- name: UPSERT_DRY_RUN
value: "false"
- name: API_TIMEOUT_READ_UPSERT
value: "90"
- name: LOG_LEVEL
value: "INFO"
- name: CRON_OAUTH_TOKEN_URL
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_OAUTH_TOKEN_URL } }
- name: CRON_CLIENT_ID
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_ID } }
- name: CRON_CLIENT_SECRET
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_CLIENT_SECRET } }
- name: CRON_AUDIENCE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_AUDIENCE } }
- name: CRON_SCOPE
valueFrom: { secretKeyRef: { name: secret-cron-auth, key: CRON_SCOPE } }
k8s/overlays/prod/patch-cron-args.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-view
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["extract-view"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-extract-field
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["extract-field"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-translate-run
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["translate"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-package
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["package"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cj-chroma-upsert
spec:
jobTemplate:
spec:
template:
spec:
containers:
- name: runner
command: ["python", "/app/main.py"]
args: ["upsert"]
resources:
requests: { cpu: 50m, memory: 64Mi }
limits: { cpu: 200m, memory: 256Mi }
k8s/overlays/prod/secret-cron-auth.yaml (if you don’t already have one)
apiVersion: v1
kind: Secret
metadata:
name: secret-cron-auth
type: Opaque
stringData:
CRON_OAUTH_TOKEN_URL: "https://your-idp.example.com/oauth/token"
CRON_CLIENT_ID: "xxxxxxxxxxxxxxxx"
CRON_CLIENT_SECRET: "xxxxxxxxxxxxxxxx"
CRON_AUDIENCE: "https://portal-api.example.com"
CRON_SCOPE: "api.read api.write"
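The CRON_* credentials in this Secret are consumed by the runner to obtain a Bearer token via the OAuth2 client-credentials grant (Phase J). A minimal sketch of that exchange, assuming a standard token endpoint that returns `access_token` in its JSON body — the function name is hypothetical and the runner's actual implementation in jobs/cron-runner/main.py may differ:

```python
import json
import os
import urllib.parse
import urllib.request

def fetch_bearer_token(urlopen=urllib.request.urlopen) -> str:
    """Exchange the CRON_* client credentials for a Bearer token
    (OAuth2 client-credentials grant). `urlopen` is injectable for testing."""
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": os.environ["CRON_CLIENT_ID"],
        "client_secret": os.environ["CRON_CLIENT_SECRET"],
        "audience": os.environ.get("CRON_AUDIENCE", ""),
        "scope": os.environ.get("CRON_SCOPE", ""),
    }).encode()
    req = urllib.request.Request(
        os.environ["CRON_OAUTH_TOKEN_URL"],
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())["access_token"]
```

The token is then sent as `Authorization: Bearer <token>` on every API call, alongside X-Request-ID and Idempotency-Key.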
Notes (wiring)
- Add k8s/base/cronjobs/sa-cron-runner.yaml to the base kustomization.yaml resources.
- Add these patch files to the dev/prod overlay kustomization.yaml respectively.
- Ensure the base CronJobs are already included as resources (as in your current tree).
1) Test matrix (what we verify)
| Area | Goal | Method | Expected |
|---|---|---|---|
| Extract-View | Picks only items with JA present & EN missing (help or ai_purpose) | Seed minimal rows, run extract-view | picked = targeted items; inserted/updated > 0; no duplicates on re-run (Idempotency). |
| Extract-Field | Picks only fields with JA present & EN missing | Seed fields across 1–2 models, run extract-field | Same as above; fields with EN present are skipped. |
| Translate | Moves pending → translated, respects limit | Run translate | translated increases up to limit; failures reported. |
| Package | Creates portal_chroma_doc with queued | Run package | queued increases; docs have doc_id = natural_key::lang. |
| Upsert | Moves queued → upserted (idempotent) | Run upsert | processed ≥ upserted; subsequent run upserts 0 or only new. |
| Idempotency | No double effects within window | Re-run same stage within 15–30 min | 200/409 with Idempotency-Status: replayed (or API returns 409); counters unchanged. |
| Retries | Backoff on transient errors | Inject 500/timeout once | Succeeds after retry (<= 3 attempts), job exits 0, summary logs attempts. |
| Auth & Headers | Propagate Request-ID, Bearer token | Inspect API logs | Correlated X-Request-ID present; 401/403 if token invalid. |
2) Sample data (SQL) — minimal but realistic
Run these in your dev database before tests. Adjust table/column names only if they differ.
-- Models (optional helper for model_table)
INSERT INTO public.portal_model (model, model_table, label_i18n, notes, created_at, updated_at)
VALUES
('sale.order', 'sale_order', '{"ja":"受注","en":null}', null, now(), now())
ON CONFLICT DO NOTHING;
-- Fields: one target (EN missing), one control (EN present)
INSERT INTO public.portal_fields (model_id, model, model_table, field_name, ttype, label_i18n, notes, origin, created_at, updated_at)
VALUES
( (SELECT id FROM public.portal_model WHERE model='sale.order'),
'sale.order', 'sale_order', 'partner_id', 'many2one',
'{"ja":"取引先","en":null}', null, 'odoo', now(), now()
),
( (SELECT id FROM public.portal_model WHERE model='sale.order'),
'sale.order', 'sale_order', 'amount_total', 'monetary',
'{"ja":"合計","en":"Total"}', null, 'odoo', now(), now()
)
ON CONFLICT DO NOTHING;
-- ViewCommon: one with help_ja present & help_en missing; one control already translated
INSERT INTO public.portal_view_common
(action_xmlid, action_name, model, model_label, model_table, view_types, primary_view_type,
help_ja_text, help_en_text, ai_purpose, ai_purpose_i18n, created_at, updated_at)
VALUES
('sale.action_orders', 'Orders', 'sale.order', '受注', 'sale_order',
ARRAY['list','form'], 'list',
'受注一覧の説明(日本語)', null,
'この画面の目的(日本語)', '{"en":null}',
now(), now()
),
('sale.action_orders_report', 'Order Reports', 'sale.order', '受注', 'sale_order',
ARRAY['list'], 'list',
'説明(日本語)', 'Description (EN done)',
'目的(日本語)', '{"en":"Purpose (done)"}',
now(), now()
)
ON CONFLICT DO NOTHING;
-- Translate & portal_chroma_doc are initially empty; the pipeline will populate them.
DELETE FROM public.translate;
DELETE FROM public.portal_chroma_doc;
What’s targeted by the runner:
- Fields: sale.order:partner_id (JA present, EN missing) → should be extracted.
- Fields (control): sale.order:amount_total (EN already present) → should be skipped.
- ViewCommon: sale.action_orders (help_ja present & help_en missing; also ai_purpose JA present, EN missing) → should be extracted.
- ViewCommon (control): sale.action_orders_report (EN already present for help/purpose) → should be skipped.
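The selection rule above ("JA present & EN missing") reduces to a small predicate over the label_i18n JSON. A sketch for sanity-checking seed rows locally — the function name is hypothetical; the API applies this filter server-side:

```python
import json

def needs_en_translation(label_i18n: str) -> bool:
    """True when the JA label exists and the EN label is null/empty —
    the filter the extract stages apply to fields and views."""
    labels = json.loads(label_i18n)
    return bool(labels.get("ja")) and not labels.get("en")
```

Against the sample data: partner_id qualifies, amount_total does not.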
3) How to run (local/dev)
3.1 Runner (local)
From jobs/cron-runner/:
# Build runner image locally (optional; or use python directly)
docker build -t cron-runner:dev .
# Or run with local Python (requires API reachable at API_BASE)
pip install -r requirements.txt
export API_BASE="http://localhost:8000" # or in-cluster URL if running in K8s
export CRON_WINDOW_MINUTES=30
export LOG_LEVEL=INFO
# Dev auth (Phase J: AUTH_MODE=none) or a static JWT if you use AUTH_MODE=static:
# export CRON_STATIC_JWT="eyJhbGciOi..."
python main.py extract-view
python main.py extract-field
python main.py translate
python main.py package
python main.py upsert
Each command prints a single JSON summary (one line). Save them if you want to diff across runs.
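Because each stage emits exactly one JSON line, run-over-run drift can be checked mechanically. A small sketch that diffs the numeric counters of two saved summaries (counter names illustrative):

```python
import json

def diff_summaries(before_line: str, after_line: str) -> dict:
    """Numeric deltas between two one-line JSON summaries of the same stage.
    Non-numeric fields (stage name, timestamps, etc.) are ignored."""
    before, after = json.loads(before_line), json.loads(after_line)
    return {
        key: after[key] - before[key]
        for key in after
        if isinstance(after.get(key), int) and isinstance(before.get(key), int)
    }
```

A second run inside the idempotency window should diff to all zeros (or near-zero deltas), which is exactly the re-run expectation in Section 4.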
3.2 Kubernetes (dev overlay)
To accelerate tests, temporarily change the schedules to every minute (offsets still apply), then apply the dev overlay:
# (optional) patch schedules for fast test cycles
# cj-extract-view: "*/1 * * * *", cj-extract-field: "*/1 * * * *" (at :02), etc.
kubectl apply -k k8s/overlays/dev
# Observe
kubectl get cronjobs
kubectl get jobs --watch
kubectl logs job/<job-name> -c runner
4) Expected results (based on the sample data)
After running once in order (extract-view → extract-field → translate → package → upsert):
- Extract-View (sale.action_orders)
  - picked ≥ 1 (exactly 1 from sample)
  - inserted + updated ≥ 1
  - skipped may be 0
  - failed = 0
  - Translate table now has 1 row for the view_common target (status pending)
- Extract-Field (sale.order:partner_id)
  - picked ≥ 1 (exactly 1 from sample; amount_total is skipped)
  - inserted + updated ≥ 1
  - skipped ≥ 0 (the EN-present control should count as skipped, if accounted)
  - failed = 0
  - Translate table now has 2 pending rows total (1 from view_common + 1 from field)
- Translate
  - picked ≥ 1 (up to TRANSLATE_LIMIT, default 200)
  - translated = 2 (for our two pending items)
  - failed = 0
  - Translate rows move from pending → translated with translated_label / translated_purpose populated (exact content depends on the provider and your configuration)
- Package
  - queued ≥ 2 (one doc for field, one for view_common; language = ja)
  - skipped_no_change = 0 on first run
  - failed = 0
  - portal_chroma_doc contains 2 docs with status='queued' and unique doc_id = natural_key::lang
- Upsert
  - processed ≥ 2
  - upserted ≥ 2
  - skipped = 0 on first run
  - failed = 0
  - portal_chroma_doc rows move to status='upserted'
- Re-run the same stages within the window (e.g., 15–30 minutes)
  - extract-view / extract-field: API may return 409 (idempotency) or small counts with skipped_* due to "no JA" / "EN present"; the runner treats 409 as success, so totals do not double.
  - translate: no new pending → translated ≈ 0
  - package: no changes → queued small or 0; skipped_no_change may increase
  - upsert: upserted ≈ 0
Side checks
- API audit logs should show one X-Request-ID per runner execution, and an Idempotency-Key on every POST (409 or replay behavior indicated by your middleware headers).
- /status/summary should reflect translate.translated and chroma_doc.upserted counts consistent with the above.
5) Failure-injection & retry scenarios
Case F1 — transient 500 on first POST to /extract/view_common
- Simulate by temporarily forcing the API to return HTTP 500 once.
- Runner behavior: retries (2s → 4s → 8s) up to 3 attempts; eventually 200/409.
- Expected: job exits 0; the JSON summary shows failed = 0 or 1 (depending on where you count) and inserted/updated as normal.
Case F2 — timeout on /chroma/upsert
- Simulate a slow response; runner uses read timeout 90s for upsert.
- Expected: if it times out once, it retries; if all retries fail, the job still prints its summary and exits non-zero only for unhandled errors (the default code prints failed > 0 but exits 0; change this policy if desired).
Case F3 — auth error (401/403)
- Provide an invalid token / audience.
- Expected: API returns 401/403; the runner does not retry these; the job prints JSON with minimal metrics, and the 401/403 appears in the API logs with the same X-Request-ID.
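The three cases above share one policy: exponential backoff (2s → 4s → 8s) for transient failures, a hard cap of 3 attempts, and no retry on auth errors. A transport-agnostic sketch of that policy — the real runner's logic in main.py will differ in detail:

```python
import time

RETRYABLE_STATUS = {500, 502, 503, 504}

def call_with_retry(do_request, attempts: int = 3, base_delay: float = 2.0,
                    sleep=time.sleep):
    """do_request() returns (status_code, body) or raises TimeoutError.
    Transient statuses and timeouts are retried with exponential backoff;
    everything else (2xx, 409 idempotency replay, 401/403) returns at once."""
    last = (None, None)
    for attempt in range(attempts):
        try:
            status, body = do_request()
        except TimeoutError:                      # Case F2: timeout is retryable
            status, body = None, None
        if status is not None and status not in RETRYABLE_STATUS:
            return status, body                   # success, replay, or fail-fast
        last = (status, body)
        if attempt < attempts - 1:
            sleep(base_delay * (2 ** attempt))    # 2s -> 4s -> 8s
    return last
```

Injecting `sleep` keeps the backoff testable without real delays, mirroring how the unit tests below mock the HTTP layer.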
6) Unit tests (runner) — examples with pytest + respx
Add this under jobs/cron-runner/tests/test_runner.py. It mocks HTTP calls and verifies idempotency key & retry logic.
import uuid

import httpx
import pytest
import respx

from jobs.cron_runner.main import RunnerConfig, ApiClient, make_idempotency_key


def test_idempotency_key_deterministic():
    stage = "extract-view"
    payload = {"action_xmlids": ["sale.action_orders"], "targets": ["help"], "mode": "upsert_if_changed"}
    path = "/extract/view_common"
    audience = "https://portal-api.example.com"
    # The key is bucketed by the idempotency window, so two calls with
    # identical inputs inside one window must produce the same key.
    # (Monkeypatch time.time to a fixed value if a run could straddle a window boundary.)
    k1 = make_idempotency_key(stage, payload, path, audience, 15)
    k2 = make_idempotency_key(stage, payload, path, audience, 15)
    assert k1 == k2
    assert stage in k1


@pytest.mark.asyncio
async def test_retry_on_500():
    cfg = RunnerConfig(api_base="http://api", retries=3)
    client = ApiClient(cfg, request_id=str(uuid.uuid4()))
    with respx.mock(assert_all_called=True) as rsx:
        # Register ONE route with queued responses: respx matches the first
        # registered route for every call, so two separate registrations for
        # the same URL would both hit the first (500) mock. side_effect
        # returns 500 on the first call and 200 on the retry.
        rsx.post("http://api/extract/view_common").mock(
            side_effect=[
                httpx.Response(500, json={"error": "boom"}),
                httpx.Response(200, json={
                    "inserted": 1, "updated": 0, "skipped_no_ja": 0,
                    "skipped_has_en": 0, "skipped_not_found": 0,
                }),
            ]
        )
        payload = {"action_xmlids": ["sale.action_orders"], "targets": ["help"], "mode": "upsert_if_changed"}
        r = await client.post_json("/extract/view_common", payload, stage="extract-view")
        assert r.status_code == 200
How to run:
pip install pytest pytest-asyncio respx httpx
pytest -q jobs/cron-runner/tests
7) Integration tests (against the real dev API)
- Seed the sample SQL (Section 2).
- Run the stages locally:
export API_BASE="http://localhost:8000"
python jobs/cron-runner/main.py extract-view
python jobs/cron-runner/main.py extract-field
python jobs/cron-runner/main.py translate
python jobs/cron-runner/main.py package
python jobs/cron-runner/main.py upsert
- Verify with the API or DB:
  - GET /status/summary shows translate.translated >= 2 and chroma_doc.upserted >= 2.
  - DB: SELECT status FROM public.portal_chroma_doc; → all upserted.
- Re-run the same stages within 15–30 minutes:
- Expect near-zero deltas; API may reply 409 or replay; no duplicates.
8) K8s end-to-end (dev overlay)
- Apply dev overlay (or a dedicated test overlay with every-minute schedules).
- Watch jobs for 2–3 cycles and collect stdout summaries.
- Confirm no overlaps (thanks to concurrencyPolicy: Forbid) and that the pipeline naturally advances queued → upserted.
Final notes
- Treat the runner summaries as ground truth for job-level SLOs; ingest them into your logging/observability (e.g., Loki/ELK).
- Keep idempotency windows aligned across jobs (default 15 min) to prevent duplicate effects.
- If your translation adapter throttles, reduce TRANSLATE_LIMIT and/or slow the cadence.
- For production, use real OAuth client credentials and confirm the JWKS/issuer/audience checks from Phase J.