# --- HF model lists (single, light model for HF Spaces CPU) ---
# We standardize on a small, fast model that runs reliably via HF Inference
# and is suitable for free CPU Spaces constraints.
THINKING_MODELS = ["Qwen/Qwen2.5-3B-Instruct"]
INSTRUCT_MODELS = ["Qwen/Qwen2.5-3B-Instruct"]
def _current_models():
return THINKING_MODELS if STATE.get("mode") == "thinking" else INSTRUCT_MODELS
# app.py
import os
import json
import hashlib
import logging
import threading
from pathlib import Path
from typing import List, Dict, Any, Tuple
# --- Model mode state (thinking/instruct) with simple persistence ---
APP_DIR = Path(__file__).parent
DATA_DIR = APP_DIR / "data"
DATA_DIR.mkdir(parents=True, exist_ok=True)
STATE_PATH = DATA_DIR / "state.json"
def _load_state():
if STATE_PATH.exists():
try:
return json.loads(STATE_PATH.read_text(encoding="utf-8"))
except Exception:
pass
return {"mode": "instruct"}
def _save_state(s: dict):
STATE_PATH.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding="utf-8")
STATE = _load_state()
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from huggingface_hub import HfApi, hf_hub_download
from monitor import get_current_metrics, start_monitoring_thread
from memory import get_history, save_history
# =========================
# Logging setup
# =========================
logging.basicConfig(
level=logging.INFO,
format="🪵 [%(asctime)s] [%(levelname)s] %(message)s"
)
logger = logging.getLogger("app")
# =========================
# Constants and paths
# =========================
DATA_DIR = Path("data")
CACHE_DIR = DATA_DIR / "cache"
INDEX_DIR = DATA_DIR / "index"
FILES_DIR = DATA_DIR / "files"  # stores the full text of each uploaded file
REPORT_FILE = DATA_DIR / "analysis_report.md"
GRAPH_FILE = DATA_DIR / "code_graph.json"
EMB_FILE = INDEX_DIR / "embeddings.faiss"
META_FILE = INDEX_DIR / "chunks.pkl"
HASH_MAP_FILE = INDEX_DIR / "hash_map.json"
for p in [DATA_DIR, CACHE_DIR, INDEX_DIR, FILES_DIR]:
p.mkdir(parents=True, exist_ok=True)
# Env
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Use a single, smaller model by default for faster responses and fewer 5xx
MODEL_REPO = os.getenv("MODEL_REPO", "Qwen/Qwen2.5-3B-Instruct")
# No fallbacks by default (can be provided via env if desired)
FALLBACK_MODELS = [
m.strip() for m in os.getenv("FALLBACK_MODELS", "").split(",") if m.strip()
]
# Local GGUF model (if available)
LOCAL_GGUF_REPO = os.getenv("LOCAL_GGUF_REPO", "Qwen/Qwen2.5-3B-Instruct-GGUF")
LOCAL_GGUF_FILE = os.getenv("LOCAL_GGUF_FILE", "qwen2.5-3b-instruct-q4_k_m.gguf")
LOCAL_GGUF_PATH = CACHE_DIR / LOCAL_GGUF_FILE
# Code chunking (upper bound for temporary file reads)
MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", str(10 * 1024 * 1024)))  # 10 MB safety cap
SYSTEM_PROMPT = """<|im_start|>system
نفذ ما يطلبه المستخدم كاملا بدون توقف حتى تنهي الطلب.
<|im_end|>"""
# =========================
# Global state and lock
# =========================
all_chunks: List[Tuple[str, str]] = []
code_graph: Dict[str, Any] = {"files": {}}
hash_map: Dict[str, str] = {}
index_lock = threading.RLock()  # ✅ guards indexing/retrieval
# =========================
# LLM (local, via GGUF)
# =========================
try:
from llama_cpp import Llama
except Exception:
Llama = None
llm = None
logger.info(f"HF_TOKEN length: {len(HF_TOKEN)}") # تحقق من طول الtoken
def load_local_model_if_configured():
"""تحميل نموذج GGUF من HuggingFace Hub مباشرة."""
global llm
if Llama is None:
logger.warning("⚠️ llama_cpp غير متاح. لن يعمل النموذج المحلي.")
return
try:
logger.info(f"⬇️ تحميل نموذج GGUF: {LOCAL_GGUF_REPO}/{LOCAL_GGUF_FILE}")
llm = Llama.from_pretrained(
repo_id=LOCAL_GGUF_REPO,
filename=LOCAL_GGUF_FILE,
# Llama params (tuned for HF Spaces CPU)
n_ctx=int(os.getenv("N_CTX", "4096")),
n_threads=int(os.getenv("N_THREADS", str(os.cpu_count() or 2))),
n_batch=int(os.getenv("N_BATCH", "32")),
n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "0")),
use_mlock=False,
verbose=False,
)
logger.info("✅ تم تحميل نموذج GGUF المحلي بنجاح.")
except Exception as e:
llm = None
logger.error(f"❌ فشل تحميل/تشغيل GGUF: {e}")
def call_local_llm(prompt: str, max_tokens: int = 256) -> str:
if llm is None:
raise RuntimeError("النموذج المحلي غير محمل")
try:
res = llm(
prompt,
max_tokens=min(max_tokens, 256),
temperature=0.5,
top_p=0.9,
top_k=40,
repeat_penalty=1.1,
stop=["<|im_end|>", "<|im_start|>"],
echo=False
)
return (res.get("choices", [{}])[0].get("text") or "").strip()
except Exception as e:
logger.error(f"❌ خطأ في استدعاء النموذج المحلي: {e}")
raise RuntimeError(f"فشل استدعاء النموذج المحلي: {e}")
def _call_hf_single_model(model_repo: str, prompt: str, max_new_tokens: int = 900) -> str:
import requests
if not HF_TOKEN:
logger.error("❌ HF_TOKEN غير معرف.")
raise RuntimeError("التوكن HF_TOKEN غير مضبوط ولا يوجد نموذج محلي.")
url = f"https://api-inference.huggingface.co/models/{model_repo}"
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_new_tokens,
"temperature": 0.4,
"top_p": 0.9,
"return_full_text": False
}
}
r = requests.post(url, headers=headers, json=payload, timeout=120)
if r.status_code == 503:
data = {}
try: data = r.json()
except Exception: pass
eta = data.get("estimated_time")
raise RuntimeError("النموذج قيد التحميل من HF (503)." + (f" متوقع {eta:.0f}ث" if isinstance(eta, (int, float)) else ""))
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
status = e.response.status_code
if status == 401: raise RuntimeError("التوكن مفقود أو غير صالح (401). تأكد من HF_TOKEN.")
if status == 403:
msg = ""
try: msg = (e.response.json().get("error") or "").lower()
except Exception: pass
if "gated" in msg or "accept" in msg:
raise RuntimeError("النموذج مسيَّج (403). يجب دخول صفحة النموذج والضغط على Accept.")
raise RuntimeError("صلاحية الوصول مرفوضة (403).")
if status == 404: raise RuntimeError("النموذج غير موجود أو غير متاح عبر السيرفرلس (404).")
if status == 429: raise RuntimeError("تم تجاوز الحد المسموح للطلبات (429). جرّب لاحقًا.")
try:
err = e.response.json()
except Exception:
err = {"error": e.response.text}
raise RuntimeError(f"خطأ HF ({status}): {err.get('error') or err}")
data = r.json()
if isinstance(data, list) and data and "generated_text" in data[0]:
return data[0]["generated_text"]
if isinstance(data, dict) and "generated_text" in data:
return data["generated_text"]
if isinstance(data, dict) and "error" in data:
raise RuntimeError(f"HF error: {data['error']}")
return json.dumps(data)
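# For reference, the serverless text-generation endpoint typically answers with a list shaped
# like the sketch below, which is why the parsing above checks both list and dict forms (the
# exact fields can vary by model/pipeline):
#
#   [
#     {"generated_text": "…model output…"}
#   ]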
def call_hf_inference(prompt: str, max_new_tokens: int = 900) -> str:
raise RuntimeError("تم تعطيل HF Inference. النموذج المحلي مستخدم فقط.")
def call_llm(prompt: str, max_tokens: int = 256) -> str:
return call_local_llm(prompt, max_tokens)
# =========================
# Chat prompt construction (simplified version)
# =========================
def build_chat_prompt(history: List[List[str]], message: str, extra: str = "") -> str:
prompt = f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
for user_msg, ai_msg in history:
prompt += f"<|im_start|>user\n{user_msg}\n<|im_end|>\n"
prompt += f"<|im_start|>assistant\n{ai_msg}\n<|im_end|>\n"
prompt += f"<|im_start|>user\n{message}\n{extra}\n<|im_end|>\n"
prompt += f"<|im_start|>assistant\n"
return prompt
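# For a single-turn call, build_chat_prompt([], "Hello") produces a ChatML string shaped like:
#
#   <|im_start|>system
#   ...system instruction...
#   <|im_end|>
#   <|im_start|>user
#   Hello
#
#   <|im_end|>
#   <|im_start|>assistant
#
# The trailing assistant header leaves the model to complete the assistant turn, and the
# "<|im_end|>" stop token in call_local_llm() ends generation at the turn boundary.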
# =========================
# FastAPI
# =========================
# NOTE: Warm-up moved to startup_event after helper functions are defined
app = FastAPI(title="AI Code Analyst")
# --- Root endpoint for Hugging Face health checks and simple UI ---
from fastapi.responses import PlainTextResponse, HTMLResponse, JSONResponse
@app.get("/", response_class=HTMLResponse)
def root(logs: str | None = None):
"""
Minimal root endpoint so HF / healthcheck returns 200 OK.
Use `/?logs=container` to tail last lines from data/app.log.
"""
if logs == "container":
log_file = Path(DATA_DIR) / "app.log"
if log_file.exists():
tail = "".join(log_file.read_text(encoding="utf-8", errors="ignore").splitlines(True)[-200:])
return PlainTextResponse(tail)
return PlainTextResponse("No logs yet.", status_code=200)
# Minimal HTML with quick chat form
html = """
<html>
<head>
<meta charset="utf-8">
<title>AI Code Analyst</title>
<style>
body{font-family: ui-sans-serif, system-ui; padding:20px; max-width:900px; margin:auto}
textarea{width:100%; height:110px}
input[type=text]{width:260px}
.row{margin:8px 0}
.small{color:#666}
pre{white-space:pre-wrap; background:#f7f7f8; padding:10px; border-radius:6px}
</style>
</head>
<body>
<h1>✅ AI Code Analyst is running</h1>
<p>Try <a href="/docs">/docs</a>, <a href="/hf-check">/hf-check</a>, or <a href="/metrics">/metrics</a>. | Logs: <a href="/?logs=container">tail</a></p>
<h3>Quick Chat (server-side)</h3>
<div class="row">
<textarea id="msg" placeholder="اكتب رسالتك هنا..."></textarea>
</div>
<div class="row">
<button onclick="send()">إرسال</button>
<span id="status" class="small"></span>
</div>
<div class="row">
<pre id="out" class="small"></pre>
</div>
<script>
async function send(){
const msg = document.getElementById('msg').value || '';
const st = document.getElementById('status');
const out = document.getElementById('out');
st.textContent = '...Sending'; out.textContent='';
try{
const r = await fetch('/chat', {method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({message: msg})});
if(!r.ok){
const t = await r.text();
st.textContent = 'HTTP '+r.status; out.textContent = t; return;
}
const j = await r.json();
st.textContent = 'OK';
out.textContent = j.response || JSON.stringify(j);
}catch(e){ st.textContent='Network error'; out.textContent=String(e); }
}
</script>
</body>
</html>
"""
return HTMLResponse(html)
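# Log-tail sketch (assumption: something writes data/app.log; the logging config in this module
# itself only emits to stderr). The base URL assumes a local run on the default HF Spaces port:
#
#   import requests
#   print(requests.get("http://localhost:7860/?logs=container", timeout=30).text)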
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Endpoint for checking the token and model access
@app.get("/hf-check")
def hf_check():
api = HfApi()
out = {
"token_set": bool(HF_TOKEN),
"token_valid": False,
"model_repo": MODEL_REPO,
"model_access": False,
"model_private": None,
"gated_hint": False,
"message": ""
}
if not HF_TOKEN:
out["message"] = "HF_TOKEN غير مضبوط."
return out
try:
me = api.whoami(token=HF_TOKEN)
out["token_valid"] = True
out["message"] = f"Token OK for user: {me.get('name')}"
except Exception as e:
out["message"] = f"Token check failed: {type(e).__name__}: {e}"
return out
try:
info = api.model_info(MODEL_REPO, token=HF_TOKEN)
out["model_access"] = True
out["model_private"] = getattr(info, "private", None)
out["message"] += f" | Model reachable: {info.modelId}"
except Exception as e:
msg = str(e).lower()
out["message"] += f" | Model access failed: {type(e).__name__}: {e}"
out["gated_hint"] = ("gated" in msg or "accept" in msg)
return out
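# Client sketch for the check above (base URL assumes a local run on the default HF Spaces
# port 7860; adjust for a deployed Space):
#
#   import requests
#   info = requests.get("http://localhost:7860/hf-check", timeout=30).json()
#   print(info["token_valid"], info["model_access"], info["message"])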
class UploadFilesRequest(BaseModel):
files: Dict[str, str] # fname: content
class DiffFilesRequest(BaseModel):
deleted: List[str]
modified: Dict[str, str] # fname: new_content
class AnalyzeAndReportRequest(BaseModel):
query: str
top_k: int | None = None
class ChatRequest(BaseModel):
message: str
class ChatResponse(BaseModel):
response: str
updated_history: List[List[str]]
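# Request-body sketches matching the models above (values are illustrative; DiffFilesRequest and
# AnalyzeAndReportRequest have no routes in this Lite build but keep the schema documented):
#
#   UploadFilesRequest:       {"files": {"main.py": "print('hi')"}}
#   DiffFilesRequest:         {"deleted": ["old.py"], "modified": {"main.py": "print('new')"}}
#   AnalyzeAndReportRequest:  {"query": "list the public functions", "top_k": 5}
#   ChatRequest:              {"message": "Explain this project"}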
@app.on_event("startup")
def startup_event():
    # Load the local model and start resource monitoring
load_local_model_if_configured()
start_monitoring_thread()
    # Local warm-up
try:
_ = call_local_llm("ping", max_tokens=1)
logging.info("[LLM] Local warm-up OK")
except Exception as e:
logging.warning("[LLM] Local warm-up failed: %s", e)
def rebuild_index_from_files():
    # Lite version: no indexing
return None
@app.get("/metrics")
def metrics():
return get_current_metrics()
@app.post("/upload-files")
def upload_files(req: UploadFilesRequest):
"""تحميل الملفات، تحليل سريع بالنموذج، ثم حذف الملفات دون حفظ دائم."""
FILES_DIR.mkdir(parents=True, exist_ok=True)
saved_paths = []
for fname, content in req.files.items():
p = FILES_DIR / fname
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content, encoding="utf-8")
saved_paths.append(p)
try:
        # Build a compact prompt for analyzing the files
parts = []
for p in saved_paths:
try:
txt = p.read_text(encoding="utf-8", errors="ignore")
except Exception:
txt = "<read error>"
if len(txt) > 4000:
txt = txt[:4000] + "\n... [truncated]"
parts.append(f"[File] {p.name}\n{txt}")
prompt = (
f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
f"<|im_start|>user\nAnalyze these files and summarize key issues and functions.\n\n"
+ "\n\n".join(parts) + "\n<|im_end|>\n<|im_start|>assistant\n"
)
summary = call_llm(prompt, max_tokens=256)
return {"status": "ok", "summary": summary, "files": [p.name for p in saved_paths]}
finally:
        # Delete the files immediately
for p in saved_paths:
try:
p.unlink(missing_ok=True)
except Exception:
pass
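# Client-side sketch for /upload-files (assumption: the service is reachable at
# http://localhost:7860 and the file to analyze exists locally):
#
#   import requests
#   from pathlib import Path
#   payload = {"files": {"utils.py": Path("utils.py").read_text(encoding="utf-8")}}
#   r = requests.post("http://localhost:7860/upload-files", json=payload, timeout=300)
#   print(r.json()["summary"])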
# The diff-files route was dropped in the Lite version (no indexing or diffing)
# analyze-and-report was dropped in the Lite version
def classify_intent(history: List[List[str]], message: str) -> Dict[str, Any]:
"""نسخة مبسطة: لا تصنيف، مجرد دردشة مباشرة."""
return {"intent": "CHAT", "confidence": 1.0, "action": "NONE", "targets": [], "reason": ""}
@app.post("/chat", response_model=ChatResponse)
def chat(req: ChatRequest):
global_key = "__global__"
history = get_history(global_key)
prompt = build_chat_prompt(history, req.message, "")
try:
response_text = call_llm(prompt, max_tokens=256)
except Exception as e:
raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
updated = (history + [[req.message, response_text]])[-4:]
save_history(global_key, updated)
return ChatResponse(response=response_text, updated_history=updated)
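# Minimal client sketch for /chat (the base URL is an assumption; history is kept server-side
# under a single global key and trimmed to the last 4 turns above):
#
#   import requests
#   r = requests.post("http://localhost:7860/chat", json={"message": "Summarize app.py"}, timeout=300)
#   r.raise_for_status()
#   print(r.json()["response"])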
# --- Robust HF Inference with retries/fallback/warmup ---
import requests, time
HF_API_URL = "https://api-inference.huggingface.co/models/{model}"
# HF_TOKEN is already read from the environment at the top of the module.
def _hf_headers():
hdr = {"Accept":"application/json"}
if HF_TOKEN:
hdr["Authorization"] = f"Bearer {HF_TOKEN}"
return hdr
def _hf_call_single(model: str, prompt: str, max_new_tokens: int = 256, timeout_s: int = 60) -> str:
url = HF_API_URL.format(model=model)
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": max_new_tokens, "temperature": 0.3},
"options": {"wait_for_model": True, "use_cache": True}
}
tries, backoff = 0, 2
while True:
tries += 1
try:
r = requests.post(url, headers=_hf_headers(), json=payload, timeout=timeout_s)
if r.status_code == 503:
try:
eta = r.json().get("estimated_time", 8)
except Exception:
eta = 8
time.sleep(min(30, max(2, int(eta))))
continue
r.raise_for_status()
data = r.json()
if isinstance(data, list):
if data and isinstance(data[0], dict) and "generated_text" in data[0]:
return data[0]["generated_text"]
if data and isinstance(data[0], dict) and "content" in data[0]:
return data[0]["content"]
if isinstance(data, dict) and "generated_text" in data:
return data["generated_text"]
return json.dumps(data, ensure_ascii=False)
except requests.HTTPError as e:
status = getattr(e.response, "status_code", None)
if status in (502, 503, 504, 429) and tries < 3:
time.sleep(backoff); backoff *= 2; continue
try:
text = e.response.text
except Exception:
text = ""
raise RuntimeError(f"HF error {status} on {model}: {text}") from e
except requests.RequestException as e:
if tries < 3:
time.sleep(backoff); backoff *= 2; continue
raise RuntimeError(f"HF request failed on {model}: {e}") from e
def call_hf_inference_robust(prompt: str, max_new_tokens: int = 256) -> str:
last_err = None
for m in _current_models():
try:
return _hf_call_single(m, prompt, max_new_tokens)
except Exception as e:
logging.warning(f"[HF] model {m} failed: {e}")
last_err = e
continue
raise RuntimeError(f"All HF models failed. Last: {last_err}")
from fastapi import Body
from pydantic import BaseModel
class _SetModelIn(BaseModel):
mode: str # 'thinking' or 'instruct'
@app.post("/set-model")
def set_model_endpoint(body: _SetModelIn):
mode = (body.mode or "").lower().strip()
if mode not in ("thinking","instruct"):
raise HTTPException(400, "mode must be 'thinking' or 'instruct'")
STATE["mode"] = mode
_save_state(STATE)
    # Try a warm-up immediately so the response can report readiness to the user
try:
_ = _hf_call_single(_current_models()[0], "ping", 1, timeout_s=30)
warmed = True
except Exception:
warmed = False
return {"ok": True, "mode": mode, "models": _current_models(), "warmed": warmed}