import os
import zipfile

import torch
import faiss
import numpy as np
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from huggingface_hub import login, upload_file
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS as LangChainFAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.llms import HuggingFacePipeline
# Extract the knowledge base ZIP, if present
if os.path.exists("md_knowledge_base.zip"):
    with zipfile.ZipFile("md_knowledge_base.zip", "r") as zip_ref:
        zip_ref.extractall("md_knowledge_base")
    print("✅ Knowledge base extracted.")
# Load Markdown files
KB_PATH = "md_knowledge_base"
files = [os.path.join(dp, f) for dp, _, fn in os.walk(KB_PATH) for f in fn if f.endswith(".md")]
docs = [doc for f in files for doc in TextLoader(f, encoding="utf-8").load()]
print(f"✅ Loaded {len(docs)} documents.")
# Chunking: pick a chunk size based on document length
def get_dynamic_chunk_size(text):
    if len(text) < 1000:
        return 300
    elif len(text) < 5000:
        return 500
    else:
        return 1000

chunks = []
for doc in docs:
    chunk_size = get_dynamic_chunk_size(doc.page_content)
    chunk_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=100)
    chunks.extend(chunk_splitter.split_documents([doc]))

texts = [chunk.page_content for chunk in chunks]
# Vectorstore (FAISS)
embed_model_id = "sentence-transformers/all-MiniLM-L6-v2"
embedder = SentenceTransformer(embed_model_id)
embeddings = embedder.encode(texts, show_progress_bar=False)

dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(np.array(embeddings, dtype="float32"))

# Wrap the chunks as LangChain Documents (kept separate from `docs` above to avoid shadowing)
chunk_docs = [Document(page_content=t) for t in texts]
docstore = InMemoryDocstore({str(i): chunk_docs[i] for i in range(len(chunk_docs))})
id_map = {i: str(i) for i in range(len(chunk_docs))}

embed_fn = HuggingFaceEmbeddings(model_name=embed_model_id)
vectorstore = LangChainFAISS(
    index=index,
    docstore=docstore,
    index_to_docstore_id=id_map,
    embedding_function=embed_fn,
)
print("✅ FAISS vectorstore ready.")
# Load Falcon-E-1B-Instruct
model_id = "tiiuae/falcon-e-1b-instruct"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
).to("cuda" if torch.cuda.is_available() else "cpu")

text_gen_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.bfloat16,
    device=0 if torch.cuda.is_available() else -1,
    return_full_text=False,
    do_sample=False,
    max_new_tokens=200,
    pad_token_id=tokenizer.eos_token_id,
)
llm = HuggingFacePipeline(pipeline=text_gen_pipeline)
def truncate_context(context, max_length=1024):
    """Trim the retrieved context to at most max_length tokens."""
    tokens = tokenizer.encode(context)
    if len(tokens) > max_length:
        tokens = tokens[:max_length]
    return tokenizer.decode(tokens, skip_special_tokens=True)


def format_prompt(context, question):
    return (
        "You are the Hull University Assistant—a friendly, knowledgeable chatbot dedicated to "
        "helping students with questions about courses, admissions, tuition fees, and student life. "
        "Use ONLY the information provided in the context below to answer the question. "
        "If the answer cannot be found in the context, reply: \"I’m sorry, but I don’t have that "
        "information available right now.\"\n\n"
        f"Context:\n{truncate_context(context)}\n\n"
        f"Student Question: {question}\n"
        "Assistant Answer:"
    )
def answer_fn(question):
    # Retrieve the top-5 most similar chunks and ground the answer in them
    docs = vectorstore.similarity_search(question, k=5)
    if not docs:
        return "I'm sorry, I couldn't find any relevant information for your query."
    context = "\n\n".join(d.page_content for d in docs)
    prompt = format_prompt(context, question)
    try:
        response = llm.invoke(prompt).strip()
        return response
    except Exception as e:
        return f"An error occurred: {e}"
# Gradio interface
def chat_fn(user_message, history):
    bot_response = answer_fn(user_message)
    history = history + [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": bot_response},
    ]
    return history, history
with gr.Blocks() as demo:
    gr.Markdown("## 📘 University of Hull Assistant")
    chatbot = gr.Chatbot(label="University of Hull Assistant", type="messages")
    state = gr.State([])
    user_input = gr.Textbox(placeholder="Ask a question about University of Hull...", show_label=False)
    user_input.submit(fn=chat_fn, inputs=[user_input, state], outputs=[chatbot, state])

if __name__ == "__main__":
    demo.launch(show_api=False)