# --- HF model lists (single, light model for HF Spaces CPU) ---
# We standardize on a small, fast model that runs reliably via HF Inference
# and is suitable for free CPU Spaces constraints.
THINKING_MODELS = ["Qwen/Qwen2.5-3B-Instruct"]
INSTRUCT_MODELS = ["Qwen/Qwen2.5-3B-Instruct"]

def _current_models():
    return THINKING_MODELS if STATE.get("mode") == "thinking" else INSTRUCT_MODELS
# app.py
import os
import json
import hashlib
import logging
import threading
from pathlib import Path
from typing import List, Dict, Any, Tuple
# --- Model mode state (thinking/instruct) with simple persistence ---
APP_DIR = Path(__file__).parent
DATA_DIR = APP_DIR / "data"
DATA_DIR.mkdir(parents=True, exist_ok=True)
STATE_PATH = DATA_DIR / "state.json"

def _load_state():
    if STATE_PATH.exists():
        try:
            return json.loads(STATE_PATH.read_text(encoding="utf-8"))
        except Exception:
            pass
    return {"mode": "instruct"}

def _save_state(s: dict):
    STATE_PATH.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding="utf-8")

STATE = _load_state()
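# Example of the persisted mode state (data/state.json) written by _save_state();
# an illustration of the schema only, not a file shipped with the app:
#   {
#     "mode": "instruct"
#   }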
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from huggingface_hub import HfApi, hf_hub_download
from monitor import get_current_metrics, start_monitoring_thread
from memory import get_history, save_history
# =========================
# Logging setup
# =========================
logging.basicConfig(
    level=logging.INFO,
    format="🪵 [%(asctime)s] [%(levelname)s] %(message)s"
)
logger = logging.getLogger("app")
# =========================
# Constants and paths
# =========================
# DATA_DIR is defined above (APP_DIR / "data"); the derived directories below live under it.
CACHE_DIR = DATA_DIR / "cache"
INDEX_DIR = DATA_DIR / "index"
FILES_DIR = DATA_DIR / "files"  # stores the full text of each uploaded file
REPORT_FILE = DATA_DIR / "analysis_report.md"
GRAPH_FILE = DATA_DIR / "code_graph.json"
EMB_FILE = INDEX_DIR / "embeddings.faiss"
META_FILE = INDEX_DIR / "chunks.pkl"
HASH_MAP_FILE = INDEX_DIR / "hash_map.json"
for p in [DATA_DIR, CACHE_DIR, INDEX_DIR, FILES_DIR]:
    p.mkdir(parents=True, exist_ok=True)
# Env
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Use a single, smaller model by default for faster responses and fewer 5xx
MODEL_REPO = os.getenv("MODEL_REPO", "Qwen/Qwen2.5-3B-Instruct")
# No fallbacks by default (can be provided via env if desired)
FALLBACK_MODELS = [
    m.strip() for m in os.getenv("FALLBACK_MODELS", "").split(",") if m.strip()
]
# Local GGUF model (if available)
LOCAL_GGUF_REPO = os.getenv("LOCAL_GGUF_REPO", "Qwen/Qwen2.5-3B-Instruct-GGUF")
LOCAL_GGUF_FILE = os.getenv("LOCAL_GGUF_FILE", "qwen2.5-3b-instruct-q4_k_m.gguf")
LOCAL_GGUF_PATH = CACHE_DIR / LOCAL_GGUF_FILE
# Code chunking (hard cap when reading files temporarily)
MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", str(10 * 1024 * 1024)))  # 10MB safety cap
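# Example Space configuration (illustrative values only; each variable below is read
# via os.getenv here or in load_local_model_if_configured()):
#   HF_TOKEN=hf_xxx                                 # secret, for HF Hub / Inference access
#   MODEL_REPO=Qwen/Qwen2.5-3B-Instruct
#   LOCAL_GGUF_REPO=Qwen/Qwen2.5-3B-Instruct-GGUF
#   LOCAL_GGUF_FILE=qwen2.5-3b-instruct-q4_k_m.gguf
#   N_CTX=4096  N_THREADS=2  N_BATCH=32  N_GPU_LAYERS=0
#   MAX_FILE_BYTES=10485760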
SYSTEM_PROMPT = """<|im_start|>system | |
نفذ ما يطلبه المستخدم كاملا بدون توقف حتى تنهي الطلب. | |
<|im_end|>""" | |
# =========================
# Global state and lock
# =========================
all_chunks: List[Tuple[str, str]] = []
code_graph: Dict[str, Any] = {"files": {}}
hash_map: Dict[str, str] = {}
index_lock = threading.RLock()  # ✅ guards indexing/retrieval

# =========================
# LLM (local, via GGUF)
# =========================
try:
    from llama_cpp import Llama
except Exception:
    Llama = None
llm = None
logger.info(f"HF_TOKEN length: {len(HF_TOKEN)}")  # sanity-check that the token is set
def load_local_model_if_configured():
    """Download and load the GGUF model directly from the Hugging Face Hub."""
    global llm
    if Llama is None:
        logger.warning("⚠️ llama_cpp is not available; the local model will not work.")
        return
    try:
        logger.info(f"⬇️ Loading GGUF model: {LOCAL_GGUF_REPO}/{LOCAL_GGUF_FILE}")
        llm = Llama.from_pretrained(
            repo_id=LOCAL_GGUF_REPO,
            filename=LOCAL_GGUF_FILE,
            # Llama params (tuned for HF Spaces CPU)
            n_ctx=int(os.getenv("N_CTX", "4096")),
            n_threads=int(os.getenv("N_THREADS", str(os.cpu_count() or 2))),
            n_batch=int(os.getenv("N_BATCH", "32")),
            n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "0")),
            use_mlock=False,
            verbose=False,
        )
        logger.info("✅ Local GGUF model loaded successfully.")
    except Exception as e:
        llm = None
        logger.error(f"❌ Failed to load/run GGUF: {e}")
def call_local_llm(prompt: str, max_tokens: int = 256) -> str:
    if llm is None:
        raise RuntimeError("Local model is not loaded")
    try:
        res = llm(
            prompt,
            max_tokens=min(max_tokens, 256),
            temperature=0.5,
            top_p=0.9,
            top_k=40,
            repeat_penalty=1.1,
            stop=["<|im_end|>", "<|im_start|>"],
            echo=False
        )
        return (res.get("choices", [{}])[0].get("text") or "").strip()
    except Exception as e:
        logger.error(f"❌ Error while calling the local model: {e}")
        raise RuntimeError(f"Local model call failed: {e}")
def _call_hf_single_model(model_repo: str, prompt: str, max_new_tokens: int = 900) -> str:
    import requests
    if not HF_TOKEN:
        logger.error("❌ HF_TOKEN is not set.")
        raise RuntimeError("HF_TOKEN is not configured and no local model is available.")
    url = f"https://api-inference.huggingface.co/models/{model_repo}"
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_new_tokens,
            "temperature": 0.4,
            "top_p": 0.9,
            "return_full_text": False
        }
    }
    r = requests.post(url, headers=headers, json=payload, timeout=120)
    if r.status_code == 503:
        data = {}
        try:
            data = r.json()
        except Exception:
            pass
        eta = data.get("estimated_time")
        raise RuntimeError(
            "The model is still loading on HF (503)."
            + (f" Expected in ~{eta:.0f}s" if isinstance(eta, (int, float)) else "")
        )
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 401:
            raise RuntimeError("Token missing or invalid (401). Check HF_TOKEN.")
        if status == 403:
            msg = ""
            try:
                msg = (e.response.json().get("error") or "").lower()
            except Exception:
                pass
            if "gated" in msg or "accept" in msg:
                raise RuntimeError("The model is gated (403). Open the model page and click Accept.")
            raise RuntimeError("Access denied (403).")
        if status == 404:
            raise RuntimeError("Model not found or not available via serverless inference (404).")
        if status == 429:
            raise RuntimeError("Rate limit exceeded (429). Try again later.")
        try:
            err = e.response.json()
        except Exception:
            err = {"error": e.response.text}
        raise RuntimeError(f"HF error ({status}): {err.get('error') or err}")
    data = r.json()
    if isinstance(data, list) and data and "generated_text" in data[0]:
        return data[0]["generated_text"]
    if isinstance(data, dict) and "generated_text" in data:
        return data["generated_text"]
    if isinstance(data, dict) and "error" in data:
        raise RuntimeError(f"HF error: {data['error']}")
    return json.dumps(data)
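# For reference, a successful serverless text-generation response typically looks like
# (illustrative, trimmed):
#   [{"generated_text": "..."}]
# which is exactly what the isinstance checks above unpack.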
def call_hf_inference(prompt: str, max_new_tokens: int = 900) -> str:
    raise RuntimeError("HF Inference is disabled. Only the local model is used.")

def call_llm(prompt: str, max_tokens: int = 256) -> str:
    return call_local_llm(prompt, max_tokens)

# =========================
# Chat prompt construction (simplified version)
# =========================
def build_chat_prompt(history: List[List[str]], message: str, extra: str = "") -> str:
    prompt = f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
    for user_msg, ai_msg in history:
        prompt += f"<|im_start|>user\n{user_msg}\n<|im_end|>\n"
        prompt += f"<|im_start|>assistant\n{ai_msg}\n<|im_end|>\n"
    prompt += f"<|im_start|>user\n{message}\n{extra}\n<|im_end|>\n"
    prompt += f"<|im_start|>assistant\n"
    return prompt
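# Rendered ChatML example for history=[] and message="hello" (illustrative; note the
# blank line left by the empty `extra` argument):
#   <|im_start|>system
#   نفذ ما يطلبه المستخدم كاملا بدون توقف حتى تنهي الطلب.
#   <|im_end|>
#   <|im_start|>user
#   hello
#
#   <|im_end|>
#   <|im_start|>assistant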
# =========================
# FastAPI
# =========================
# NOTE: Warm-up moved to startup_event after helper functions are defined
app = FastAPI(title="AI Code Analyst")

# --- Root endpoint for Hugging Face health checks and simple UI ---
from fastapi.responses import PlainTextResponse, HTMLResponse, JSONResponse

@app.get("/")
def root(logs: str | None = None):
""" | |
Minimal root endpoint so HF / healthcheck returns 200 OK. | |
Use `/?logs=container` to tail last lines from data/app.log. | |
""" | |
if logs == "container": | |
log_file = Path(DATA_DIR) / "app.log" | |
if log_file.exists(): | |
tail = "".join(log_file.read_text(encoding="utf-8", errors="ignore").splitlines(True)[-200:]) | |
return PlainTextResponse(tail) | |
return PlainTextResponse("No logs yet.", status_code=200) | |
# Minimal HTML with quick chat form | |
html = """ | |
<html> | |
<head> | |
<meta charset="utf-8"> | |
<title>AI Code Analyst</title> | |
<style> | |
body{font-family: ui-sans-serif, system-ui; padding:20px; max-width:900px; margin:auto} | |
textarea{width:100%; height:110px} | |
input[type=text]{width:260px} | |
.row{margin:8px 0} | |
.small{color:#666} | |
pre{white-space:pre-wrap; background:#f7f7f8; padding:10px; border-radius:6px} | |
</style> | |
</head> | |
<body> | |
<h1>✅ AI Code Analyst is running</h1> | |
<p>Try <a href="/docs">/docs</a>, <a href="/hf-check">/hf-check</a>, or <a href="/metrics">/metrics</a>. | Logs: <a href="/?logs=container">tail</a></p> | |
<h3>Quick Chat (server-side)</h3> | |
<div class="row"> | |
<textarea id="msg" placeholder="اكتب رسالتك هنا..."></textarea> | |
</div> | |
<div class="row"> | |
<button onclick="send()">إرسال</button> | |
<span id="status" class="small"></span> | |
</div> | |
<div class="row"> | |
<pre id="out" class="small"></pre> | |
</div> | |
<script> | |
async function send(){ | |
const msg = document.getElementById('msg').value || ''; | |
const st = document.getElementById('status'); | |
const out = document.getElementById('out'); | |
st.textContent = '...Sending'; out.textContent=''; | |
try{ | |
const r = await fetch('/chat', {method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({message: msg})}); | |
if(!r.ok){ | |
const t = await r.text(); | |
st.textContent = 'HTTP '+r.status; out.textContent = t; return; | |
} | |
const j = await r.json(); | |
st.textContent = 'OK'; | |
out.textContent = j.response || JSON.stringify(j); | |
}catch(e){ st.textContent='Network error'; out.textContent=String(e); } | |
} | |
</script> | |
</body> | |
</html> | |
""" | |
return HTMLResponse(html) | |
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Endpoint for checking the token and model access
@app.get("/hf-check")
def hf_check():
    api = HfApi()
    out = {
        "token_set": bool(HF_TOKEN),
        "token_valid": False,
        "model_repo": MODEL_REPO,
        "model_access": False,
        "model_private": None,
        "gated_hint": False,
        "message": ""
    }
    if not HF_TOKEN:
        out["message"] = "HF_TOKEN is not set."
        return out
    try:
        me = api.whoami(token=HF_TOKEN)
        out["token_valid"] = True
        out["message"] = f"Token OK for user: {me.get('name')}"
    except Exception as e:
        out["message"] = f"Token check failed: {type(e).__name__}: {e}"
        return out
    try:
        info = api.model_info(MODEL_REPO, token=HF_TOKEN)
        out["model_access"] = True
        out["model_private"] = getattr(info, "private", None)
        out["message"] += f" | Model reachable: {info.modelId}"
    except Exception as e:
        msg = str(e).lower()
        out["message"] += f" | Model access failed: {type(e).__name__}: {e}"
        out["gated_hint"] = ("gated" in msg or "accept" in msg)
    return out
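# Example /hf-check response when the token is valid and the model is reachable
# (values illustrative):
#   {"token_set": true, "token_valid": true, "model_repo": "Qwen/Qwen2.5-3B-Instruct",
#    "model_access": true, "model_private": false, "gated_hint": false,
#    "message": "Token OK for user: ... | Model reachable: Qwen/Qwen2.5-3B-Instruct"}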
class UploadFilesRequest(BaseModel):
    files: Dict[str, str]  # fname: content

class DiffFilesRequest(BaseModel):
    deleted: List[str]
    modified: Dict[str, str]  # fname: new_content

class AnalyzeAndReportRequest(BaseModel):
    query: str
    top_k: int | None = None

class ChatRequest(BaseModel):
    message: str

class ChatResponse(BaseModel):
    response: str
    updated_history: List[List[str]]
@app.on_event("startup")
def startup_event():
    # Load the local model + start resource monitoring
    load_local_model_if_configured()
    start_monitoring_thread()
    # Local warm-up
    try:
        _ = call_local_llm("ping", max_tokens=1)
        logging.info("[LLM] Local warm-up OK")
    except Exception as e:
        logging.warning("[LLM] Local warm-up failed: %s", e)

def rebuild_index_from_files():
    # Lite version: no indexing
    return None

@app.get("/metrics")
def metrics():
    return get_current_metrics()
@app.post("/upload-files")  # route path assumed; the original decorator is missing
def upload_files(req: UploadFilesRequest):
    """Save the uploaded files, run a quick model analysis, then delete them without persisting."""
    FILES_DIR.mkdir(parents=True, exist_ok=True)
    saved_paths = []
    for fname, content in req.files.items():
        p = FILES_DIR / fname
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content, encoding="utf-8")
        saved_paths.append(p)
    try:
        # Build a short prompt for analyzing the files
        parts = []
        for p in saved_paths:
            try:
                txt = p.read_text(encoding="utf-8", errors="ignore")
            except Exception:
                txt = "<read error>"
            if len(txt) > 4000:
                txt = txt[:4000] + "\n... [truncated]"
            parts.append(f"[File] {p.name}\n{txt}")
        prompt = (
            f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
            f"<|im_start|>user\nAnalyze these files and summarize key issues and functions.\n\n"
            + "\n\n".join(parts) + "\n<|im_end|>\n<|im_start|>assistant\n"
        )
        summary = call_llm(prompt, max_tokens=256)
        return {"status": "ok", "summary": summary, "files": [p.name for p in saved_paths]}
    finally:
        # Delete the files immediately
        for p in saved_paths:
            try:
                p.unlink(missing_ok=True)
            except Exception:
                pass
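# Client-side usage sketch (illustrative; assumes the Space's local URL and the
# assumed /upload-files route above):
#   import requests
#   payload = {"files": {"main.py": "print('hi')"}}
#   r = requests.post("http://localhost:7860/upload-files", json=payload, timeout=120)
#   print(r.json()["summary"])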
# The diff-files route was dropped in the Lite version (no indexing or diffing)
# analyze-and-report was dropped in the Lite version

def classify_intent(history: List[List[str]], message: str) -> Dict[str, Any]:
    """Simplified version: no classification, just direct chat."""
    return {"intent": "CHAT", "confidence": 1.0, "action": "NONE", "targets": [], "reason": ""}
@app.post("/chat")
def chat(req: ChatRequest):
    global_key = "__global__"
    history = get_history(global_key)
    prompt = build_chat_prompt(history, req.message, "")
    try:
        response_text = call_llm(prompt, max_tokens=256)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
    updated = (history + [[req.message, response_text]])[-4:]
    save_history(global_key, updated)
    return ChatResponse(response=response_text, updated_history=updated)
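# Request/response shape for /chat (mirrors ChatRequest/ChatResponse; values illustrative):
#   POST /chat  {"message": "مرحبا"}
#   -> {"response": "...", "updated_history": [["مرحبا", "..."]]}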
# --- Robust HF Inference with retries/fallback/warmup ---
import requests, time

HF_API_URL = "https://api-inference.huggingface.co/models/{model}"
HF_TOKEN = os.getenv("HF_TOKEN", "")

def _hf_headers():
    hdr = {"Accept": "application/json"}
    if HF_TOKEN:
        hdr["Authorization"] = f"Bearer {HF_TOKEN}"
    return hdr

def _hf_call_single(model: str, prompt: str, max_new_tokens: int = 256, timeout_s: int = 60) -> str:
    url = HF_API_URL.format(model=model)
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": max_new_tokens, "temperature": 0.3},
        "options": {"wait_for_model": True, "use_cache": True}
    }
    tries, backoff = 0, 2
    while True:
        tries += 1
        try:
            r = requests.post(url, headers=_hf_headers(), json=payload, timeout=timeout_s)
            if r.status_code == 503:
                try:
                    eta = r.json().get("estimated_time", 8)
                except Exception:
                    eta = 8
                time.sleep(min(30, max(2, int(eta))))
                continue
            r.raise_for_status()
            data = r.json()
            if isinstance(data, list):
                if data and isinstance(data[0], dict) and "generated_text" in data[0]:
                    return data[0]["generated_text"]
                if data and isinstance(data[0], dict) and "content" in data[0]:
                    return data[0]["content"]
            if isinstance(data, dict) and "generated_text" in data:
                return data["generated_text"]
            return json.dumps(data, ensure_ascii=False)
        except requests.HTTPError as e:
            status = getattr(e.response, "status_code", None)
            if status in (502, 503, 504, 429) and tries < 3:
                time.sleep(backoff); backoff *= 2; continue
            try:
                text = e.response.text
            except Exception:
                text = ""
            raise RuntimeError(f"HF error {status} on {model}: {text}") from e
        except requests.RequestException as e:
            if tries < 3:
                time.sleep(backoff); backoff *= 2; continue
            raise RuntimeError(f"HF request failed on {model}: {e}") from e

def call_hf_inference_robust(prompt: str, max_new_tokens: int = 256) -> str:
    last_err = None
    for m in _current_models():
        try:
            return _hf_call_single(m, prompt, max_new_tokens)
        except Exception as e:
            logging.warning(f"[HF] model {m} failed: {e}")
            last_err = e
            continue
    raise RuntimeError(f"All HF models failed. Last: {last_err}")
from fastapi import Body
from pydantic import BaseModel

class _SetModelIn(BaseModel):
    mode: str  # 'thinking' or 'instruct'

@app.post("/set-model")  # route path assumed; the original decorator is missing
def set_model_endpoint(body: _SetModelIn):
    mode = (body.mode or "").lower().strip()
    if mode not in ("thinking", "instruct"):
        raise HTTPException(400, "mode must be 'thinking' or 'instruct'")
    STATE["mode"] = mode
    _save_state(STATE)
    # Try warm-up immediately to inform user about readiness
    try:
        _ = _hf_call_single(_current_models()[0], "ping", 1, timeout_s=30)
        warmed = True
    except Exception:
        warmed = False
    return {"ok": True, "mode": mode, "models": _current_models(), "warmed": warmed}