import os
import json
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import transformers
from datasets import Dataset
from transformers import AutoModelForCausalLM, DataCollatorForSeq2Seq, Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model
from sentence_transformers import SentenceTransformer, util

# Route all Hugging Face caches to a writable temporary directory and disable
# Weights & Biases logging.
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
os.environ["HF_DATASETS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_METRICS_CACHE"] = "/tmp/huggingface_cache"
os.environ["WANDB_MODE"] = "disabled"

device = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = transformers.AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# Make sure a pad token exists for batching, in case the checkpoint does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Sentence-embedding model used by the topical guardrail below.
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Populated by load_and_train().
model = None


class LoraLinear(nn.Module):
    """A linear layer with a frozen base weight plus a trainable low-rank (LoRA) update.

    Inside MoELoRALinear below, the pretrained projection is applied once by the
    wrapper, so the base weight here stays at zero and each expert contributes
    only its low-rank update.
    """

    def __init__(self, in_features, out_features, r=8, lora_alpha=16, lora_dropout=0.05, bias=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.r = r
        self.scaling = lora_alpha / r if r > 0 else 1.0
        # Frozen base weight, initialized to zeros (not torch.empty) so that an
        # expert's output is exactly its scaled LoRA update unless real weights
        # are copied in.
        self.weight = nn.Parameter(torch.zeros(out_features, in_features), requires_grad=False)
        self.bias = nn.Parameter(torch.zeros(out_features), requires_grad=False) if bias else None

        if r > 0:
            self.lora_A = nn.Parameter(torch.zeros((r, in_features)))
            self.lora_B = nn.Parameter(torch.zeros((out_features, r)))
            # Standard LoRA initialization: A random, B zero, so the update
            # starts at zero and training moves it away gradually.
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            nn.init.zeros_(self.lora_B)
            self.lora_dropout = nn.Dropout(p=lora_dropout)
        else:
            self.lora_A, self.lora_B, self.lora_dropout = None, None, None

    def forward(self, x):
        result = F.linear(x, self.weight, self.bias)
        if self.r > 0:
            # Low-rank update: dropout(x) A^T B^T, scaled by alpha / r.
            lora_out = self.lora_dropout(x) @ self.lora_A.T @ self.lora_B.T
            result = result + self.scaling * lora_out
        return result


class MoELoRALinear(nn.Module):
    """Wraps a frozen pretrained linear layer with a mixture of LoRA experts.

    A small linear gate produces per-token mixing weights over the experts.
    Note that `k` is stored for API parity, but the forward pass uses dense
    softmax gating over all experts (a top-k sketch follows this class).
    """

    def __init__(self, base_linear, r, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
        super().__init__()
        self.base_linear = base_linear
        self.num_experts = num_experts
        self.k = k
        self.experts = nn.ModuleList([
            LoraLinear(base_linear.in_features, base_linear.out_features,
                       r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout)
            for _ in range(num_experts)
        ])
        self.gate = nn.Linear(base_linear.in_features, num_experts)

    def forward(self, x):
        base_out = self.base_linear(x)
        # Dense gating: every expert contributes, weighted by the softmax gate.
        gate_scores = torch.softmax(self.gate(x), dim=-1)
        expert_out = 0
        for i, expert in enumerate(self.experts):
            expert_out += gate_scores[..., i:i + 1] * expert(x)
        return base_out + expert_out
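

# Illustrative sketch only: the `k` argument above is kept but unused, since the
# forward pass mixes all experts with dense softmax gating. If sparse top-k
# routing were wanted instead, the gate logits could be masked like this before
# the experts are combined. (`top_k_gate_scores` is not called anywhere in this
# script.)
def top_k_gate_scores(gate_logits: torch.Tensor, k: int) -> torch.Tensor:
    """Keep the k largest gate logits per token, renormalize, and zero out the rest."""
    _, topk_idx = gate_logits.topk(k, dim=-1)
    keep = torch.zeros_like(gate_logits).scatter_(-1, topk_idx, 1.0)
    return torch.softmax(gate_logits.masked_fill(keep == 0, float("-inf")), dim=-1)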


def replace_proj_with_moe_lora(model, r=8, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
    """Replace each MLP up_proj/down_proj with a MoE-LoRA wrapper around the frozen original."""
    for layer in model.model.layers:
        for proj_name in ["up_proj", "down_proj"]:
            old = getattr(layer.mlp, proj_name)
            moe = MoELoRALinear(
                base_linear=old,
                r=r,
                num_experts=num_experts,
                k=k,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
            ).to(next(old.parameters()).device)
            setattr(layer.mlp, proj_name, moe)
    return model
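

# Optional sanity-check helper (illustrative; `describe_moe_layers` is not part
# of the training pipeline and is never called below). It reports how many MLP
# projections were wrapped by replace_proj_with_moe_lora and how many parameters
# are currently trainable.
def describe_moe_layers(model):
    wrapped = sum(
        isinstance(getattr(layer.mlp, name), MoELoRALinear)
        for layer in model.model.layers
        for name in ("up_proj", "down_proj")
    )
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    print(f"MoE-LoRA wrapped projections: {wrapped}")
    print(f"Trainable parameters: {trainable:,} / {total:,}")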


def preprocess(example):
    """Tokenize one example and mask the prompt so the loss covers only the answer."""
    text = example['text']
    tokens = tokenizer(text, truncation=True, padding=False)
    # Everything before the "<|assistant|>" tag (system prompt + user question)
    # is labelled -100 and therefore ignored by the loss. The prefix is tokenized
    # with the same settings as the full text so the token counts line up
    # (including the leading BOS token).
    assistant_index = text.find("<|assistant|>")
    prefix_ids = tokenizer(text[:assistant_index], truncation=True, padding=False)['input_ids']
    prefix_len = len(prefix_ids)
    labels = tokens['input_ids'].copy()
    labels[:prefix_len] = [-100] * prefix_len
    tokens['labels'] = labels
    return tokens


def load_and_train(model_id="TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
    global model
    current_dir = os.path.dirname(os.path.abspath(__file__))
    json_file_path = os.path.join(current_dir, 'makemytrip_qa_full.json')

    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    df = pd.DataFrame(data)
    print(f"Loaded dataset containing {len(df)} questions")

    # Build TinyLlama-chat formatted training strings: system prompt, user
    # question, then the assistant answer the model should learn to produce.
    system_prompt = "You are a helpful assistant that provides financial data from MakeMyTrip reports."
    training_data = [
        {"text": f"<|system|>\n{system_prompt}</s>\n<|user|>\n{row['question']}</s>\n<|assistant|>\n{row['answer']}</s>"}
        for _, row in df.iterrows()
    ]
    dataset = Dataset.from_list(training_data)
    tokenized_dataset = dataset.map(preprocess, remove_columns=["text"])

    # Load the base model, inject MoE-LoRA into the MLP projections, and add a
    # standard LoRA adapter on the attention output projection via PEFT.
    base_model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True).to(device)
    model = replace_proj_with_moe_lora(base_model)
    peft_config = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.05, target_modules=["o_proj"], bias="none", task_type="CAUSAL_LM")
    model = get_peft_model(model, peft_config)

    # PEFT marks only parameters whose names contain "lora_" as trainable, which
    # would leave the MoE router gates added above frozen. Re-enable them so the
    # routing network is trained as well.
    for name, param in model.named_parameters():
        if ".gate." in name:
            param.requires_grad = True

    model.config.use_cache = False
    model.gradient_checkpointing_disable()

    # DataCollatorForSeq2Seq pads both the inputs and the labels while keeping
    # the -100 prompt mask produced by preprocess(). (DataCollatorForLanguageModeling
    # with mlm=False would rebuild the labels from input_ids and discard that mask.)
    data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, label_pad_token_id=-100)

    training_args = TrainingArguments(
        learning_rate=1e-4,
        lr_scheduler_type="cosine",
        output_dir="./results",
        num_train_epochs=10,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=4,
        logging_steps=1,
        save_steps=10,
        save_total_limit=2,
        # Mixed precision only makes sense on a GPU; fall back to fp32 on CPU.
        fp16=torch.cuda.is_available(),
        bf16=False,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=data_collator,
    )

    print(f"CUDA available: {torch.cuda.is_available()}")
    print(f"Model device: {next(model.parameters()).device}")

    print("Training started")
    trainer.train()
    model.eval()
    # Re-enable the KV cache for faster generation after training.
    model.config.use_cache = True
BLOCKED_TERMS = ["weather", "cricket", "movie", "song", "football", "holiday", |
|
"travel", "recipe", "music", "game", "sports", "politics", "election"] |
|
|
|
FINANCE_DOMAINS = [ |
|
"financial reporting", "balance sheet", "income statement", |
|
"assets and liabilities", "equity", "revenue", "profit and loss", |
|
"goodwill impairment", "cash flow", "dividends", "taxation", |
|
"investment", "valuation", "capital structure", "ownership interests", |
|
"subsidiaries", "shareholders equity", "expenses", "earnings", |
|
"debt", "amortization", "depreciation" |
|
] |
|
finance_embeds = embed_model.encode(FINANCE_DOMAINS, convert_to_tensor=True) |


def validate_query(query: str, threshold: float = 0.5) -> bool:
    """Return True if the query looks finance-related, False otherwise.

    Two-stage guardrail: a keyword blocklist first, then cosine similarity of
    the query embedding against the finance reference topics.
    """
    q_lower = query.lower()
    if any(bad in q_lower for bad in BLOCKED_TERMS):
        print("[Guardrail] Rejected by blocklist.")
        return False
    q_emb = embed_model.encode(query, convert_to_tensor=True)
    sim_scores = util.cos_sim(q_emb, finance_embeds)
    max_score = float(sim_scores.max())
    if max_score > threshold:
        print(f"[Guardrail] Accepted (semantic match {max_score:.2f})")
        return True
    else:
        print(f"[Guardrail] Rejected (low semantic score {max_score:.2f})")
        return False
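
# Illustrative usage of the guardrail on its own (nothing below calls these):
#   validate_query("What was MakeMyTrip's total revenue in fiscal 2023?")  # finance-related question
#   validate_query("Who won the cricket match yesterday?")                 # rejected by the blocklist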


def generate_answer(prompt, max_tokens=200):
    if prompt.strip() == "":
        return "Please enter a prompt!"

    if model is None:
        return "Model is not loaded. Run load_and_train() first."

    if not validate_query(prompt):
        print("Query rejected: Not finance-related.")
        return "Query rejected: Please ask finance-related questions."

    # Build the chat prompt with the same system message used during training.
    system_prompt = "You are a helpful assistant that provides financial data from MakeMyTrip reports."
    messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}]
    input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(input_text, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=True,
            top_p=0.9,
            temperature=0.7,
        )

    decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Keep only the text generated after the final "<|assistant|>" tag.
    answer_start_token = '<|assistant|>'
    answer_start_index = decoded_output.rfind(answer_start_token)
    if answer_start_index != -1:
        generated_answer = decoded_output[answer_start_index + len(answer_start_token):].strip()
        if generated_answer.endswith('</s>'):
            generated_answer = generated_answer[:-len('</s>')].strip()
    else:
        generated_answer = "Could not extract answer from model output."

    return generated_answer
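

# Minimal driver sketch. It assumes makemytrip_qa_full.json sits next to this
# script (as load_and_train expects); the example question is illustrative only.
if __name__ == "__main__":
    load_and_train()
    print(generate_answer("What was MakeMyTrip's total revenue for the latest fiscal year?"))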