いまのカレントは dev-portal-k8s/ 前提で、ステップごとにやります。
Step 1) APIコードを追加(CRUD + Swagger)
1-1. 依存を追加:api/requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.6
SQLAlchemy==2.0.36
psycopg2-binary==2.9.9
pydantic==2.8.2
python-dotenv==1.0.1
1-2. 設定:api/app/config.py
import os
from pydantic import BaseModel
class Settings(BaseModel):
database_url: str = os.getenv(
"DATABASE_URL",
"postgresql+psycopg2://dev:dev@postgres.infra.svc.cluster.local:5432/devportal",
)
db_schema: str = os.getenv("DB_SCHEMA", "public")
table_fields_name: str = os.getenv("TABLE_FIELDS_NAME", "portal_fields")
primary_key_field: str = os.getenv("PRIMARY_KEY_FIELD", "id")
settings = Settings()
1-3. DBユーティリティ:api/app/db.py
from contextlib import contextmanager
from typing import Any, Dict
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
from .config import settings
engine = create_engine(settings.database_url, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
metadata = MetaData(schema=settings.db_schema)
fields_table: Table | None = None
def reflect_tables() -> None:
global fields_table
metadata.reflect(bind=engine, only=[settings.table_fields_name])
fields_table = Table(settings.table_fields_name, metadata, autoload_with=engine)
@contextmanager
def session_scope():
s = SessionLocal()
try:
yield s
s.commit()
except Exception:
s.rollback()
raise
finally:
s.close()
def row_to_dict(row: Any) -> Dict[str, Any]:
if row is None:
return {}
try:
return dict(row._mapping) # type: ignore[attr-defined]
except Exception:
return {}
1-4. スキーマ:api/app/schemas.py
from typing import Optional
from pydantic import BaseModel, ConfigDict
class FieldCreate(BaseModel):
model_config = ConfigDict(extra="allow") # 任意の列を許可(ポータルの列に合わせる)
class FieldUpdate(BaseModel):
model_config = ConfigDict(extra="allow")
class QueryParams(BaseModel):
q: Optional[str] = None
model: Optional[str] = None
field_name: Optional[str] = None
limit: int = 50
offset: int = 0
1-5. ルータ:api/app/routers/fields.py
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query, status
from sqlalchemy import select, and_, or_, text, insert, update, delete, cast
from sqlalchemy.types import Text as SqlText
from ..db import session_scope, reflect_tables, fields_table, row_to_dict
from ..config import settings
from ..schemas import FieldCreate, FieldUpdate
router = APIRouter(prefix="/fields", tags=["fields"])
@router.get("/health")
async def health() -> Dict[str, str]:
return {"status": "ok"}
@router.get("/schema")
async def get_schema() -> Dict[str, Any]:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
cols = [
{"name": c.name, "type": str(c.type), "nullable": c.nullable, "primary_key": c.primary_key}
for c in fields_table.columns # type: ignore
]
return {"schema": settings.db_schema, "table": settings.table_fields_name, "columns": cols}
@router.get("/")
async def list_fields(
q: Optional[str] = Query(default=None, description="free text over model/field_name + JSONB"),
model: Optional[str] = None,
field_name: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> Dict[str, Any]:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
conds = []
if model and "model" in fields_table.c: # type: ignore
conds.append(fields_table.c.model == model) # type: ignore
if field_name and "field_name" in fields_table.c: # type: ignore
conds.append(fields_table.c.field_name == field_name) # type: ignore
if q:
likes = []
for key in ("model", "field_name"):
if key in fields_table.c: # type: ignore
likes.append(fields_table.c[key].ilike(f"%{q}%")) # type: ignore
for key in ("label_i18n", "help_i18n", "placeholder_i18n", "unit_i18n", "selection_items"):
if key in fields_table.c: # type: ignore
likes.append(cast(fields_table.c[key], SqlText).ilike(f"%{q}%")) # type: ignore
if likes:
conds.append(or_(*likes))
stmt = select(fields_table) # type: ignore
if conds:
from sqlalchemy import and_
stmt = stmt.where(and_(*conds))
order_col = fields_table.c.get("updated_at", fields_table.c.get(settings.primary_key_field)) # type: ignore
stmt = stmt.order_by(order_col.desc()).limit(limit).offset(offset)
with session_scope() as s:
rows = s.execute(stmt).all()
total = s.execute(select(text("count(1)")).select_from(fields_table)).scalar_one() # type: ignore
return {"items": [row_to_dict(r) for r in rows], "total": total}
@router.get("/{item_id}")
async def get_field(item_id: int) -> Dict[str, Any]:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
pk = fields_table.c.get(settings.primary_key_field) # type: ignore
with session_scope() as s:
row = s.execute(select(fields_table).where(pk == item_id)).first() # type: ignore
if not row:
raise HTTPException(404, "Not found")
return row_to_dict(row)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_field(payload: FieldCreate) -> Dict[str, Any]:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
data = payload.model_dump()
data.pop(settings.primary_key_field, None)
valid = set(c.name for c in fields_table.columns) # type: ignore
insert_data = {k: v for k, v in data.items() if k in valid}
if not insert_data:
raise HTTPException(400, "No valid columns provided")
with session_scope() as s:
row = s.execute(insert(fields_table).values(**insert_data).returning(*fields_table.c)).first() # type: ignore
return row_to_dict(row)
@router.put("/{item_id}")
async def update_field(item_id: int, payload: FieldUpdate) -> Dict[str, Any]:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
data = payload.model_dump()
data.pop(settings.primary_key_field, None)
valid = set(c.name for c in fields_table.columns) # type: ignore
update_data = {k: v for k, v in data.items() if k in valid}
if not update_data:
raise HTTPException(400, "No valid columns to update")
pk = fields_table.c.get(settings.primary_key_field) # type: ignore
with session_scope() as s:
exists = s.execute(select(pk).where(pk == item_id)).first()
if not exists:
raise HTTPException(404, "Not found")
row = s.execute(update(fields_table).where(pk == item_id).values(**update_data).returning(*fields_table.c)).first() # type: ignore
return row_to_dict(row)
@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_field(item_id: int) -> None:
if fields_table is None:
raise HTTPException(500, "Fields table not initialized")
pk = fields_table.c.get(settings.primary_key_field) # type: ignore
with session_scope() as s:
s.execute(delete(fields_table).where(pk == item_id)) # type: ignore
return None
def init_on_startup():
# アプリ起動時に一度だけ反映
reflect_tables()
1-6. エントリポイント:api/app/main.py
from fastapi import FastAPI
from .routers import fields
from .routers.fields import init_on_startup
app = FastAPI(title="Dev Portal Backend", version="0.1.0", description="Fields metadata CRUD API")
app.include_router(fields.router)
@app.on_event("startup")
async def startup():
init_on_startup()
保存時は LF 改行にしてください(VSCode右下が「LF」)。
Step 2) コンテナをビルドして minikube へ
# カレント: dev-portal-k8s/
docker buildx build -t dev-portal-api:v1 -f api/Dockerfile . --load
PROFILE=$(kubectl config current-context)
minikube -p "$PROFILE" image load dev-portal-api:v1
Step 3) K8s マニフェストをDB接続対応に
3-1. API用 ConfigMap を追加:k8s/dev/api-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: dev-portal-config
namespace: dev
data:
DATABASE_URL: postgresql+psycopg2://dev:dev@postgres.infra.svc.cluster.local:5432/devportal
DB_SCHEMA: public
TABLE_FIELDS_NAME: portal_fields
PRIMARY_KEY_FIELD: id
3-2. Deployment に環境変数を注入(k8s/dev/api-deployment.yaml を編集)
spec.template.spec.containers[0] の下に envFrom を追加し、image を v1 に変更:
containers:
- name: api
image: dev-portal-api:v1
imagePullPolicy: IfNotPresent
envFrom:
- configMapRef:
name: dev-portal-config
ports:
- name: http
containerPort: 8000
readinessProbe:
httpGet: { path: /fields/health, port: http }
initialDelaySeconds: 5
periodSeconds: 5
livenessProbe:
httpGet: { path: /fields/health, port: http }
initialDelaySeconds: 10
periodSeconds: 10
Step 4) 適用 & ロールアウト & 動作確認
kubectl -n dev apply -f k8s/dev/api-configmap.yaml
kubectl -n dev apply -f k8s/dev/api-deployment.yaml
kubectl -n dev rollout status deploy/dev-portal-api --timeout=180s
# まずはService直で
kubectl -n dev port-forward svc/dev-portal-api 8000:8000 >/tmp/pf_api.log 2>&1 &
sleep 1
curl -s http://127.0.0.1:8000/fields/health
Swagger(/docs)
# Ingress経由(既にIngress設定済みの前提)
PROFILE=$(kubectl config current-context)
MINIKUBE_IP=$(minikube -p "$PROFILE" ip)
NODE_PORT=$(kubectl -n ingress-nginx get svc ingress-nginx-controller -o jsonpath='{.spec.ports[?(@.port==80)].nodePort}')
# Health
curl -s -H 'Host: api.local' "http://$MINIKUBE_IP:$NODE_PORT/fields/health"
# ブラウザで
# http://api.local:<NODE_PORT>/docs
Step 5) 動作テスト(POST → GET)
Swagger の POST /fields に以下を貼って実行:
{
"model": "res.partner",
"model_table": "res_partner",
"field_name": "x_customer_rank",
"ttype": "selection",
"relation_model": null,
"label_i18n": {"ja_JP":"顧客ランク","en_US":"Customer Rank"},
"help_i18n": {"ja_JP":"ランクに応じて自動割引を適用します","en_US":"Apply auto discount by rank"},
"placeholder_i18n": {"ja_JP":"ランクを選択してください","en_US":"Select rank"},
"translate": false,
"selection_items": [
{"key":"bronze","label_i18n":{"ja_JP":"ブロンズ","en_US":"Bronze"}},
{"key":"silver","label_i18n":{"ja_JP":"シルバー","en_US":"Silver"}},
{"key":"gold","label_i18n":{"ja_JP":"ゴールド","en_US":"Gold"}}
],
"default_value": "bronze",
"widget": "radio",
"ui_control": {"required":true,"invisible":false,"readonly":false},
"origin": "portal",
"code_status": "planned",
"notes": "顧客ランクのselectionサンプル"
}
成功後、GET /fields と GET /fields/{id} で確認できます。
方式B:Ingress経由(Hostベース, 8080)
前提:Windowsの hosts に 127.0.0.1 api.local が入っていること。
- 前面で Ingress を 8080 に転送
kubectl -n ingress-nginx port-forward svc/ingress-nginx-controller 8080:80
- 別ターミナル/ブラウザで確認
# 動作確認(Hostヘッダ必須)
curl -I -H 'Host: api.local' http://127.0.0.1:8080/docs
# ブラウザ
http://api.local:8080/docs
NodePort 直(
http://api.local:<NODE_PORT>/docs)は Windows→WSL2 のネットワーク分離で届かないことが多いです。開発中は B の 8080 転送が一番ラクです。
コメントを残す