This document is ready to copy-paste to the offshore team. It includes:
- Decisions to finalize (these serve as the spec),
- Which files to modify or add, and why,
- Code (diffs for existing files, full source for new files) with very short summaries.
1) Decisions to finalize (treat as the specification)
1) Health checks (readiness / liveness / startup)
- Endpoints:
  - /health/live → always 200 (process liveness only).
  - /health/ready → DB required; Chroma optional via HEALTH_CHECK_CHROMA (prod: true; dev: optional).
  - /health/startup → returns 200 after app init, to avoid premature readiness.
- Probes (guideline):
  - startupProbe: periodSeconds: 2, failureThreshold: 60 (≈120 s budget).
  - readinessProbe: initialDelaySeconds: 3, periodSeconds: 5, failureThreshold: 3.
  - livenessProbe: periodSeconds: 10, failureThreshold: 3.
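The probe settings above imply rough time budgets. Below is a minimal Python sketch. It assumes the usual Kubernetes behavior, where a probe acts after failureThreshold consecutive failures spaced periodSeconds apart. For simplicity, it ignores timeoutSeconds and initialDelaySeconds.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Probe:
    period_seconds: int
    failure_threshold: int
    initial_delay_seconds: int = 0

    def failure_window(self) -> int:
        """Approximate seconds of consecutive failures before the kubelet acts."""
        return self.period_seconds * self.failure_threshold


# Values from the guideline above
PROBES = {
    "startup": Probe(period_seconds=2, failure_threshold=60),
    "readiness": Probe(period_seconds=5, failure_threshold=3, initial_delay_seconds=3),
    "liveness": Probe(period_seconds=10, failure_threshold=3),
}

if __name__ == "__main__":
    for name, probe in PROBES.items():
        print(f"{name}: ~{probe.failure_window()}s of consecutive failures before action")
```

With these values:
- The app gets about 120 s to finish startup.
- A failing readiness check takes a Pod out of the Service endpoints after roughly 15 s.
- A hung process is restarted after roughly 30 s.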
2) Auth + Audit + Request ID
- AUTH_MODE=none | static | jwks.
  - dev: none or static (HS256 with JWT_SECRET).
  - prod: jwks with JWKS_URL, JWT_ISSUER, JWT_AUDIENCE.
- Exempt paths (no auth): /health/* (+ docs if desired).
- Always echo X-Request-ID (honor an inbound value; otherwise generate a UUIDv4).
- Audit log (JSON Lines): ts, level, request_id, user, method, path, status, latency_ms, ip, ua, bytes_in/out (optional).
- No body logging by default (privacy; optional sampling).
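The Request-ID rule and the audit record shape can be sketched without any web framework. This sketch assumes lower-cased header names, as ASGI delivers them. The function names and the ERROR-for-5xx level mapping are illustrative choices, not part of the spec.

```python
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Mapping, Optional


def resolve_request_id(headers: Mapping[str, str]) -> str:
    """Honor an inbound X-Request-ID; otherwise generate a UUIDv4."""
    return headers.get("x-request-id") or str(uuid.uuid4())


def audit_line(request_id: str, user: Optional[str], method: str, path: str,
               status: int, started: float, ip: Optional[str], ua: Optional[str]) -> str:
    """Build one JSON Lines audit record; request/response bodies are never included."""
    level = logging.ERROR if status >= 500 else logging.INFO
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "level": logging.getLevelName(level),
        "request_id": request_id,
        "user": user,
        "method": method,
        "path": path,
        "status": status,
        "latency_ms": int((time.perf_counter() - started) * 1000),
        "ip": ip,
        "ua": ua,
    }
    return json.dumps(record, ensure_ascii=False)


if __name__ == "__main__":
    t0 = time.perf_counter()
    rid = resolve_request_id({"x-request-id": "abc-123"})
    print(audit_line(rid, "tester@example.com", "GET", "/status/summary", 200, t0, "10.0.0.1", "curl/8.0"))
```

In a middleware, the same record is written once per request, after the response status is known.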
3) Idempotency-Key (dedup for POST/PUT; backward compatible)
- Header: Idempotency-Key.
- Scope: side-effect endpoints (e.g., all POST /portal/*, POST /chroma/upsert, POST /portal/view/*bootstrap*, POST /portal/view/set_primary, etc.). GET/DELETE are excluded; PATCH is excluded by default.
- Uniqueness: hash of (method, path, body, auth_subject).
- Storage: PostgreSQL table public.idempotency_keys. TTL = 24h; expired rows are garbage-collected by a job.
- Behavior:
  - First call → run and persist the response.
  - Replay (same key + same hash) → return the stored response with Idempotency-Status: replayed.
  - Conflict (same key + different payload) → 409 Conflict.
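The replay/conflict rule reduces to comparing the stored request hash with the incoming one. This sketch mirrors _hash_request in api/app/services/idempotency.py below: a '|'-separated SHA-256 over method, path, body, and subject. decide() is a hypothetical helper name.

```python
import hashlib
from typing import Optional


def request_hash(method: str, path: str, body: bytes, subject: str) -> str:
    """SHA-256 over method | path | body | auth subject."""
    h = hashlib.sha256()
    h.update(method.encode("utf-8"))
    h.update(b"|")
    h.update(path.encode("utf-8"))
    h.update(b"|")
    h.update(body)
    h.update(b"|")
    h.update(subject.encode("utf-8"))
    return h.hexdigest()


def decide(stored_hash: Optional[str], incoming_hash: str) -> str:
    """'run' for a first call, 'replay' for same key + same hash, 'conflict' (409) otherwise."""
    if stored_hash is None:
        return "run"
    return "replay" if stored_hash == incoming_hash else "conflict"


if __name__ == "__main__":
    first = request_hash("POST", "/chroma/upsert", b'{"dry_run":true}', "tester@example.com")
    changed = request_hash("POST", "/chroma/upsert", b'{"dry_run":false}', "tester@example.com")
    print(decide(None, first), decide(first, first), decide(first, changed))
```

The subject is part of the hash. So when a different user sends the same key with the same body, the request is treated as a conflict rather than a replay.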
4) Config separation via Pydantic Settings + K8s ConfigMap/Secret
- ConfigMap (non-secret): APP_ENV, LOG_LEVEL, ENABLE_DEV_ENDPOINTS, AUTH_MODE, JWKS_URL, JWT_ISSUER, JWT_AUDIENCE, HEALTH_CHECK_CHROMA, CHROMA_URL, EMBED_*, APP_VERSION, GIT_SHA, IDEMPOTENCY_TTL_SECONDS.
- Secret: DATABASE_URL, JWT_SECRET, CHROMA_API_KEY, OPENAI_API_KEY.
- /status/summary returns build meta: app_version, git_sha, auth_mode, env.
5) K8s rollout & HA
- RollingUpdate: maxUnavailable: 0, maxSurge: 1.
- preStop: sleep 5; terminationGracePeriodSeconds: 30.
- HPA (prod): minReplicas: 2, maxReplicas: 5.
- PDB: minAvailable: 1.
- Container: prefer --workers 1 and scale with Pods instead.
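These numbers interact. The small checker below uses a hypothetical RolloutConfig type and encodes three rules:
- Kubernetes rejects maxUnavailable and maxSurge both being 0.
- A PDB whose minAvailable is not below the replica count allows no voluntary evictions.
- A preStop sleep as long as terminationGracePeriodSeconds leaves no time for graceful shutdown after SIGTERM.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class RolloutConfig:
    replicas: int
    max_unavailable: int
    max_surge: int
    pdb_min_available: int
    prestop_sleep_s: int
    grace_period_s: int


def check(cfg: RolloutConfig) -> List[str]:
    """Return a list of problems; an empty list means the settings are consistent."""
    problems: List[str] = []
    if cfg.max_unavailable == 0 and cfg.max_surge == 0:
        problems.append("maxUnavailable and maxSurge cannot both be 0")
    if cfg.replicas <= cfg.pdb_min_available:
        problems.append("PDB minAvailable >= replicas: voluntary evictions (e.g. node drains) are blocked")
    if cfg.prestop_sleep_s >= cfg.grace_period_s:
        problems.append("preStop sleep consumes the whole grace period; no time left after SIGTERM")
    return problems


if __name__ == "__main__":
    print(check(RolloutConfig(2, 0, 1, 1, 5, 30)))  # prod shape: HPA minReplicas 2
    print(check(RolloutConfig(1, 0, 1, 1, 5, 30)))  # single-replica dev
```

The prod shape (HPA minimum of 2) passes. A single-replica dev Deployment with the same PDB is flagged, which is worth knowing before draining nodes.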
6) Errors (Problem+JSON)
- Shape: type, title, detail, status.
- Always echo X-Request-ID on errors.
- Distinguish 401 (unauthenticated) from 403 (unauthorized).
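The sketch below expresses the error contract as plain data:
- a Problem+JSON body,
- X-Request-ID always echoed,
- 401 kept separate from 403, with the 401 carrying a WWW-Authenticate: Bearer challenge (HTTP requires one on 401).

Function names and detail strings are illustrative.

```python
import json
from typing import Dict, Optional, Tuple

ErrorResponse = Tuple[int, Dict[str, str], str]


def problem(status: int, title: str, detail: str, request_id: str) -> ErrorResponse:
    """Return (status, headers, body) for an application/problem+json error."""
    body = {"type": "about:blank", "title": title, "detail": detail, "status": status}
    headers = {"Content-Type": "application/problem+json", "X-Request-ID": request_id}
    if status == 401:
        headers["WWW-Authenticate"] = "Bearer"  # challenge only for "not authenticated"
    return status, headers, json.dumps(body)


def auth_error(token_valid: bool, claims_accepted: bool, request_id: str) -> Optional[ErrorResponse]:
    """401 = no valid token (unauthenticated); 403 = valid token that is not accepted."""
    if not token_valid:
        return problem(401, "Unauthorized", "Missing or invalid Bearer token.", request_id)
    if not claims_accepted:
        return problem(403, "Forbidden", "Token issuer/audience not accepted.", request_id)
    return None
```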
7) Audit: minimal fields (as above), plus optional per-endpoint counters in a metrics field.
8) (Cron not in scope for this phase; ignore)
9) OpenAPI security policy
- prod overlay: global security: [{ bearerAuth: [] }].
- Health endpoints: security: [].
10) DoD (acceptance)
- Rolling restart under maxUnavailable: 0 → no 5xx / no connection drops.
- Idempotency replay behaves as specified.
- Audit records, with X-Request-ID echoed on all responses.
- Readiness flips to false if the DB is degraded; K8s drains traffic from the Pod.
- In prod mode: unauthenticated → 401; wrong audience/issuer → 403.
2) Files to modify / add
Modify (diffs below)
- api/app/config.py: add Phase-J settings (auth, health, build meta, idempotency).
- api/app/main.py: register the Request-ID & audit middleware, the Auth middleware (with exempt paths), the Idempotency middleware, the Health router, and dev/prod router gating.
- api/app/utils/audit.py: implement middleware that injects/echoes X-Request-ID and logs JSON Lines.
- api/app/services/chroma_client.py: add heartbeat() for the readiness check (DB + Chroma).
- api/app/routers/status.py: add meta (app_version, git_sha, auth_mode, env) to the summary response.
- api/app/openapi/openapi.yaml (and openapi_ver2.yaml if mirrored): add the 3 health endpoints; confirm the prod security overlay.
- api/requirements.txt: add python-jose[cryptography], httpx.
- k8s/overlays/dev/patch-api-env.yaml: add Phase-J envs.
- k8s/overlays/dev/patch-api-env-from-secret.yaml: (optional) auth secret ref.
- k8s/overlays/dev/patch-api-probes.yaml: point probes to /health/*.
- k8s/overlays/prod/patch-api-env.yaml: prod values (jwks, dev endpoints off).
- k8s/overlays/prod/patch-api-env-from-secret.yaml: prod auth secret ref.
Add (full files below)
- api/app/routers/health.py: health endpoints (/live, /ready, /startup).
- api/app/services/auth.py: auth middleware (none | static | jwks) with JWKS caching; stores the caller's claims in request.state.user.
- api/app/repos/idempotency_repo.py: Postgres DAO for idempotency.
- api/app/services/idempotency.py: idempotency middleware for POST/PUT.
- api/sql/050_idempotency_keys.sql: DDL for the idempotency store.
- k8s/base/portal-api/pdb.yaml: PodDisruptionBudget.
- k8s/overlays/dev/patch-api-prestop.yaml: graceful shutdown for dev.
- k8s/overlays/prod/patch-api-prestop.yaml: graceful shutdown for prod.
3) Code (diffs for existing files; full for new files)
A short note precedes each file's block. Code comments are in English for offshore clarity.
(Modify) api/app/config.py — extend settings
--- a/api/app/config.py
+++ b/api/app/config.py
@@
 from pydantic import BaseSettings, Field
+from typing import Optional
+# NOTE: assumes Pydantic v1. On Pydantic v2, BaseSettings lives in the separate
+# pydantic-settings package (from pydantic_settings import BaseSettings).

 class Settings(BaseSettings):
     DATABASE_URL: str = Field(..., description="SQLAlchemy DB URL")
@@
-    # Existing (Japanese comment in the current file): CHROMA_URL, OPENAI_API_KEY, EMBED_*, etc. stay as-is
+    # Existing: CHROMA_URL, OPENAI_API_KEY, EMBED_* remain unchanged
+    # Declare CHROMA_API_KEY only if it is not already declared above:
+    # BaseSettings reads only declared fields from the environment.
+    CHROMA_API_KEY: Optional[str] = None
+
+    # ── Phase J additions ───────────────────────────────────────────────────
+    APP_ENV: str = Field("dev", description="dev|prod")
+    LOG_LEVEL: str = Field("INFO")
+    ENABLE_DEV_ENDPOINTS: bool = Field(True, description="Enable x-dev-only endpoints")
+
+    # Auth mode: none / static (HS256) / jwks
+    AUTH_MODE: str = Field("none", description="none|static|jwks")
+    JWT_SECRET: Optional[str] = Field(None, description="Used when AUTH_MODE=static")
+    JWKS_URL: Optional[str] = Field(None, description="Used when AUTH_MODE=jwks")
+    JWT_ISSUER: Optional[str] = None
+    JWT_AUDIENCE: Optional[str] = None
+    JWT_CACHE_SECONDS: int = 300
+    JWT_LEEWAY_SECONDS: int = 60
+
+    # Health
+    HEALTH_CHECK_CHROMA: bool = Field(False)
+
+    # Build meta
+    APP_VERSION: str = Field("dev")
+    GIT_SHA: str = Field("unknown")
+
+    # Idempotency
+    IDEMPOTENCY_TTL_SECONDS: int = 86400

     class Config:
         env_file = ".env"
(Modify) api/app/main.py — register middlewares & routers
--- a/api/app/main.py
+++ b/api/app/main.py
@@
 from fastapi import FastAPI
+from .config import Settings
+from .utils.audit import install_observability
+from .services.auth import AuthMiddleware, auth_exempt_paths
+from .services.idempotency import IdempotencyMiddleware
+from .routers import status as status_router
+from .routers.health import router as health_router
+from . import routers as app_routers
@@
-app = FastAPI()
+settings = Settings()
+app = FastAPI()
+
+# Middleware order matters: Starlette's add_middleware() makes the LAST-added middleware
+# the OUTERMOST. Add innermost first, so requests flow Request-ID/audit -> Auth -> Idempotency.
+# That way 401/403/409 responses still carry X-Request-ID and an audit line, and idempotency
+# sees request.state.user (and never replays a stored response to an unauthenticated caller).
+
+# Idempotency for POST/PUT (innermost)
+app.add_middleware(IdempotencyMiddleware, settings=settings)
+
+# Auth (exempt health and docs)
+app.add_middleware(
+    AuthMiddleware,
+    settings=settings,
+    exempt_paths=auth_exempt_paths(default_extra=["/health/live", "/health/ready", "/health/startup"]),
+)
+
+# Request-ID + audit JSON logs (outermost, so it is added last)
+install_observability(app, settings)
+
 # Router registration
+# Health is always public
+app.include_router(health_router)
+
+# Assumes the routers package exposes APIRouter objects under these names;
+# if they are modules, pass e.g. app_routers.chroma.router instead.
+# The chroma router is registered in both modes (only /chroma/search is meant to be
+# prod-facing), so any dev-only routes inside it must be gated within that module.
+app.include_router(app_routers.chroma)
+
+# Dev-only gating
+if settings.ENABLE_DEV_ENDPOINTS:
+    app.include_router(app_routers.chroma_docs)
+    app.include_router(app_routers.extract)
+    app.include_router(app_routers.translate)
+    app.include_router(app_routers.writeback)
+    app.include_router(app_routers.package)
+
+# Common routers
+app.include_router(app_routers.portal_model)
+app.include_router(app_routers.portal_field)
+app.include_router(app_routers.portal_view_common)
+app.include_router(app_routers.portal_view)
+app.include_router(app_routers.portal_tab)
+app.include_router(app_routers.portal_smart_button)
+app.include_router(app_routers.portal_menu)
+app.include_router(status_router.router)
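Why the add_middleware() order matters can be shown with a toy model. This is not Starlette code: it only mirrors the fact that Starlette inserts each added middleware at the front of its list, so the last one added becomes the outermost layer. Starlette's built-in error and exception middleware are not modeled.

```python
from typing import List


class MiniApp:
    """Toy model of Starlette's add_middleware(): each new middleware is inserted at the
    FRONT of the list, so the last one added is the outermost (sees the request first)."""

    def __init__(self) -> None:
        self._middleware: List[str] = []

    def add_middleware(self, name: str) -> None:
        self._middleware.insert(0, name)

    def request_flow(self) -> List[str]:
        """Order in which layers see an incoming request, ending at the route handler."""
        return self._middleware + ["route"]


def build(order: List[str]) -> List[str]:
    app = MiniApp()
    for name in order:
        app.add_middleware(name)
    return app.request_flow()


if __name__ == "__main__":
    # Innermost-first registration: observability ends up outermost
    print(" -> ".join(build(["Idempotency", "Auth", "RequestID/Audit"])))
    # Observability-first registration: it ends up innermost, behind Auth and Idempotency
    print(" -> ".join(build(["RequestID/Audit", "Auth", "Idempotency"])))
```

In the second ordering, a 401 from Auth or a replay from Idempotency never reaches the Request-ID/audit layer, so those responses would miss X-Request-ID and the audit line.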
(Modify) api/app/utils/audit.py — Request-ID + JSON audit middleware
--- a/api/app/utils/audit.py
+++ b/api/app/utils/audit.py
@@
-import logging
+import json
+import logging
+import time
+import uuid
+from datetime import datetime, timezone
+
+from fastapi import Request
+from starlette.middleware.base import BaseHTTPMiddleware

 logger = logging.getLogger("api")

+class _RequestContextMiddleware(BaseHTTPMiddleware):
+    """Honors or generates X-Request-ID, echoes it, and writes one JSON audit line per request."""
+
+    def __init__(self, app, settings):
+        super().__init__(app)
+        self.settings = settings
+
+    async def dispatch(self, request: Request, call_next):
+        rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
+        request.state.request_id = rid
+        start = time.perf_counter()
+        status_code = 500  # stays 500 if the handler raises
+        try:
+            resp = await call_next(request)
+            status_code = resp.status_code
+            resp.headers["X-Request-ID"] = rid
+            return resp
+        finally:
+            # No 'return' in finally: exceptions still propagate after logging.
+            level = logging.ERROR if status_code >= 500 else logging.INFO
+            user = getattr(request.state, "user", None)
+            record = {
+                "ts": datetime.now(timezone.utc).isoformat(),
+                "level": logging.getLevelName(level),
+                "request_id": rid,
+                "user": user.get("sub") if isinstance(user, dict) else None,
+                "method": request.method,
+                "path": request.url.path,
+                "status": status_code,
+                "latency_ms": int((time.perf_counter() - start) * 1000),
+                "ip": request.client.host if request.client else None,
+                "ua": request.headers.get("user-agent"),
+            }
+            logger.log(level, json.dumps(record, ensure_ascii=False))
+
+def install_observability(app, settings):
+    level = getattr(logging, str(settings.LOG_LEVEL).upper(), logging.INFO)
+    # basicConfig is a no-op if handlers already exist. Without any handler, INFO audit
+    # lines would be dropped by Python's last-resort handler (WARNING and above only).
+    logging.basicConfig(level=level, format="%(message)s")
+    logging.getLogger().setLevel(level)
+    app.add_middleware(_RequestContextMiddleware, settings=settings)
(Modify) api/app/services/chroma_client.py — add heartbeat()
--- a/api/app/services/chroma_client.py
+++ b/api/app/services/chroma_client.py
@@
 from typing import Optional
+import httpx

 class ChromaClient:
     def __init__(self, base_url: str, api_key: Optional[str] = None):
-        self.base_url = base_url.rstrip("/")
+        self.base_url = (base_url or "").rstrip("/")  # tolerate an unset CHROMA_URL
         self.api_key = api_key
@@
     # existing upsert/search methods here
+
+    async def heartbeat(self) -> bool:
+        """Simple reachability check for Chroma (used by /health/ready)."""
+        if not self.base_url:
+            return False
+        # /api/v1/heartbeat matches older Chroma servers; newer releases expose the v2 API
+        # (/api/v2/heartbeat). Match the path to the Chroma version you deploy.
+        url = f"{self.base_url}/api/v1/heartbeat"
+        headers = {}
+        if self.api_key:
+            headers["Authorization"] = f"Bearer {self.api_key}"
+        try:
+            async with httpx.AsyncClient(timeout=3.0) as client:
+                r = await client.get(url, headers=headers)
+            return r.status_code == 200
+        except Exception:
+            return False
(Modify) api/app/routers/status.py — add build meta into summary
--- a/api/app/routers/status.py
+++ b/api/app/routers/status.py
@@
 from fastapi import APIRouter
+from ..config import Settings

 router = APIRouter(prefix="/status", tags=["Status"])

 @router.get("/summary")
 def summary():
-    # keep existing aggregation logic
-    return {
-        "translate": {...},
-        "chroma_doc": {...},
-    }
+    s = Settings()
+    # Keep existing aggregation logic; append meta below
+    data = {
+        "translate": {...},   # existing
+        "chroma_doc": {...},  # existing
+        "meta": {
+            "app_version": s.APP_VERSION,
+            "git_sha": s.GIT_SHA,
+            "auth_mode": s.AUTH_MODE,
+            "env": s.APP_ENV,
+        },
+    }
+    return data
(Modify) api/app/openapi/openapi.yaml — health endpoints & prod security
--- a/api/app/openapi/openapi.yaml
+++ b/api/app/openapi/openapi.yaml
@@
   securitySchemes:
     bearerAuth:
       type: http
       scheme: bearer
       bearerFormat: JWT
@@
 paths:
+  /health/live:
+    get:
+      tags: [Health]
+      security: []
+      summary: Liveness
+      responses:
+        '200': { description: OK }
+  /health/ready:
+    get:
+      tags: [Health]
+      security: []
+      summary: Readiness (DB + optional Chroma)
+      responses:
+        '200': { description: OK }
+        '503': { description: Not Ready }
+  /health/startup:
+    get:
+      tags: [Health]
+      security: []
+      summary: Startup complete
+      responses:
+        '200': { description: OK }
Note: in the prod overlay, apply the global security: [{ bearerAuth: [] }] and keep security: [] on the health endpoints.
(Modify) api/requirements.txt — add auth/http dependencies
--- a/api/requirements.txt
+++ b/api/requirements.txt
@@
 fastapi
 uvicorn
 sqlalchemy
 psycopg2-binary
+python-jose[cryptography]
+httpx
(Modify) k8s/overlays/dev/patch-api-env.yaml — add Phase-J envs
--- a/k8s/overlays/dev/patch-api-env.yaml
+++ b/k8s/overlays/dev/patch-api-env.yaml
@@
 containers:
 - name: portal-api
   env:
+  - name: APP_ENV
+    value: "dev"
+  - name: LOG_LEVEL
+    value: "INFO"
+  - name: ENABLE_DEV_ENDPOINTS
+    value: "true"
+  - name: AUTH_MODE
+    value: "none"
+  - name: HEALTH_CHECK_CHROMA
+    value: "false"
+  - name: APP_VERSION
+    value: "dev"
+  - name: GIT_SHA
+    value: "dev-local"
+  - name: IDEMPOTENCY_TTL_SECONDS
+    value: "86400"
(Modify) k8s/overlays/dev/patch-api-env-from-secret.yaml — (optional) dev auth secret
--- a/k8s/overlays/dev/patch-api-env-from-secret.yaml
+++ b/k8s/overlays/dev/patch-api-env-from-secret.yaml
@@
 containers:
 - name: portal-api
   envFrom:
   - secretRef:
       name: secret-api-db
+  # optional:
+  # - secretRef:
+  #     name: secret-api-auth
(Modify) k8s/overlays/dev/patch-api-probes.yaml — hook probes to health endpoints
--- a/k8s/overlays/dev/patch-api-probes.yaml
+++ b/k8s/overlays/dev/patch-api-probes.yaml
@@
 startupProbe:
-  # replace old settings
+  httpGet: { path: /health/startup, port: 8000 }
+  periodSeconds: 2
+  failureThreshold: 60
 readinessProbe:
-  # replace old settings
+  httpGet: { path: /health/ready, port: 8000 }
+  initialDelaySeconds: 3
+  periodSeconds: 5
+  failureThreshold: 3
 livenessProbe:
-  # replace old settings
+  httpGet: { path: /health/live, port: 8000 }
+  periodSeconds: 10
+  failureThreshold: 3
(Modify) k8s/overlays/prod/patch-api-env.yaml — prod envs
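--- a/k8s/overlays/prod/patch-api-env.yaml
+++ b/k8s/overlays/prod/patch-api-env.yaml
@@
 containers:
 - name: portal-api
   env:
+  - name: APP_ENV
+    value: "prod"
+  - name: LOG_LEVEL
+    value: "INFO"
+  - name: ENABLE_DEV_ENDPOINTS
+    value: "false"
+  - name: AUTH_MODE
+    value: "jwks"
+  # Section 4 lists JWKS_URL / JWT_ISSUER / JWT_AUDIENCE as non-secret (ConfigMap);
+  # reading them from secret-api-auth also works, but keep a single source of truth.
+  - name: JWKS_URL
+    valueFrom: { secretKeyRef: { name: secret-api-auth, key: JWKS_URL } }
+  - name: JWT_ISSUER
+    valueFrom: { secretKeyRef: { name: secret-api-auth, key: JWT_ISSUER } }
+  - name: JWT_AUDIENCE
+    valueFrom: { secretKeyRef: { name: secret-api-auth, key: JWT_AUDIENCE } }
+  - name: HEALTH_CHECK_CHROMA
+    value: "true"
+  # $(GIT_TAG) and $(GIT_SHA) are CI placeholders. Kubernetes only expands $(VAR) from env vars
+  # defined earlier in this container and leaves unresolved references as literal text,
+  # so substitute real values at build/deploy time (e.g. envsubst or a generated patch).
+  - name: APP_VERSION
+    value: "$(GIT_TAG)"
+  - name: GIT_SHA
+    value: "$(GIT_SHA)"
+  - name: IDEMPOTENCY_TTL_SECONDS
+    value: "86400"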
(Modify) k8s/overlays/prod/patch-api-env-from-secret.yaml — prod auth secret
--- a/k8s/overlays/prod/patch-api-env-from-secret.yaml
+++ b/k8s/overlays/prod/patch-api-env-from-secret.yaml
@@
 containers:
 - name: portal-api
   envFrom:
   - secretRef:
       name: secret-api-db
+  - secretRef:
+      name: secret-api-auth
(Add) api/app/routers/health.py — health endpoints (full)
Purpose: 3 health endpoints. /ready checks DB and, if enabled, Chroma heartbeat.
# api/app/routers/health.py
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from starlette.concurrency import run_in_threadpool

from ..config import Settings
from ..db import engine

router = APIRouter(prefix="/health", tags=["Health"])


@router.get("/live")
async def live():
    # Process liveness only: no dependency checks, so a DB outage never triggers restarts.
    return {"ok": True}


@router.get("/startup")
async def startup():
    # In a default Uvicorn setup, requests are accepted only after the app's startup
    # (lifespan) hooks have finished, so answering at all means init is complete.
    return {"ok": True}


def _db_ping() -> None:
    # Blocking SQLAlchemy call; run it in a thread so the event loop is not blocked.
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))


@router.get("/ready")
async def ready():
    s = Settings()
    # DB is required
    try:
        await run_in_threadpool(_db_ping)
    except Exception:
        return JSONResponse({"ok": False, "reason": "db"}, status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
    # Chroma heartbeat is optional (HEALTH_CHECK_CHROMA)
    if s.HEALTH_CHECK_CHROMA:
        from ..services.chroma_client import ChromaClient
        cli = ChromaClient(s.CHROMA_URL, getattr(s, "CHROMA_API_KEY", None))
        if not await cli.heartbeat():
            return JSONResponse({"ok": False, "reason": "chroma"}, status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
    return {"ok": True}
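The readiness decision is small enough to unit-test as a pure function. In the sketch below, the function name and the reason labels are illustrative and intended only for logging.

```python
from typing import Optional, Tuple


def readiness(db_ok: bool, check_chroma: bool, chroma_ok: Optional[bool] = None) -> Tuple[int, Optional[str]]:
    """Return (HTTP status, failure reason) following the /health/ready rules:
    the DB is mandatory; Chroma counts only when HEALTH_CHECK_CHROMA is enabled."""
    if not db_ok:
        return 503, "db"
    if check_chroma and not chroma_ok:
        return 503, "chroma"
    return 200, None


if __name__ == "__main__":
    for case in [(False, False, None), (True, False, None), (True, True, False), (True, True, True)]:
        print(case, "->", readiness(*case))
```

A Chroma outage makes Pods unready only when HEALTH_CHECK_CHROMA is on. That is why prod enables it and dev may not.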
(Add) api/app/services/auth.py — auth middleware (full)
Purpose: Bearer auth with AUTH_MODE = none | static | jwks. JWKS keys are cached. On success, claims are stored in request.state.user.
# api/app/services/auth.py
import time
from typing import Any, Dict, List, Optional

import httpx
from fastapi import Request
from jose import jwt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from ..config import Settings


def auth_exempt_paths(default_extra: Optional[List[str]] = None) -> List[str]:
    base = ["/openapi.json", "/docs", "/redoc"]
    if default_extra:
        base.extend(default_extra)
    return base


def _problem(code: int, title: str, detail: str) -> JSONResponse:
    # Problem+JSON body (section 6); X-Request-ID is added by the audit middleware.
    headers = {"WWW-Authenticate": "Bearer"} if code == 401 else None
    return JSONResponse(
        {"type": "about:blank", "title": title, "detail": detail, "status": code},
        status_code=code, headers=headers, media_type="application/problem+json",
    )


class _JWKSCache:
    def __init__(self, ttl: int):
        self.ttl = ttl
        self.cached_at = 0
        self.keys: Optional[List[Dict[str, Any]]] = None

    async def get(self, url: str, force: bool = False) -> List[Dict[str, Any]]:
        now = int(time.time())
        if not force and self.keys and (now - self.cached_at) < self.ttl:
            return self.keys
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(url)
            r.raise_for_status()
            data = r.json()
        self.keys = data.get("keys", [])
        self.cached_at = now
        return self.keys


class AuthMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, settings: Settings, exempt_paths: Optional[List[str]] = None):
        super().__init__(app)
        self.s = settings
        self.exempt = set(exempt_paths or [])
        self.jwks_cache = _JWKSCache(self.s.JWT_CACHE_SECONDS)

    async def dispatch(self, request: Request, call_next):
        if request.url.path in self.exempt:
            return await call_next(request)
        mode = self.s.AUTH_MODE.lower()
        if mode == "none":
            request.state.user = {"sub": "anonymous"}
            return await call_next(request)

        authz = request.headers.get("Authorization", "")
        if not authz.startswith("Bearer "):
            return _problem(401, "Unauthorized", "Missing Bearer token.")
        token = authz.split(" ", 1)[1].strip()
        try:
            claims = await self._verify(token)
        except Exception:
            return _problem(401, "Unauthorized", "Invalid or expired token.")

        # iss/aud are checked here (not inside jwt.decode) so a mismatch maps to 403 per the spec.
        if self.s.JWT_ISSUER and claims.get("iss") != self.s.JWT_ISSUER:
            return _problem(403, "Forbidden", "Token issuer is not accepted.")
        if self.s.JWT_AUDIENCE:
            aud = claims.get("aud")
            auds = [aud] if isinstance(aud, str) else list(aud or [])  # a missing aud also fails
            if self.s.JWT_AUDIENCE not in auds:
                return _problem(403, "Forbidden", "Token audience is not accepted.")

        request.state.user = claims
        return await call_next(request)

    async def _verify(self, token: str) -> Dict[str, Any]:
        mode = self.s.AUTH_MODE.lower()
        # python-jose takes leeway via options (jwt.decode has no leeway keyword);
        # 'aud' is verified manually in dispatch().
        options = {"verify_aud": False, "leeway": self.s.JWT_LEEWAY_SECONDS}
        if mode == "static":
            if not self.s.JWT_SECRET:
                raise RuntimeError("JWT_SECRET not configured")
            return jwt.decode(token, self.s.JWT_SECRET, algorithms=["HS256"], options=options)
        if mode == "jwks":
            if not self.s.JWKS_URL:
                raise RuntimeError("JWKS_URL not configured")
            kid = jwt.get_unverified_header(token).get("kid")
            keys = await self.jwks_cache.get(self.s.JWKS_URL)
            key = next((k for k in keys if k.get("kid") == kid), None)
            if key is None:
                # Key rotation: refresh the cache once before giving up
                keys = await self.jwks_cache.get(self.s.JWKS_URL, force=True)
                key = next((k for k in keys if k.get("kid") == kid), None)
            if key is None:
                raise RuntimeError("JWKS kid not found")
            # Take the algorithm from the trusted JWK, never from the token header.
            return jwt.decode(token, key, algorithms=[key.get("alg", "RS256")], options=options)
        raise RuntimeError(f"Unknown AUTH_MODE: {mode}")
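Pulled out as a pure function, the iss/aud rule can be unit-tested without issuing any tokens. In this sketch, check_claims is a hypothetical name. It follows RFC 7519, where aud may be a single string or a list. It treats a missing aud claim as a mismatch whenever JWT_AUDIENCE is set.

```python
from typing import Any, Dict, Optional


def check_claims(claims: Dict[str, Any], issuer: Optional[str], audience: Optional[str]) -> Optional[int]:
    """Return 403 if an enforced iss/aud does not match, else None (accepted)."""
    if issuer and claims.get("iss") != issuer:
        return 403
    if audience:
        aud = claims.get("aud")
        audiences = [aud] if isinstance(aud, str) else list(aud or [])
        if audience not in audiences:
            return 403
    return None


if __name__ == "__main__":
    print(check_claims({"iss": "local", "aud": ["web", "portal-api"]}, "local", "portal-api"))
    print(check_claims({"iss": "local"}, "local", "portal-api"))
```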
(Add) api/app/repos/idempotency_repo.py — DB DAO (full)
Purpose: Persist & fetch idempotent responses keyed by Idempotency-Key.
# api/app/repos/idempotency_repo.py
import json
from typing import Any, Dict, Optional

from sqlalchemy import text

from ..db import engine


def json_dump(obj: Any) -> str:
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))


def get_by_key(key: str) -> Optional[Dict[str, Any]]:
    # Expired keys are treated as absent even before the GC job deletes them.
    sql = text("""
        SELECT key, req_hash, status_code, resp_body_json, created_at, ttl_at
        FROM public.idempotency_keys
        WHERE key = :key AND ttl_at > NOW()
        LIMIT 1
    """)
    with engine.begin() as conn:
        row = conn.execute(sql, {"key": key}).mappings().first()
    return dict(row) if row else None


def upsert(key: str, req_hash: str, status_code: int, resp_body_json: Any, ttl_seconds: int) -> None:
    # CAST(:resp AS jsonb): SQLAlchemy text() misparses ":resp::jsonb" as a bind parameter.
    # DO UPDATE fires only for an expired row, so a live key keeps its first stored response.
    sql = text("""
        INSERT INTO public.idempotency_keys AS ik (key, req_hash, status_code, resp_body_json, ttl_at)
        VALUES (:key, :req_hash, :status_code, CAST(:resp AS jsonb), NOW() + :ttl * INTERVAL '1 second')
        ON CONFLICT (key) DO UPDATE SET
            req_hash = EXCLUDED.req_hash,
            status_code = EXCLUDED.status_code,
            resp_body_json = EXCLUDED.resp_body_json,
            created_at = NOW(),
            ttl_at = EXCLUDED.ttl_at
        WHERE ik.ttl_at <= NOW()
    """)
    with engine.begin() as conn:
        conn.execute(sql, {
            "key": key,
            "req_hash": req_hash,
            "status_code": status_code,
            "resp": json_dump(resp_body_json),
            "ttl": ttl_seconds,
        })
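For unit-testing the middleware without Postgres, an in-memory stand-in with the same function signatures can be handy. This sketch rests on three assumptions:
- the class is hypothetical,
- expired rows are treated as absent (the intended 24h TTL semantics),
- a live key keeps its first stored response.

An injectable clock keeps the TTL logic testable.

```python
import time
from typing import Any, Callable, Dict, Optional


class InMemoryIdempotencyRepo:
    """Hypothetical test double with the same get_by_key/upsert signatures as idempotency_repo.py."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._rows: Dict[str, Dict[str, Any]] = {}
        self._clock = clock

    def get_by_key(self, key: str) -> Optional[Dict[str, Any]]:
        row = self._rows.get(key)
        if row is None or row["ttl_at"] <= self._clock():
            return None  # missing or expired
        return dict(row)

    def upsert(self, key: str, req_hash: str, status_code: int,
               resp_body_json: Any, ttl_seconds: int) -> None:
        now = self._clock()
        existing = self._rows.get(key)
        if existing is not None and existing["ttl_at"] > now:
            return  # a live key keeps its first stored response
        self._rows[key] = {
            "key": key, "req_hash": req_hash, "status_code": status_code,
            "resp_body_json": resp_body_json, "created_at": now, "ttl_at": now + ttl_seconds,
        }


if __name__ == "__main__":
    fake_now = [1000.0]
    store = InMemoryIdempotencyRepo(clock=lambda: fake_now[0])
    store.upsert("k1", "hash-a", 200, {"processed": 2}, ttl_seconds=86400)
    print(store.get_by_key("k1")["req_hash"])  # hash-a
    fake_now[0] += 86400
    print(store.get_by_key("k1"))  # None: expired
```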
(Add) api/app/services/idempotency.py — idempotency middleware (full)
Purpose: For POST/PUT with Idempotency-Key: replay stored response or store the first successful JSON response. Conflicts if payload differs.
# api/app/services/idempotency.py
import hashlib
import json

from fastapi import Request
from starlette.concurrency import run_in_threadpool
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, Response

from ..config import Settings
from ..repos import idempotency_repo as repo

SIDE_EFFECT_METHODS = {"POST", "PUT"}


def _hash_request(method: str, path: str, body: bytes, subject: str) -> str:
    h = hashlib.sha256()
    h.update(method.encode("utf-8"))
    h.update(b"|")
    h.update(path.encode("utf-8"))
    h.update(b"|")
    h.update(body or b"")
    h.update(b"|")
    h.update(subject.encode("utf-8") if subject else b"")
    return h.hexdigest()


class IdempotencyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, settings: Settings):
        super().__init__(app)
        self.s = settings

    async def dispatch(self, request: Request, call_next):
        if request.method not in SIDE_EFFECT_METHODS:
            return await call_next(request)
        key = request.headers.get("Idempotency-Key")
        if not key:
            return await call_next(request)

        # Read the body once, then re-feed it downstream (kept for older Starlette versions).
        # After the buffered body, defer to the real channel so disconnects are still seen.
        raw = await request.body()
        original_receive = request._receive
        body_sent = False

        async def receive():
            nonlocal body_sent
            if not body_sent:
                body_sent = True
                return {"type": "http.request", "body": raw, "more_body": False}
            return await original_receive()

        request._receive = receive

        # request.state.user is set by AuthMiddleware, which must run before this middleware.
        user = getattr(request.state, "user", None)
        subject = (user.get("sub") or "") if isinstance(user, dict) else ""
        req_hash = _hash_request(request.method, request.url.path, raw, subject)

        # Repo functions are blocking SQLAlchemy calls; keep them off the event loop.
        existing = await run_in_threadpool(repo.get_by_key, key)
        if existing:
            if existing["req_hash"] != req_hash:
                problem = {
                    "type": "about:blank",
                    "title": "Conflict",
                    "detail": "Idempotency-Key already used with a different request payload.",
                    "status": 409,
                }
                return JSONResponse(problem, status_code=409, media_type="application/problem+json")
            # Replay the stored response
            resp = JSONResponse(existing["resp_body_json"], status_code=existing["status_code"])
            resp.headers["Idempotency-Status"] = "replayed"
            return resp

        # First call: run the handler. Two concurrent first calls with the same key can both
        # run; the repo keeps whichever response is stored first.
        response = await call_next(request)
        if "application/json" not in (response.headers.get("content-type") or ""):
            return response

        # Buffer the body, then rebuild the response from the SAME bytes so the original
        # Content-Length stays valid (re-serializing the JSON could change its length).
        resp_bytes = b"".join([chunk async for chunk in response.body_iterator])
        new_resp = Response(content=resp_bytes, status_code=response.status_code, headers=dict(response.headers))

        # Persist only non-5xx results, so transient server errors are retried rather than replayed.
        if response.status_code < 500:
            try:
                resp_json = json.loads(resp_bytes.decode("utf-8") or "{}")
            except ValueError:
                resp_json = {"_raw": resp_bytes.decode("utf-8", "ignore")}
            try:
                await run_in_threadpool(
                    repo.upsert, key, req_hash, response.status_code, resp_json, self.s.IDEMPOTENCY_TTL_SECONDS
                )
            except Exception:
                pass  # a storage failure must not break the response; a retry simply re-runs
        return new_resp
(Add) api/sql/050_idempotency_keys.sql — DDL (full)
Purpose: Table for idempotency persistence.
-- api/sql/050_idempotency_keys.sql
CREATE TABLE IF NOT EXISTS public.idempotency_keys (
key TEXT PRIMARY KEY,
req_hash TEXT NOT NULL,
status_code INTEGER NOT NULL,
resp_body_json JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ttl_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_idempotency_ttl ON public.idempotency_keys (ttl_at);
(Add) k8s/base/portal-api/pdb.yaml — PodDisruptionBudget (full)
Purpose: Ensure at least one Pod remains available during disruptions.
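# k8s/base/portal-api/pdb.yaml
# With a single replica (e.g. dev), minAvailable: 1 blocks voluntary evictions such as node drains.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: portal-api-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: portal-api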
(Add) k8s/overlays/dev/patch-api-prestop.yaml — graceful shutdown (dev)
Purpose: Give time for in-flight requests to drain.
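# k8s/overlays/dev/patch-api-prestop.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: portal-api
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: portal-api
        lifecycle:
          preStop:
            exec:
              # Requires /bin/sh in the image. The sleep runs before SIGTERM is sent,
              # giving endpoint removal time to propagate to Services/Ingress.
              command: ["/bin/sh", "-c", "sleep 5"]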
(Add) k8s/overlays/prod/patch-api-prestop.yaml — graceful shutdown (prod)
Purpose: Same as dev, for production overlay.
# k8s/overlays/prod/patch-api-prestop.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: portal-api
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: portal-api
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 5"]
Quick summary of what the code does (one-liners)
- Health endpoints: /health/live (process alive), /health/ready (DB + optional Chroma), /health/startup (init done).
- Audit middleware: adds/echoes X-Request-ID; logs JSON Lines with latency and status.
- Auth middleware: pluggable none | static | jwks Bearer auth; exposes user claims under request.state.user.
- Idempotency middleware: for POST/PUT, deduplicates by Idempotency-Key (replays or stores JSON responses).
- Chroma heartbeat: simple reachability check used by readiness.
- Status summary: returns build/runtime meta (version, git SHA, env, auth mode).
- K8s overlays: probes wired to health endpoints; graceful preStop; env variables split dev/prod; a PDB to limit downtime during voluntary disruptions.
That’s everything needed for the offshore team to implement Phase J exactly as designed.
Below is a ready-to-run sample dataset and test procedure. It focuses on validating what we’ve completed so far—especially Phase H (Chroma upsert) and Phase J (K8s dev optimization: health/auth/audit/idempotency/config)—without depending on earlier phases. It uses direct SQL seeding into portal_chroma_doc, so you can exercise /chroma/upsert, /chroma/search, health checks, auth, and idempotency end-to-end.
0) Prerequisites & environment
- Database reachable and migrated through 030_portal_chroma_doc.sql, plus the Phase-J DDL 050_idempotency_keys.sql (the idempotency middleware queries this table for every POST/PUT that carries an Idempotency-Key).
- API running with:
  - APP_ENV=dev
  - AUTH_MODE=static (for simple testing) and JWT_SECRET=devsecret
  - CHROMA_URL pointing to your dev Chroma (or keep HEALTH_CHECK_CHROMA=false if you don't have Chroma yet)
- Optional but recommended:
  - ENABLE_DEV_ENDPOINTS=true (so /chroma/upsert is enabled)
  - HEALTH_CHECK_CHROMA=true once your Chroma is up (to validate readiness behavior)
Example .env (adjust ports/URLs to your setup)
APP_ENV=dev
LOG_LEVEL=INFO
ENABLE_DEV_ENDPOINTS=true
AUTH_MODE=static
JWT_SECRET=devsecret
HEALTH_CHECK_CHROMA=false
CHROMA_URL=http://localhost:8001
IDEMPOTENCY_TTL_SECONDS=86400
Get a test JWT (HS256)
Python snippet (run locally; uses python-jose, which is already in requirements.txt):
import time
from jose import jwt
now = int(time.time())
print(jwt.encode(
    {"sub": "tester@example.com", "iss": "local", "aud": "portal-api", "iat": now, "exp": now + 3600},
    "devsecret", algorithm="HS256"))
Use the printed token as Authorization: Bearer <token> in all protected calls.
1) Seed minimal test data (direct SQL)
We'll insert two queued docs into public.portal_chroma_doc:
- a field doc for sale.order::partner_id (Japanese text),
- a view_common doc for the sale.order action sale.action_quotations (Japanese help/purpose).
This bypasses earlier phases, so you can test /chroma/upsert and /chroma/search directly.
-- Minimal, self-contained seed for Phase H tests.
-- Adjust schema/table names if your schema is not "public".
-- 1) Ensure idempotency table exists (Phase J)
CREATE TABLE IF NOT EXISTS public.idempotency_keys (
key TEXT PRIMARY KEY,
req_hash TEXT NOT NULL,
status_code INTEGER NOT NULL,
resp_body_json JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ttl_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_idempotency_ttl ON public.idempotency_keys (ttl_at);
-- 2) Seed two queued docs (field + view_common) in Japanese
INSERT INTO public.portal_chroma_doc (
doc_id, natural_key, lang, collection, doc_text,
entity, model, model_table, field_name, action_xmlid, target,
metadata, status, updated_at
) VALUES
(
'field::sale.order::partner_id::ja',
'field::sale.order::partner_id',
'ja',
'portal_field_ja',
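'顧客(partner_id):受注に関連付く取引先。見積/受注の相手先を指定します。',  -- EN: "Customer (partner_id): the business partner linked to the sales order; specifies the counterparty of the quotation/order."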
'field',
'sale.order',
'sale_order',
'partner_id',
NULL,
NULL,
jsonb_build_object(
'entity','field',
'model','sale.order',
'natural_key','field::sale.order::partner_id',
'collection','portal_field_ja',
'lang','ja'
),
'queued',
NOW()
),
(
'view_common::sale.action_quotations::ja',
'view_common::sale.action_quotations',
'ja',
'portal_view_common_ja',
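'見積一覧の画面です。見積の作成・確認・検索が行えます。AI目的:営業担当の素早い見積管理を支援。',  -- EN: "This is the quotation list screen. You can create, review, and search quotations. AI purpose: help sales reps manage quotations quickly."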
'view_common',
'sale.order',
'sale_order',
NULL,
'sale.action_quotations',
'help',
jsonb_build_object(
'entity','view_common',
'model','sale.order',
'natural_key','view_common::sale.action_quotations',
'collection','portal_view_common_ja',
'lang','ja'
),
'queued',
NOW()
);
Validate the seed:
SELECT doc_id, status FROM public.portal_chroma_doc ORDER BY doc_id;
-- Expect both rows with status='queued'
2) Smoke test the health endpoints (Phase J)
# live (always 200)
curl -sS http://localhost:8000/health/live
# startup (200 after init)
curl -sS http://localhost:8000/health/startup
# ready (DB required; add Chroma heartbeat if HEALTH_CHECK_CHROMA=true)
curl -sS http://localhost:8000/health/ready
If HEALTH_CHECK_CHROMA=true and Chroma is down, /health/ready must return 503.
3) Test auth (Phase J)
Use your HS256 token:
TOKEN="<paste token>"
curl -i -H "Authorization: Bearer $TOKEN" http://localhost:8000/status/summary
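- Without a token (or with a bad token) you should get 401.
- With a wrong iss/aud you should get 403. This is enforced only when JWT_ISSUER/JWT_AUDIENCE are set (e.g. JWT_ISSUER=local and JWT_AUDIENCE=portal-api to match the test token above).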
4) Test idempotency (Phase J)
We’ll call /chroma/upsert twice with the same Idempotency-Key.
TOKEN="<paste token>"
# (A) First call: dry run (no DB/Chroma state change expected)
curl -i -X POST "http://localhost:8000/chroma/upsert" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: test-upsert-001" \
-d '{"collections":["portal_field_ja","portal_view_common_ja"],"limit":100,"dry_run":true}'
# (B) Replay with identical body: expect same response + Idempotency-Status: replayed
curl -i -X POST "http://localhost:8000/chroma/upsert" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: test-upsert-001" \
-d '{"collections":["portal_field_ja","portal_view_common_ja"],"limit":100,"dry_run":true}'
# (C) Conflict test: same key, different body -> 409
curl -i -X POST "http://localhost:8000/chroma/upsert" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: test-upsert-001" \
-d '{"collections":["portal_field_ja"],"limit":50,"dry_run":true}'
Expected:
- (A) 200 with counts in the body (processed=2, upserted=0 for dry-run).
- (B) 200 + header Idempotency-Status: replayed.
- (C) 409 Conflict Problem+JSON.
5) Test Chroma upsert end-to-end (Phase H)
For a real upsert, ensure your Chroma dev instance is reachable at CHROMA_URL, then set dry_run=false.
TOKEN="<paste token>"
# (1) Real upsert
curl -i -X POST "http://localhost:8000/chroma/upsert" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: upsert-real-001" \
-d '{"collections":["portal_field_ja","portal_view_common_ja"],"limit":100,"dry_run":false}'
Verify DB state changed:
SELECT doc_id, status FROM public.portal_chroma_doc ORDER BY doc_id;
-- Expect both rows now 'upserted' (or 'failed' with last_error if something went wrong)
6) Test search (Phase H; the single prod-exposed endpoint)
Assuming /chroma/search is registered in your environment:
TOKEN="<paste token>"
curl -sS -X POST "http://localhost:8000/chroma/search" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"collection": "portal_field_ja",
"query": "顧客",
"top_k": 5,
"where": {"entity":"field","model":"sale.order"}
}' | jq .
Expect an array with your field doc among the results (the query 顧客 means "customer").
(If embeddings or Chroma aren’t configured, results may be empty—validate that the request doesn’t error, and that upserted docs exist in DB.)
7) K8s-specific checks (Phase J acceptance)
- Probes
  - startupProbe hits /health/startup
  - readinessProbe hits /health/ready
  - livenessProbe hits /health/live
  - With HEALTH_CHECK_CHROMA=true, stop your Chroma pod → readiness should flip to false.
- Rolling update: zero downtime
  kubectl -n <ns> rollout restart deploy/portal-api
  kubectl -n <ns> rollout status deploy/portal-api
  - With maxUnavailable: 0, verify no 5xx spikes in the ingress/API logs and continuous success on a looped GET to /health/live.
- PDB & graceful shutdown
  - Ensure the PDB keeps at least one pod available.
  - Confirm preStop + termination grace allow in-flight requests to complete (e.g., run a long request and restart).
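The looped-GET check can be scripted with the standard library. In this sketch, the URL and timings are placeholders. The probe is passed in as a function, so the counting logic can be tested without a cluster.

```python
import time
import urllib.error
import urllib.request
from typing import Callable, Dict


def http_status(url: str, timeout: float = 2.0) -> int:
    """Return the HTTP status code, or 0 for connection-level failures (refused, reset, timeout)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as e:  # 4xx/5xx still carry a status code
        return e.code
    except OSError:  # URLError, connection reset, socket timeout
        return 0


def watch(probe: Callable[[], int], duration_s: float, interval_s: float = 0.2) -> Dict[str, int]:
    """Call probe() repeatedly for duration_s seconds and count 200s vs everything else."""
    tally = {"ok": 0, "failed": 0}
    deadline = time.monotonic() + duration_s
    while time.monotonic() < deadline:
        tally["ok" if probe() == 200 else "failed"] += 1
        time.sleep(interval_s)
    return tally
```

To use it:
- Run watch(lambda: http_status("https://<your-ingress>/health/live"), duration_s=120) while kubectl rollout restart is in progress, and expect failed == 0.
- Target the Ingress or Service address. A kubectl port-forward is pinned to a single Pod and drops when that Pod is replaced.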
8) Optional: full-pipeline via APIs (if you want to test earlier phases too)
If your dev environment has the earlier endpoints:
- /extract/field and/or /extract/view_common with JA text present and EN empty.
- /translate/run with a small limit and a configured provider (or a dev stub).
- /chroma/package to queue into portal_chroma_doc.
- /chroma/upsert (as above).
- /chroma/search to verify hits.
Re-run the same extract request to confirm there are no duplicate pending rows (idempotent aggregation).
9) Clean up (optional)
DELETE FROM public.portal_chroma_doc WHERE doc_id IN
('field::sale.order::partner_id::ja','view_common::sale.action_quotations::ja');
-- Remove only the keys used in this procedure
DELETE FROM public.idempotency_keys
WHERE key IN ('test-upsert-001', 'upsert-real-001');
What to look for (summary)
- Health: /health/* behave as specified; readiness fails when the DB (or, if enabled, Chroma) is down.
- Auth: 401 without a token; 403 with a wrong iss/aud (if enforced); 200 with a valid HS256 token.
- Audit: every response includes X-Request-ID; logs show JSON lines with method/path/status/latency.
- Idempotency: same key + same payload → replayed; same key + different payload → 409.
- Upsert: queued → upserted after a real /chroma/upsert; dry_run=true does not change the DB.
- Search: the query returns your upserted JA docs (when Chroma + embeddings are correctly wired).