# build_index.py
import os
import re
import json
from typing import List
import numpy as np
from fastembed import TextEmbedding

# Paths
MD_PATH = "knowledge/clinic_facts.md"
# The repository is read-only at runtime, so the index is written to /tmp
INDEX_PATH = os.getenv("INDEX_PATH", "/tmp/clinic_index.json")

# ===== pick a supported fastembed model =====
PREF = [
    "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    "intfloat/multilingual-e5-small",
    "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
]

def pick_model() -> str:
    try:
        # list_supported_models() returns a list of dicts describing each
        # model; the identifier lives under the "model" key.
        supported = {m["model"] for m in TextEmbedding.list_supported_models()}
        for name in PREF:
            if name in supported:
                return name
        # otherwise, anything multilingual
        for name in supported:
            if "multi" in name.lower():
                return name
        # last resort: whichever model comes first
        return next(iter(supported))
    except Exception:
        # if anything goes wrong, fall back to the default
        return "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
MODEL_NAME = pick_model()

# ------------------------- utilities ------------------------- #
def _norm_space(s: str) -> str:
    return re.sub(r"[ \t]+", " ", s).strip()
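
# Split the markdown into plain-text chunks: headings and inline markup are
# stripped, lines are merged into paragraphs, and paragraphs over ~1200 chars
# are re-split on sentence boundaries into ~700-char pieces; fragments shorter
# than 40 chars are dropped and duplicates removed (order preserved).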
def _split_markdown(text: str) -> List[str]:
    blocks = re.split(r"(?m)^(?:#{1,6}\s.+$)|\n{2,}", text)
    blocks = [b.strip() for b in blocks if b and b.strip()]
    chunks: List[str] = []
    for b in blocks:
        b = re.sub(r"`{1,3}.*?`{1,3}", "", b)
        b = re.sub(r"\*\*([^*]+)\*\*", r"\1", b)
        b = re.sub(r"\*([^*]+)\*", r"\1", b)
        b = re.sub(r"_([^_]+)_", r"\1", b)
        lines = [ln.strip() for ln in b.splitlines() if ln.strip()]
        if not lines:
            continue
        paragraph = _norm_space(" ".join(lines))
        if len(paragraph) > 1200:
            sents = re.split(r"(?<=[\.\!\?])\s+", paragraph)
            buf = ""
            for s in sents:
                if len(buf) + len(s) + 1 > 700:
                    if buf:
                        chunks.append(_norm_space(buf))
                    buf = s
                else:
                    buf = (buf + " " + s).strip()
            if buf:
                chunks.append(_norm_space(buf))
        else:
            chunks.append(paragraph)
    chunks = [c for c in chunks if len(c) >= 40]
    uniq = list(dict.fromkeys(chunks))
    return uniq
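
# Embed every chunk with the chosen model and L2-normalize the rows, so a dot
# product against a normalized query vector equals cosine similarity.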
def _embed_texts(texts: List[str]) -> np.ndarray:
    emb = TextEmbedding(model_name=MODEL_NAME)
    vecs = list(emb.embed(texts))
    mat = np.vstack(vecs).astype("float32")
    mat /= (np.linalg.norm(mat, axis=1, keepdims=True) + 1e-9)
    return mat
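
# The index is a plain JSON payload (model name, chunk texts, vectors). JSON is
# bulkier than a binary format, but it is portable and easy to inspect by hand.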
def _save_index(texts: List[str], vectors: np.ndarray) -> None:
    payload = {"model": MODEL_NAME, "texts": texts, "vectors": vectors.tolist()}
    index_dir = os.path.dirname(INDEX_PATH)
    if index_dir:  # dirname is "" when INDEX_PATH has no directory part
        os.makedirs(index_dir, exist_ok=True)
    with open(INDEX_PATH, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False)

# ------------------------- API ------------------------- #
def _build_impl() -> dict:
    if not os.path.exists(MD_PATH):
        raise FileNotFoundError(f"Facts file not found: {MD_PATH}")
    with open(MD_PATH, "r", encoding="utf-8") as f:
        text = f.read()
    chunks = _split_markdown(text)
    if not chunks:
        raise RuntimeError("Could not extract any chunks from the markdown.")
    vectors = _embed_texts(chunks)
    _save_index(chunks, vectors)
    info = {
        "ok": True,
        "chunks": len(chunks),
        "index_path": INDEX_PATH,
        "model": MODEL_NAME,
    }
    print(f"[build_index] Saved {len(chunks)} chunks -> {INDEX_PATH}")
    return info
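
# main(), build(), and rebuild() all delegate to _build_impl(); the aliases
# presumably let different entry points (CLI run, Space startup, admin hook)
# trigger the same rebuild.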
def main() -> dict:
    return _build_impl()

def build() -> dict:
    return _build_impl()

def rebuild() -> dict:
    return _build_impl()

if __name__ == "__main__":
    main()
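
# --------------------- query-time sketch (illustrative) --------------------- #
# A minimal sketch of how a retriever could consume the saved index; it is not
# part of this module's API, and the name `search` is hypothetical. It assumes
# the JSON layout written by _save_index above. Because the stored vectors are
# L2-normalized, ranking by cosine similarity reduces to a dot product.
#
# def search(query: str, top_k: int = 3) -> List[str]:
#     with open(INDEX_PATH, "r", encoding="utf-8") as f:
#         payload = json.load(f)
#     mat = np.asarray(payload["vectors"], dtype="float32")
#     emb = TextEmbedding(model_name=payload["model"])
#     q = np.asarray(next(iter(emb.embed([query]))), dtype="float32")
#     q /= (np.linalg.norm(q) + 1e-9)
#     order = np.argsort(mat @ q)[::-1][:top_k]
#     return [payload["texts"][i] for i in order]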