๊ฐ ์ปดํฌ๋ํธ๋ฅผ ๊ฐ๋ณ ์ ์ดํ๊ณ , ์๋ฒ ๋ฉยท๋ฒกํฐ์คํ ์ดยท์ฒญ์ปค๋ฅผ ๊ต์ฒดํ๋ ๋ฐฉ๋ฒ์ ๋ค๋ฃน๋๋ค.
Retriever โ Generator โ Executor๋ฅผ ์ง์ ์กฐํฉํฉ๋๋ค.
๊ฐ ๋จ๊ณ์ ์ ์ถ๋ ฅ์ด ์ฝ๋์ ๋ณด์ฌ ๋๋ฒ๊น๊ณผ ํ๋ผ๋ฏธํฐ ํ๋์ด ์ฝ์ต๋๋ค.
from lang2sql import (
CatalogChunker,
DirectoryLoader,
RecursiveCharacterChunker,
SQLExecutor,
SQLGenerator,
VectorRetriever,
)
from lang2sql.integrations.db import SQLAlchemyDB
from lang2sql.integrations.embedding import OpenAIEmbedding
from lang2sql.integrations.llm import OpenAILLM
# Hand-written catalog: one entry per table, with business-friendly
# descriptions that the retriever embeds and the LLM reads as schema context.
_orders_columns = {
    "order_id": "์ฃผ๋ฌธ ๊ณ ์  ID",
    "order_date": "์ฃผ๋ฌธ ์ผ์",
    "amount": "๊ฒฐ์  ๊ธ์ก",
    "status": "์ฃผ๋ฌธ ์ํ",
}

catalog = [
    {
        "name": "orders",
        "description": "์ฃผ๋ฌธ ์ ๋ณด",
        "columns": _orders_columns,
    },
]
# 1) ๋ฌธ์ ๋ก๋ + split
docs = DirectoryLoader("docs/business").load()
embedding = OpenAIEmbedding(model="text-embedding-3-small")
catalog_chunks = CatalogChunker().split(catalog)
doc_chunks = RecursiveCharacterChunker(chunk_size=800, chunk_overlap=80).split(docs)
# 2) Retriever ๊ตฌ์ฑ
retriever = VectorRetriever.from_chunks(
catalog_chunks + doc_chunks,
embedding=embedding,
top_n=5,
score_threshold=0.2,
)
# 3) Generator / Executor ๊ฐ๋ณ ๊ตฌ์ฑ
generator = SQLGenerator(
llm=OpenAILLM(model="gpt-4o-mini"),
db_dialect="sqlite",
)
executor = SQLExecutor(db=SQLAlchemyDB("sqlite:///sample.db"))
# 4) ์๋ ์คํ โ ๊ฐ ๋จ๊ณ ๊ฒฐ๊ณผ ์ง์ ๊ด์ธก
query = "์ง๋๋ฌ ์๋งค์ถ ํฉ๊ณ"
retrieval = retriever.run(query)
sql = generator.run(query, retrieval.schemas, context=retrieval.context)
rows = executor.run(sql)
print("SQL:", sql)
print("๊ฒฐ๊ณผ:", rows)EmbeddingPort๋ฅผ ๋ง์กฑํ๋ ๊ตฌํ์ฒด๋ผ๋ฉด ๋ฌด์์ด๋ ์ฐ๊ฒฐํ ์ ์์ต๋๋ค.
from lang2sql.integrations.embedding import (
OpenAIEmbedding,
AzureOpenAIEmbedding,
GeminiEmbedding,
BedrockEmbedding,
OllamaEmbedding,
HuggingFaceEmbedding,
)
embedding = OpenAIEmbedding(model="text-embedding-3-small")


class SentenceTransformerEmbedding:
    """EmbeddingPort implementation backed by sentence-transformers.

    No inheritance required: the port is a structural Protocol, so
    implementing embed_query() and embed_texts() is enough.
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        # Lazy import: sentence-transformers is only needed if this
        # backend is actually instantiated.
        from sentence_transformers import SentenceTransformer

        self._model = SentenceTransformer(model_name)

    def embed_query(self, text: str) -> list[float]:
        # Normalized embeddings make cosine similarity a plain dot product.
        return self._model.encode([text], normalize_embeddings=True)[0].tolist()

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        return self._model.encode(texts, normalize_embeddings=True).tolist()
# ํ์ดํ๋ผ์ธ ์ฝ๋๋ ๋์ผ
retriever = VectorRetriever.from_sources(
catalog=catalog,
embedding=SentenceTransformerEmbedding(),
)VectorStorePort์ upsert()์ search()๋ง ๊ตฌํํ๋ฉด ๋ฉ๋๋ค.
class ChromaVectorStore:
    """VectorStorePort implementation backed by ChromaDB.

    Only upsert() and search() are required by the port.
    """

    def __init__(self, collection_name: str = "lang2sql"):
        # Lazy import: chromadb is only needed if this store is used.
        import chromadb

        self._client = chromadb.Client()
        self._col = self._client.get_or_create_collection(collection_name)

    def upsert(self, ids: list[str], vectors: list[list[float]]) -> None:
        """Store (or overwrite) the given vectors under their ids."""
        self._col.upsert(ids=ids, embeddings=vectors)

    def search(self, vector: list[float], k: int) -> list[tuple[str, float]]:
        """Return the k nearest ids as (id, score) pairs.

        Chroma returns distances (smaller = closer) while the port expects
        similarity scores (larger = better), hence the ``1.0 - dist`` flip.
        NOTE(review): this assumes a distance metric where 1 - dist is a
        meaningful score (e.g. cosine distance) -- confirm the collection's
        metric configuration.
        """
        results = self._col.query(query_embeddings=[vector], n_results=k)
        ids = results["ids"][0]
        dists = results["distances"][0]
        return [(id_, 1.0 - dist) for id_, dist in zip(ids, dists)]
# Plug the custom vector store into the same retriever factory.
retriever = VectorRetriever.from_sources(
    catalog=catalog,
    embedding=OpenAIEmbedding(),
    vectorstore=ChromaVectorStore("my_catalog"),
)

from lang2sql import CatalogChunker, VectorRetriever
from lang2sql.integrations.chunking import SemanticChunker
from lang2sql.integrations.embedding import OpenAIEmbedding
embedding = OpenAIEmbedding(model="text-embedding-3-small")

# Pattern 1: from_chunks -- split the documents yourself, then hand
# the ready-made chunks to the retriever.
doc_chunks = SemanticChunker(
    embedding=embedding,
    breakpoint_threshold=0.3,
    min_chunk_size=100,
).split(docs)
retriever = VectorRetriever.from_chunks(
    CatalogChunker().split(catalog) + doc_chunks,
    embedding=embedding,
)

# Pattern 2: from_sources -- pass the splitter and let the retriever
# do the chunking internally.
retriever = VectorRetriever.from_sources(
    catalog=catalog,
    documents=docs,
    embedding=embedding,
    splitter=SemanticChunker(embedding=embedding),
)

from langchain_text_splitters import RecursiveCharacterTextSplitter
from lang2sql import IndexedChunk, TextDocument
class LangChainChunkerAdapter:
    """DocumentChunkerPort adapter around a LangChain text splitter.

    Wraps any LangChain splitter exposing split_text() so it can be
    passed as the ``splitter`` of VectorRetriever.from_sources().
    """

    def __init__(self, splitter):
        self._splitter = splitter

    def chunk(self, doc: TextDocument) -> list[IndexedChunk]:
        """Split one TextDocument into IndexedChunk records."""
        texts = self._splitter.split_text(doc["content"])
        title = doc.get("title", "")
        return [
            IndexedChunk(
                # Stable per-document chunk id: "<doc id>__<index>".
                chunk_id=f"{doc['id']}__{i}",
                # Prefix the title so each chunk keeps document-level context.
                text=f"{title}: {text}" if title else text,
                source_type="document",
                source_id=doc["id"],
                chunk_index=i,
                metadata={"title": title, "source": doc.get("source", "")},
            )
            for i, text in enumerate(texts)
        ]
lc_chunker = LangChainChunkerAdapter(
RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
)
retriever = VectorRetriever.from_sources(
catalog=catalog,
documents=docs,
embedding=OpenAIEmbedding(),
splitter=lc_chunker,
)DataHub GMS ์๋ฒ์์ ํ
์ด๋ธ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ CatalogEntry ๋ชฉ๋ก์ผ๋ก ๋ณํํฉ๋๋ค.
์๋์ผ๋ก ์นดํ๋ก๊ทธ๋ฅผ ์์ฑํ์ง ์์๋ DataHub์ ๋ฑ๋ก๋ ์คํค๋ง ์ ๋ณด๋ฅผ ๋ฐ๋ก ์ฌ์ฉํ ์ ์์ต๋๋ค.
pip install acryl-datahubfrom lang2sql.integrations.catalog import DataHubCatalogLoader
loader = DataHubCatalogLoader(
gms_server="http://localhost:8080",
extra_headers={"Authorization": "Bearer <token>"},
)
# ์ ์ฒด URN ์กฐํ
catalog = loader.load()
# ํน์ URN๋ง ์กฐํ
catalog = loader.load(urns=[
"urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)",
])
# ๋ฐ๋ก ํ์ดํ๋ผ์ธ์ ์ฐ๊ฒฐ
from lang2sql import BaselineNL2SQL
from lang2sql.integrations.db import SQLAlchemyDB
from lang2sql.integrations.llm import OpenAILLM
pipeline = BaselineNL2SQL(
catalog=catalog,
llm=OpenAILLM(model="gpt-4o-mini"),
db=SQLAlchemyDB("postgresql://user:pass@localhost:5432/mydb"),
db_dialect="postgresql",
)
`DataHubCatalogLoader`๋ `CatalogLoaderPort`๋ฅผ ๊ตฌํํฉ๋๋ค. DataHub ์์ด๋ `SQLAlchemyExplorer`๋ก DDL์ ์ง์  ์กฐํํ๊ฑฐ๋ CSV/์๋ ์นดํ๋ก๊ทธ๋ฅผ ์ฌ์ฉํ  ์ ์์ต๋๋ค.
์ปค์คํ ์ด๋ํฐ๋ฅผ ์์ฑํ ๋ ๊ตฌํํด์ผ ํ๋ ๋ฉ์๋ ๋ชฉ๋ก์ ๋๋ค.
| Port | ๋ฉ์๋ | ์๊ทธ๋์ฒ | ์ฉ๋ |
|---|---|---|---|
| `LLMPort` | `invoke` | `(messages: list[dict]) -> str` | LLM ๋ฐฑ์๋ ๊ต์ฒด |
| `DBPort` | `execute` | `(sql: str) -> list[dict]` | DB ๋ฐฑ์๋ ๊ต์ฒด |
| `EmbeddingPort` | `embed_query` | `(text: str) -> list[float]` | ๋จ์ผ ํ…์คํธ ์๋ฒ ๋ฉ |
| | `embed_texts` | `(texts: list[str]) -> list[list[float]]` | ๋ฐฐ์น ์๋ฒ ๋ฉ |
| `VectorStorePort` | `upsert` | `(ids: list[str], vectors: list[list[float]]) -> None` | ๋ฒกํฐ ์ ์ฅ |
| | `search` | `(vector: list[float], k: int) -> list[tuple[str, float]]` | ์ ์ฌ๋ ๊ฒ์ (id, score) |
| `DocumentLoaderPort` | `load` | `() -> list[TextDocument]` | ๋ฌธ์ ๋ก๋ |
| `DocumentChunkerPort` | `chunk` | `(doc: TextDocument) -> list[IndexedChunk]` | ๋ฌธ์ ๋ถํ  |
| `CatalogLoaderPort` | `load` | `(urns: list[str] \| None) -> list[CatalogEntry]` | ์ธ๋ถ ์นดํ๋ก๊ทธ ๋ก๋ |
| `DBExplorerPort` | `list_tables` | `() -> list[str]` | ํ…์ด๋ธ ๋ชฉ๋ก |
| | `get_ddl` | `(table: str) -> str` | DDL ์กฐํ |
| | `sample_data` | `(table: str, limit: int) -> list[dict]` | ์ƒ˜ํ”Œ ๋ฐ์ดํฐ |
| | `execute_read_only` | `(sql: str) -> list[dict]` | ์ฝ๊ธฐ ์ „์šฉ ์ฟผ๋ฆฌ |
๋ชจ๋ Port๋ src/lang2sql/core/ports.py์ Protocol๋ก ์ ์๋์ด ์์ต๋๋ค.
ํด๋์ค ์์ ์์ด ๋ฉ์๋ ์๊ทธ๋์ฒ๋ง ๋ง์ถ๋ฉด ์ด๋ค ๊ฐ์ฒด๋ ์ฐ๊ฒฐํ ์ ์์ต๋๋ค (structural subtyping).
MemoryHook์ผ๋ก ์ปดํฌ๋ํธ ๋จ์ ์คํ ์ด๋ฒคํธ๋ฅผ ์์งํฉ๋๋ค.
from lang2sql import HybridNL2SQL, MemoryHook
from lang2sql.integrations.db import SQLAlchemyDB
from lang2sql.integrations.embedding import OpenAIEmbedding
from lang2sql.integrations.llm import OpenAILLM
hook = MemoryHook()
pipeline = HybridNL2SQL(
catalog=catalog,
llm=OpenAILLM(model="gpt-4o-mini"),
db=SQLAlchemyDB("sqlite:///sample.db"),
embedding=OpenAIEmbedding(model="text-embedding-3-small"),
documents=docs,
db_dialect="sqlite",
hook=hook,
)
pipeline.run("์ง๋๋ฌ ์๋งค์ถ ํฉ๊ณ")
for e in hook.snapshot():
print(f"{e.component:30s} {e.phase:5s} {e.duration_ms:6.1f}ms error={e.error}")์ถ๋ ฅ ์์:
HybridRetriever start 0.0ms error=None
HybridRetriever end 12.3ms error=None
SQLGenerator start 0.0ms error=None
SQLGenerator end 890.1ms error=None
SQLExecutor start 0.0ms error=None
SQLExecutor end 1.2ms error=None
์ด์ ํ๊ฒฝ์์๋ `duration_ms`๋ก ๋ณ๋ชฉ์ ํ์ํ๊ณ  `error` ์ด๋ฒคํธ๋ฅผ ์์งํด ์ฅ์  ํจํด์ ๋ถ์ํฉ๋๋ค.
- `description`์ ํ ๋ฌธ์ฅ์ผ๋ก ํ…์ด๋ธ ์ฉ๋๋ฅผ ๋ช…ํ•ํ ๊ธฐ์ 
- `columns`๋ ๋น์ฆ๋์ค ์ฉ์ด์ ์ปฌ๋ผ๋ช… ๋งคํ•์ ์ถฉ์คํ ์์ฑ
- ๊ด๋ จ ํ…์ด๋ธ ๊ฐ FK ๊ด๊ณ๋ฅผ ์ปฌ๋ผ ์ค๋ช…์ ๋ช…์
- `top_n`: 3~8๋ก ์์ํด ์คํ (๋๋ฌด ๋ง์ผ๋ฉด LLM ํ๋กฌํํธ ๋น์ฉ ์ฆ๊ฐ)
- `score_threshold`: 0.0์ผ๋ก ์์ ํ ๊ด๋ จ ์๋ ํ…์ด๋ธ์ด ๊ฒ์๋  ๋ 0.3~0.5๋ก ์ƒํ–ฅ
- `rrf_k` (HybridRetriever): ๊ธฐ๋ณธ๊ฐ’ 60, ๊ฒ์ ๊ฒฐ๊ณผ ์ˆœ์œ„ ๋ฏผ๊ฐ๋„ ์กฐ์ •
- ๊ธฐ๋ณธ์ `RecursiveCharacterChunker`
- ๋ฌธ์„ ํ’ˆ์ง์ด ์ค‘์”ํ๊ณ  ๋น„์ฉ ํ—์ฉ ์‹ `SemanticChunker` ๊ฒ€ํ† 
- `chunk_overlap`์ ๋ฐ˜๋“œ์‹ `chunk_size`๋ณด๋‹ค ์‘๊ฒŒ ์„ค์ •
| ์ฐ์ ์์ | ํ์ดํ๋ผ์ธ |
|---|---|
| ๋น ๋ฅธ ์์ | BaselineNL2SQL |
| ๊ฒ์ ํ์ง | HybridNL2SQL |
| ์ด์ ํ๊ฒฝ | EnrichedNL2SQL |
| ์ธ๋ฐํ ์ ์ด | ์๋ ์ปดํฌ๋ํธ ์กฐํฉ |
pip install openai

`RecursiveCharacterChunker`์ chunk_overlap < chunk_size ์กฐ๊ฑด ์œ„๋ฐ˜.
ํ๋ผ๋ฏธํ„ฐ๋ฅผ ์ˆ˜์ •ํ์„ธ์š”.
- `from_chunks()` ๋˜๋Š” `from_sources()`๊ฐ€ ์‹ค์ œ๋ก ํ˜ธ์ถœ๋˜๋Š”์ง€ ํ™•์ธ
- `len(retriever._registry) > 0` ํ™•์ธ
- `score_threshold`๋ฅผ `0.0`์ผ๋ก ๋‚ฎ์ถฐ์„œ ํ…Œ์ŠคํŠธ
add()๋ list[IndexedChunk]๋ง ๋ฐ์ต๋๋ค. TextDocument๋ฅผ ์ง์ ์ ๋ฌํ๋ฉด ์ค๋ฅ๊ฐ ๋ฐ์ํฉ๋๋ค.
# โ ๋์ ์ ํจ
retriever.add(docs)
# โ
์ฌ๋ฐ๋ฅธ ๋ฐฉ๋ฒ
retriever.add(RecursiveCharacterChunker().split(docs))pip install pymupdfQuestionGate๊ฐ SQL๋ก ๋ตํ ์ ์๋ค๊ณ ํ๋จํ ๊ฒฝ์ฐ์
๋๋ค.
gate_enabled=False๋ก ๋นํ์ฑํํ๊ฑฐ๋ ์ง๋ฌธ์ SQL ๊ด๋ จ์ผ๋ก ๊ตฌ์ฒดํํ์ธ์.