M7 ChromaDB連携

1) 追加/変更する Python ファイル

置き場所はすべて api/app/ 配下(あなたのリポ構成に合わせたもの)。
既存の FastAPI プロジェクトにそのまま追加してください。

1-1. api/app/services/embeddings.py

from __future__ import annotations
import os
from typing import List
from openai import OpenAI

_client = None

def _client_get() -> OpenAI:
    global _client
    if _client is None:
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY is not set")
        _client = OpenAI(api_key=api_key)
    return _client

def embed_texts(texts: List[str]) -> List[List[float]]:
    """
    OpenAI Embeddings API を使ってテキスト配列をベクトル化する。
    """
    model = os.getenv("OPENAI_EMBEDDINGS_MODEL", "text-embedding-3-large")
    if not texts:
        return []
    resp = _client_get().embeddings.create(model=model, input=texts)
    return [d.embedding for d in resp.data]

1-2. api/app/services/chroma_client.py

from __future__ import annotations
import os
import chromadb

def get_client():
    """
    CHROMA_URL= http://chromadb.infra.svc.cluster.local:8000 の想定。
    未設定ならローカルの in-process クライアント(デバッグ用)。
    """
    url = os.getenv("CHROMA_URL")
    if not url:
        return chromadb.Client()

    # 例: http://host:8000 -> host, 8000 を取り出す
    host = url.split("://", 1)[1].split(":", 1)[0]
    port = int(url.rsplit(":", 1)[1])
    return chromadb.HttpClient(host=host, port=port)

def get_collection(name: str = "portal_fields"):
    client = get_client()
    try:
        return client.get_collection(name)
    except Exception:
        # cosine 指定(任意)
        return client.create_collection(name=name, metadata={"hnsw:space": "cosine"})

1-3. api/app/services/chroma_export.py

from __future__ import annotations
from typing import Any, Dict, List, Optional
from sqlalchemy import select, text as sql_text
from .chroma_client import get_collection
from .embeddings import embed_texts
from .. import db

def _compose_text(row: Dict[str, Any]) -> str:
    """
    portal_fields の1行を、埋め込み対象テキストに整形。
    i18n は ja_JP / en_US を両方展開する。
    """
    parts: List[str] = []

    def add(label: str, v: Any):
        if not v:
            return
        if isinstance(v, dict):
            ja = v.get("ja_JP")
            en = v.get("en_US")
            if ja:
                parts.append(f"{label} (ja): {ja}")
            if en:
                parts.append(f"{label} (en): {en}")
        else:
            parts.append(f"{label}: {v}")

    add("label", row.get("label_i18n"))
    add("help", row.get("help_i18n"))
    add("placeholder", row.get("placeholder_i18n"))
    add("unit", row.get("unit_i18n"))
    add("notes", row.get("notes_i18n") or {})   # ここで notes_i18n を採用

    # selection_items の各 label_i18n を列挙
    si = row.get("selection_items") or []
    if isinstance(si, list):
        for it in si:
            key = it.get("key")
            lab = it.get("label_i18n") or {}
            j = lab.get("ja_JP")
            e = lab.get("en_US")
            if j or e:
                parts.append(f"selection[{key}] (ja): {j or ''}")
                parts.append(f"selection[{key}] (en): {e or ''}")

    # ttype / widget など短い属性も足す
    for k in ("ttype", "widget"):
        v = row.get(k)
        if v:
            parts.append(f"{k}: {v}")

    return "\n".join(parts)

def export_portal_fields_to_chroma(since: Optional[str], limit: int = 200) -> Dict[str, Any]:
    """
    portal_fields から doc+meta を作って Chroma に upsert。
    `since` は updated_at の下限(文字列)。None の場合は指定なし。
    """
    t = db.fields_table
    stmt = select(t).limit(limit)
    if since:
        stmt = stmt.where(sql_text("updated_at > :since")).params(since=since)

    conn = db.engine.connect()
    try:
        rows = conn.execute(stmt).mappings().all()
        if not rows:
            return {"exported": 0}

        ids: List[str] = []
        docs: List[str] = []
        metas: List[Dict[str, Any]] = []

        for r in rows:
            rid = f"{r['model']}::{r['field_name']}"
            ids.append(rid)
            docs.append(_compose_text(r))
            metas.append({
                "model": r["model"],
                "model_table": r["model_table"],
                "field_name": r["field_name"],
                "ttype": r["ttype"],
                "origin": r.get("origin"),
                "code_status": r.get("code_status"),
                "updated_at": str(r.get("updated_at")),
            })

        # 埋め込み生成 → upsert
        embs = embed_texts(docs)
        col = get_collection("portal_fields")
        col.upsert(ids=ids, documents=docs, metadatas=metas, embeddings=embs)
        return {"exported": len(ids)}
    finally:
        conn.close()

1-4. api/app/routers/chroma.py

from __future__ import annotations
from typing import Optional, Dict, Any
from fastapi import APIRouter, Query
from ..services.chroma_export import export_portal_fields_to_chroma
from ..services.chroma_client import get_collection

router = APIRouter(prefix="/fields", tags=["fields"])

@router.get("/export/chroma")
def export_chroma(
    since: Optional[str] = Query(None, description="updated_at の下限(例: 2025-09-01T00:00:00)"),
    limit: int = Query(200, ge=1, le=1000),
) -> Dict[str, Any]:
    """
    portal_fields → ChromaDB へ upsert。
    """
    return export_portal_fields_to_chroma(since=since, limit=limit)

@router.get("/search")
def search_fields(
    q: str = Query(..., min_length=1),
    k: int = Query(5, ge=1, le=20),
    model: Optional[str] = Query(None, description="model でメタフィルタ(例: res.partner)"),
) -> Dict[str, Any]:
    """
    RAG 検索(ベクトル検索)。メタデータで model をフィルタ可能。
    """
    col = get_collection("portal_fields")
    where = {"model": model} if model else None
    res = col.query(query_texts=[q], n_results=k, where=where)

    # Chroma の返り値を軽く整形
    out = []
    ids = res.get("ids", [[]])[0]
    docs = res.get("documents", [[]])[0]
    mets = res.get("metadatas", [[]])[0]
    dists = res.get("distances", [[]])[0] if "distances" in res else [None] * len(ids)

    for i in range(len(ids)):
        out.append({
            "id": ids[i],
            "distance": dists[i],
            "document": docs[i],
            "metadata": mets[i],
        })
    return {"q": q, "k": k, "results": out}

1-5. api/app/main.py にルータ登録(抜粋)

# 既存の main.py のどこか(app = FastAPI() の後)に追加
from .routers import chroma as chroma_router
app.include_router(chroma_router.router)

2) 依存関係(requirements.txt)

api/requirements.txt以下2行が入っていること:

openai>=1.0.0
chromadb>=0.5.0

3) ChromaDB の K8s マニフェスト(infra namespace)

kubectl apply -f - で流せるよう、Deployment + Service を一つに。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: chromadb
  namespace: infra
  labels: { app: chromadb }
spec:
  replicas: 1
  selector: { matchLabels: { app: chromadb } }
  template:
    metadata: { labels: { app: chromadb } }
    spec:
      containers:
        - name: chromadb
          image: chromadb/chromadb:latest
          ports:
            - containerPort: 8000
          env:
            - name: ANONYMIZED_TELEMETRY
              value: "False"
          readinessProbe:
            httpGet: { path: /api/v1/heartbeat, port: 8000 }
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet: { path: /api/v1/heartbeat, port: 8000 }
            initialDelaySeconds: 10
            periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
  name: chromadb
  namespace: infra
  labels: { app: chromadb }
spec:
  selector: { app: chromadb }
  ports:
    - name: http
      port: 8000
      targetPort: 8000
      protocol: TCP
  type: ClusterIP

4) 実行コマンド(順番にそのまま)

4-1. ChromaDB を起動(infra)

kubectl apply -f - <<'YAML'
# ここに上の YAML をそのまま貼る
YAML

kubectl -n infra rollout status deploy/chromadb
kubectl -n infra get svc chromadb

4-2. Dev-Portal API に環境変数を注入(dev)

# デプロイ名/コンテナ名を取得
DEPLOY=$(kubectl -n dev get deploy -o jsonpath='{.items[0].metadata.name}')
CN=$(kubectl -n dev get deploy $DEPLOY -o jsonpath='{.spec.template.spec.containers[0].name}')

# Chroma 接続URLと Embeddings モデルを設定
kubectl -n dev set env deploy/$DEPLOY CHROMA_URL=http://chromadb.infra.svc.cluster.local:8000
kubectl -n dev set env deploy/$DEPLOY OPENAI_EMBEDDINGS_MODEL=text-embedding-3-large
# OPENAI_API_KEY は既に Secret で注入済み想定(未設定なら注入→再起動)

4-3. イメージビルド → ロールアウト

# Minikube の Docker を使う(必要な場合)
eval $(minikube -p dev32 docker-env)

# API イメージをビルド(api/ 直下に Dockerfile がある前提)
cd /home/kenji/test_k8s/project_portal/dev-portal-k8s/dev-portal-k8s/api
DOCKER_BUILDKIT=0 docker build -t dev-portal-api:v11 .

# 差し替え & ロールアウト
kubectl -n dev set image deploy/$DEPLOY $CN=dev-portal-api:v11
kubectl -n dev rollout status deploy/$DEPLOY

5) 動作確認コマンド

5-1. エクスポート(Chroma へ投入)

# すべて投入(上限200行の例)
curl -s "http://api.local:8080/fields/export/chroma?since=1970-01-01T00:00:00&limit=200" | python3 -m json.tool
# => {"exported": N}

5-2. 検索(日本語でもOK)

curl -s "http://api.local:8080/fields/search?q=顧客ランク&k=5" | python3 -m json.tool

# model で絞り込み
curl -s "http://api.local:8080/fields/search?q=ランク&k=5&model=res.partner" | python3 -m json.tool

6) 参考(notes_i18n 列が未追加なら)

kubectl -n infra exec -it postgres-0 -- psql -U dev -d devportal -c \
"ALTER TABLE public.portal_fields ADD COLUMN IF NOT EXISTS notes_i18n JSONB;
 CREATE INDEX IF NOT EXISTS idx_portal_fields_notes_i18n_gin ON public.portal_fiel

Comments

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です