1) 追加/変更する Python ファイル
置き場所はすべて
api/app/配下(あなたのリポ構成に合わせたもの)。
既存の FastAPI プロジェクトにそのまま追加してください。
1-1. api/app/services/embeddings.py
from __future__ import annotations
import os
from typing import List
from openai import OpenAI
_client = None
def _client_get() -> OpenAI:
global _client
if _client is None:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY is not set")
_client = OpenAI(api_key=api_key)
return _client
def embed_texts(texts: List[str]) -> List[List[float]]:
"""
OpenAI Embeddings API を使ってテキスト配列をベクトル化する。
"""
model = os.getenv("OPENAI_EMBEDDINGS_MODEL", "text-embedding-3-large")
if not texts:
return []
resp = _client_get().embeddings.create(model=model, input=texts)
return [d.embedding for d in resp.data]
1-2. api/app/services/chroma_client.py
from __future__ import annotations
import os
import chromadb
def get_client():
"""
CHROMA_URL= http://chromadb.infra.svc.cluster.local:8000 の想定。
未設定ならローカルの in-process クライアント(デバッグ用)。
"""
url = os.getenv("CHROMA_URL")
if not url:
return chromadb.Client()
# 例: http://host:8000 -> host, 8000 を取り出す
host = url.split("://", 1)[1].split(":", 1)[0]
port = int(url.rsplit(":", 1)[1])
return chromadb.HttpClient(host=host, port=port)
def get_collection(name: str = "portal_fields"):
client = get_client()
try:
return client.get_collection(name)
except Exception:
# cosine 指定(任意)
return client.create_collection(name=name, metadata={"hnsw:space": "cosine"})
1-3. api/app/services/chroma_export.py
from __future__ import annotations
from typing import Any, Dict, List, Optional
from sqlalchemy import select, text as sql_text
from .chroma_client import get_collection
from .embeddings import embed_texts
from .. import db
def _compose_text(row: Dict[str, Any]) -> str:
"""
portal_fields の1行を、埋め込み対象テキストに整形。
i18n は ja_JP / en_US を両方展開する。
"""
parts: List[str] = []
def add(label: str, v: Any):
if not v:
return
if isinstance(v, dict):
ja = v.get("ja_JP")
en = v.get("en_US")
if ja:
parts.append(f"{label} (ja): {ja}")
if en:
parts.append(f"{label} (en): {en}")
else:
parts.append(f"{label}: {v}")
add("label", row.get("label_i18n"))
add("help", row.get("help_i18n"))
add("placeholder", row.get("placeholder_i18n"))
add("unit", row.get("unit_i18n"))
add("notes", row.get("notes_i18n") or {}) # ここで notes_i18n を採用
# selection_items の各 label_i18n を列挙
si = row.get("selection_items") or []
if isinstance(si, list):
for it in si:
key = it.get("key")
lab = it.get("label_i18n") or {}
j = lab.get("ja_JP")
e = lab.get("en_US")
if j or e:
parts.append(f"selection[{key}] (ja): {j or ''}")
parts.append(f"selection[{key}] (en): {e or ''}")
# ttype / widget など短い属性も足す
for k in ("ttype", "widget"):
v = row.get(k)
if v:
parts.append(f"{k}: {v}")
return "\n".join(parts)
def export_portal_fields_to_chroma(since: Optional[str], limit: int = 200) -> Dict[str, Any]:
"""
portal_fields から doc+meta を作って Chroma に upsert。
`since` は updated_at の下限(文字列)。None の場合は指定なし。
"""
t = db.fields_table
stmt = select(t).limit(limit)
if since:
stmt = stmt.where(sql_text("updated_at > :since")).params(since=since)
conn = db.engine.connect()
try:
rows = conn.execute(stmt).mappings().all()
if not rows:
return {"exported": 0}
ids: List[str] = []
docs: List[str] = []
metas: List[Dict[str, Any]] = []
for r in rows:
rid = f"{r['model']}::{r['field_name']}"
ids.append(rid)
docs.append(_compose_text(r))
metas.append({
"model": r["model"],
"model_table": r["model_table"],
"field_name": r["field_name"],
"ttype": r["ttype"],
"origin": r.get("origin"),
"code_status": r.get("code_status"),
"updated_at": str(r.get("updated_at")),
})
# 埋め込み生成 → upsert
embs = embed_texts(docs)
col = get_collection("portal_fields")
col.upsert(ids=ids, documents=docs, metadatas=metas, embeddings=embs)
return {"exported": len(ids)}
finally:
conn.close()
1-4. api/app/routers/chroma.py
from __future__ import annotations
from typing import Optional, Dict, Any
from fastapi import APIRouter, Query
from ..services.chroma_export import export_portal_fields_to_chroma
from ..services.chroma_client import get_collection
router = APIRouter(prefix="/fields", tags=["fields"])
@router.get("/export/chroma")
def export_chroma(
since: Optional[str] = Query(None, description="updated_at の下限(例: 2025-09-01T00:00:00)"),
limit: int = Query(200, ge=1, le=1000),
) -> Dict[str, Any]:
"""
portal_fields → ChromaDB へ upsert。
"""
return export_portal_fields_to_chroma(since=since, limit=limit)
@router.get("/search")
def search_fields(
q: str = Query(..., min_length=1),
k: int = Query(5, ge=1, le=20),
model: Optional[str] = Query(None, description="model でメタフィルタ(例: res.partner)"),
) -> Dict[str, Any]:
"""
RAG 検索(ベクトル検索)。メタデータで model をフィルタ可能。
"""
col = get_collection("portal_fields")
where = {"model": model} if model else None
res = col.query(query_texts=[q], n_results=k, where=where)
# Chroma の返り値を軽く整形
out = []
ids = res.get("ids", [[]])[0]
docs = res.get("documents", [[]])[0]
mets = res.get("metadatas", [[]])[0]
dists = res.get("distances", [[]])[0] if "distances" in res else [None] * len(ids)
for i in range(len(ids)):
out.append({
"id": ids[i],
"distance": dists[i],
"document": docs[i],
"metadata": mets[i],
})
return {"q": q, "k": k, "results": out}
1-5. api/app/main.py にルータ登録(抜粋)
# 既存の main.py のどこか(app = FastAPI() の後)に追加
from .routers import chroma as chroma_router
app.include_router(chroma_router.router)
2) 依存関係(requirements.txt)
api/requirements.txt に 以下2行が入っていること:
openai>=1.0.0
chromadb>=0.5.0
3) ChromaDB の K8s マニフェスト(infra namespace)
kubectl apply -f - で流せるよう、Deployment + Service を一つに。
apiVersion: apps/v1
kind: Deployment
metadata:
name: chromadb
namespace: infra
labels: { app: chromadb }
spec:
replicas: 1
selector: { matchLabels: { app: chromadb } }
template:
metadata: { labels: { app: chromadb } }
spec:
containers:
- name: chromadb
image: chromadb/chromadb:latest
ports:
- containerPort: 8000
env:
- name: ANONYMIZED_TELEMETRY
value: "False"
readinessProbe:
httpGet: { path: /api/v1/heartbeat, port: 8000 }
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet: { path: /api/v1/heartbeat, port: 8000 }
initialDelaySeconds: 10
periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
name: chromadb
namespace: infra
labels: { app: chromadb }
spec:
selector: { app: chromadb }
ports:
- name: http
port: 8000
targetPort: 8000
protocol: TCP
type: ClusterIP
4) 実行コマンド(順番にそのまま)
4-1. ChromaDB を起動(infra)
kubectl apply -f - <<'YAML'
# ここに上の YAML をそのまま貼る
YAML
kubectl -n infra rollout status deploy/chromadb
kubectl -n infra get svc chromadb
4-2. Dev-Portal API に環境変数を注入(dev)
# デプロイ名/コンテナ名を取得
DEPLOY=$(kubectl -n dev get deploy -o jsonpath='{.items[0].metadata.name}')
CN=$(kubectl -n dev get deploy $DEPLOY -o jsonpath='{.spec.template.spec.containers[0].name}')
# Chroma 接続URLと Embeddings モデルを設定
kubectl -n dev set env deploy/$DEPLOY CHROMA_URL=http://chromadb.infra.svc.cluster.local:8000
kubectl -n dev set env deploy/$DEPLOY OPENAI_EMBEDDINGS_MODEL=text-embedding-3-large
# OPENAI_API_KEY は既に Secret で注入済み想定(未設定なら注入→再起動)
4-3. イメージビルド → ロールアウト
# Minikube の Docker を使う(必要な場合)
eval $(minikube -p dev32 docker-env)
# API イメージをビルド(api/ 直下に Dockerfile がある前提)
cd /home/kenji/test_k8s/project_portal/dev-portal-k8s/dev-portal-k8s/api
DOCKER_BUILDKIT=0 docker build -t dev-portal-api:v11 .
# 差し替え & ロールアウト
kubectl -n dev set image deploy/$DEPLOY $CN=dev-portal-api:v11
kubectl -n dev rollout status deploy/$DEPLOY
5) 動作確認コマンド
5-1. エクスポート(Chroma へ投入)
# すべて投入(上限200行の例)
curl -s "http://api.local:8080/fields/export/chroma?since=1970-01-01T00:00:00&limit=200" | python3 -m json.tool
# => {"exported": N}
5-2. 検索(日本語でもOK)
curl -s "http://api.local:8080/fields/search?q=顧客ランク&k=5" | python3 -m json.tool
# model で絞り込み
curl -s "http://api.local:8080/fields/search?q=ランク&k=5&model=res.partner" | python3 -m json.tool
6) 参考(notes_i18n 列が未追加なら)
kubectl -n infra exec -it postgres-0 -- psql -U dev -d devportal -c \
"ALTER TABLE public.portal_fields ADD COLUMN IF NOT EXISTS notes_i18n JSONB;
CREATE INDEX IF NOT EXISTS idx_portal_fields_notes_i18n_gin ON public.portal_fiel
コメントを残す