Milestone 4 — Fields CRUD + Swagger に進めます。

いまのカレントは dev-portal-k8s/ 前提で、ステップごとにやります。


Step 1) APIコードを追加(CRUD + Swagger)

1-1. 依存を追加:api/requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.30.6
SQLAlchemy==2.0.36
psycopg2-binary==2.9.9
pydantic==2.8.2
python-dotenv==1.0.1

1-2. 設定:api/app/config.py

import os
from pydantic import BaseModel

class Settings(BaseModel):
    database_url: str = os.getenv(
        "DATABASE_URL",
        "postgresql+psycopg2://dev:dev@postgres.infra.svc.cluster.local:5432/devportal",
    )
    db_schema: str = os.getenv("DB_SCHEMA", "public")
    table_fields_name: str = os.getenv("TABLE_FIELDS_NAME", "portal_fields")
    primary_key_field: str = os.getenv("PRIMARY_KEY_FIELD", "id")

settings = Settings()

1-3. DBユーティリティ:api/app/db.py

from contextlib import contextmanager
from typing import Any, Dict
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
from .config import settings

engine = create_engine(settings.database_url, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
metadata = MetaData(schema=settings.db_schema)
fields_table: Table | None = None

def reflect_tables() -> None:
    global fields_table
    metadata.reflect(bind=engine, only=[settings.table_fields_name])
    fields_table = Table(settings.table_fields_name, metadata, autoload_with=engine)

@contextmanager
def session_scope():
    s = SessionLocal()
    try:
        yield s
        s.commit()
    except Exception:
        s.rollback()
        raise
    finally:
        s.close()

def row_to_dict(row: Any) -> Dict[str, Any]:
    if row is None:
        return {}
    try:
        return dict(row._mapping)  # type: ignore[attr-defined]
    except Exception:
        return {}

1-4. スキーマ:api/app/schemas.py

from typing import Optional
from pydantic import BaseModel, ConfigDict

class FieldCreate(BaseModel):
    model_config = ConfigDict(extra="allow")  # 任意の列を許可(ポータルの列に合わせる)

class FieldUpdate(BaseModel):
    model_config = ConfigDict(extra="allow")

class QueryParams(BaseModel):
    q: Optional[str] = None
    model: Optional[str] = None
    field_name: Optional[str] = None
    limit: int = 50
    offset: int = 0

1-5. ルータ:api/app/routers/fields.py

from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query, status
from sqlalchemy import select, and_, or_, text, insert, update, delete, cast
from sqlalchemy.types import Text as SqlText
from ..db import session_scope, reflect_tables, fields_table, row_to_dict
from ..config import settings
from ..schemas import FieldCreate, FieldUpdate

router = APIRouter(prefix="/fields", tags=["fields"])

@router.get("/health")
async def health() -> Dict[str, str]:
    return {"status": "ok"}

@router.get("/schema")
async def get_schema() -> Dict[str, Any]:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")
    cols = [
        {"name": c.name, "type": str(c.type), "nullable": c.nullable, "primary_key": c.primary_key}
        for c in fields_table.columns  # type: ignore
    ]
    return {"schema": settings.db_schema, "table": settings.table_fields_name, "columns": cols}

@router.get("/")
async def list_fields(
    q: Optional[str] = Query(default=None, description="free text over model/field_name + JSONB"),
    model: Optional[str] = None,
    field_name: Optional[str] = None,
    limit: int = 50,
    offset: int = 0,
) -> Dict[str, Any]:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")

    conds = []
    if model and "model" in fields_table.c:  # type: ignore
        conds.append(fields_table.c.model == model)  # type: ignore
    if field_name and "field_name" in fields_table.c:  # type: ignore
        conds.append(fields_table.c.field_name == field_name)  # type: ignore

    if q:
        likes = []
        for key in ("model", "field_name"):
            if key in fields_table.c:  # type: ignore
                likes.append(fields_table.c[key].ilike(f"%{q}%"))  # type: ignore
        for key in ("label_i18n", "help_i18n", "placeholder_i18n", "unit_i18n", "selection_items"):
            if key in fields_table.c:  # type: ignore
                likes.append(cast(fields_table.c[key], SqlText).ilike(f"%{q}%"))  # type: ignore
        if likes:
            conds.append(or_(*likes))

    stmt = select(fields_table)  # type: ignore
    if conds:
        from sqlalchemy import and_
        stmt = stmt.where(and_(*conds))
    order_col = fields_table.c.get("updated_at", fields_table.c.get(settings.primary_key_field))  # type: ignore
    stmt = stmt.order_by(order_col.desc()).limit(limit).offset(offset)

    with session_scope() as s:
        rows = s.execute(stmt).all()
        total = s.execute(select(text("count(1)")).select_from(fields_table)).scalar_one()  # type: ignore
        return {"items": [row_to_dict(r) for r in rows], "total": total}

@router.get("/{item_id}")
async def get_field(item_id: int) -> Dict[str, Any]:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")
    pk = fields_table.c.get(settings.primary_key_field)  # type: ignore
    with session_scope() as s:
        row = s.execute(select(fields_table).where(pk == item_id)).first()  # type: ignore
        if not row:
            raise HTTPException(404, "Not found")
        return row_to_dict(row)

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_field(payload: FieldCreate) -> Dict[str, Any]:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")
    data = payload.model_dump()
    data.pop(settings.primary_key_field, None)
    valid = set(c.name for c in fields_table.columns)  # type: ignore
    insert_data = {k: v for k, v in data.items() if k in valid}
    if not insert_data:
        raise HTTPException(400, "No valid columns provided")
    with session_scope() as s:
        row = s.execute(insert(fields_table).values(**insert_data).returning(*fields_table.c)).first()  # type: ignore
        return row_to_dict(row)

@router.put("/{item_id}")
async def update_field(item_id: int, payload: FieldUpdate) -> Dict[str, Any]:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")
    data = payload.model_dump()
    data.pop(settings.primary_key_field, None)
    valid = set(c.name for c in fields_table.columns)  # type: ignore
    update_data = {k: v for k, v in data.items() if k in valid}
    if not update_data:
        raise HTTPException(400, "No valid columns to update")
    pk = fields_table.c.get(settings.primary_key_field)  # type: ignore
    with session_scope() as s:
        exists = s.execute(select(pk).where(pk == item_id)).first()
        if not exists:
            raise HTTPException(404, "Not found")
        row = s.execute(update(fields_table).where(pk == item_id).values(**update_data).returning(*fields_table.c)).first()  # type: ignore
        return row_to_dict(row)

@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_field(item_id: int) -> None:
    if fields_table is None:
        raise HTTPException(500, "Fields table not initialized")
    pk = fields_table.c.get(settings.primary_key_field)  # type: ignore
    with session_scope() as s:
        s.execute(delete(fields_table).where(pk == item_id))  # type: ignore
        return None

def init_on_startup():
    # アプリ起動時に一度だけ反映
    reflect_tables()

1-6. エントリポイント:api/app/main.py

from fastapi import FastAPI
from .routers import fields
from .routers.fields import init_on_startup

app = FastAPI(title="Dev Portal Backend", version="0.1.0", description="Fields metadata CRUD API")
app.include_router(fields.router)

@app.on_event("startup")
async def startup():
    init_on_startup()

保存時は LF 改行にしてください(VSCode右下が「LF」)。


Step 2) コンテナをビルドして minikube へ

# カレント: dev-portal-k8s/
docker buildx build -t dev-portal-api:v1 -f api/Dockerfile . --load

PROFILE=$(kubectl config current-context)
minikube -p "$PROFILE" image load dev-portal-api:v1

Step 3) K8s マニフェストをDB接続対応に

3-1. API用 ConfigMap を追加:k8s/dev/api-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: dev-portal-config
  namespace: dev
data:
  DATABASE_URL: postgresql+psycopg2://dev:dev@postgres.infra.svc.cluster.local:5432/devportal
  DB_SCHEMA: public
  TABLE_FIELDS_NAME: portal_fields
  PRIMARY_KEY_FIELD: id

3-2. Deployment に環境変数を注入(k8s/dev/api-deployment.yaml を編集)

spec.template.spec.containers[0] の下に envFrom を追加し、image を v1 に変更:

      containers:
      - name: api
        image: dev-portal-api:v1
        imagePullPolicy: IfNotPresent
        envFrom:
        - configMapRef:
            name: dev-portal-config
        ports:
        - name: http
          containerPort: 8000
        readinessProbe:
          httpGet: { path: /fields/health, port: http }
          initialDelaySeconds: 5
          periodSeconds: 5
        livenessProbe:
          httpGet: { path: /fields/health, port: http }
          initialDelaySeconds: 10
          periodSeconds: 10

Step 4) 適用 & ロールアウト & 動作確認

kubectl -n dev apply -f k8s/dev/api-configmap.yaml
kubectl -n dev apply -f k8s/dev/api-deployment.yaml
kubectl -n dev rollout status deploy/dev-portal-api --timeout=180s

# まずはService直で
kubectl -n dev port-forward svc/dev-portal-api 8000:8000 >/tmp/pf_api.log 2>&1 &
sleep 1
curl -s http://127.0.0.1:8000/fields/health

Swagger(/docs)

# Ingress経由(既にIngress設定済みの前提)
PROFILE=$(kubectl config current-context)
MINIKUBE_IP=$(minikube -p "$PROFILE" ip)
NODE_PORT=$(kubectl -n ingress-nginx get svc ingress-nginx-controller -o jsonpath='{.spec.ports[?(@.port==80)].nodePort}')

# Health
curl -s -H 'Host: api.local' "http://$MINIKUBE_IP:$NODE_PORT/fields/health"

# ブラウザで
#   http://api.local:<NODE_PORT>/docs

Step 5) 動作テスト(POST → GET)

Swagger の POST /fields に以下を貼って実行:

{
  "model": "res.partner",
  "model_table": "res_partner",
  "field_name": "x_customer_rank",
  "ttype": "selection",
  "relation_model": null,
  "label_i18n": {"ja_JP":"顧客ランク","en_US":"Customer Rank"},
  "help_i18n": {"ja_JP":"ランクに応じて自動割引を適用します","en_US":"Apply auto discount by rank"},
  "placeholder_i18n": {"ja_JP":"ランクを選択してください","en_US":"Select rank"},
  "translate": false,
  "selection_items": [
    {"key":"bronze","label_i18n":{"ja_JP":"ブロンズ","en_US":"Bronze"}},
    {"key":"silver","label_i18n":{"ja_JP":"シルバー","en_US":"Silver"}},
    {"key":"gold","label_i18n":{"ja_JP":"ゴールド","en_US":"Gold"}}
  ],
  "default_value": "bronze",
  "widget": "radio",
  "ui_control": {"required":true,"invisible":false,"readonly":false},
  "origin": "portal",
  "code_status": "planned",
  "notes": "顧客ランクのselectionサンプル"
}

成功後、GET /fieldsGET /fields/{id} で確認できます。

方式B:Ingress経由(Hostベース, 8080)

前提:Windowsの hosts127.0.0.1 api.local が入っていること。

  1. 前面で Ingress を 8080 に転送
kubectl -n ingress-nginx port-forward svc/ingress-nginx-controller 8080:80
  1. 別ターミナル/ブラウザで確認
# 動作確認(Hostヘッダ必須)
curl -I -H 'Host: api.local' http://127.0.0.1:8080/docs
# ブラウザ
http://api.local:8080/docs

NodePort 直(http://api.local:<NODE_PORT>/docs)は Windows→WSL2 のネットワーク分離で届かないことが多いです。開発中は B の 8080 転送が一番ラクです。


Comments

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です