Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import pandas as pd | |
from typing import Optional, List, Dict | |
from validation import _prepare_generic, _nfkc | |
# Column table handed to _prepare_generic by prepare_transactions().
# Keys are the canonical names that detect_transactions() looks up in its colmap.
# NOTE(review): the lists are presumably accepted alternate header spellings for
# each canonical name - confirm against validation._prepare_generic.
TX_EXPECTED = {
    "transaction_id": ["txn_id", "transactionid", "id", "tx_id"],
    "customer_id": ["cust_id", "user_id", "client_id"],
    "amount": ["amt", "amount_inr", "value"],
    "timestamp": ["date", "event_time", "created_at", "tx_time"],
    "merchant_category": ["mcc", "merchant_cat", "category"],
}
def prepare_transactions(df: pd.DataFrame):
    """Run the shared preparation step on a transactions frame.

    Delegates to ``_prepare_generic`` with the ``TX_EXPECTED`` column table and
    hands back that helper's result unchanged.
    NOTE(review): the shape of the returned value is defined by
    ``validation._prepare_generic`` and is not visible here - confirm there.
    """
    prepared = _prepare_generic(df, TX_EXPECTED)
    return prepared
def detect_transactions(clean_df: pd.DataFrame, colmap: Dict[str, str], high_risk_mcc: Optional[List[str]] = None):
    """Flag transactions that trip one or more rule checks.

    Rules (each applied only when the columns it needs are present in ``colmap``):

    * ``large_amount``    - amount greater than 10000
    * ``negative_amount`` - amount below 0
    * ``mcc_high_risk``   - merchant category, upper-cased with spaces replaced
      by underscores, is in the high-risk set
    * ``daily_sum>50k``   - the customer's amounts on the same calendar date sum
      to more than 50000

    Args:
        clean_df: Transactions to scan. It is copied, never modified. The
            timestamp column is read through ``.dt``, so it must already be
            datetime-like.
        colmap: Maps canonical names (``customer_id``, ``amount``,
            ``timestamp``, ``merchant_category``) to the column labels used in
            ``clean_df``.
        high_risk_mcc: Extra categories to treat as high risk. Each entry is
            passed through ``_nfkc``, stripped, upper-cased and has spaces
            replaced by underscores before being added to the built-in set.

    Returns:
        A ``(flagged, stats)`` tuple. ``flagged`` holds the rows that hit at
        least one rule, plus a ``risk_reason`` column with the rule labels in
        alphabetical order joined by ``", "``. When no row is flagged the frame
        is empty and has no ``risk_reason`` column. ``stats`` is a one-line
        summary string. If ``customer_id`` or ``amount`` is missing from
        ``colmap``, an empty frame and an explanatory message are returned.
    """
    high_risk = {"HIGH_RISK", "GAMBLING", "CRYPTO_EXCHANGE", "ESCORTS", "CASINO"}
    if high_risk_mcc:
        high_risk.update(_nfkc(x).strip().upper().replace(" ", "_") for x in high_risk_mcc)

    if not all(k in colmap for k in ("customer_id", "amount")):
        return pd.DataFrame(), "Required columns missing for detection (need at least customer_id, amount)."

    df = clean_df.copy()

    # Rule label -> boolean Series aligned with df. The same Series drive both
    # the row selection and the risk_reason text, so the two cannot disagree.
    rules = {}

    amtcol = colmap.get("amount")
    if amtcol:
        rules["large_amount"] = df[amtcol] > 10000
        rules["negative_amount"] = df[amtcol] < 0

    if "merchant_category" in colmap:
        categories = (
            df[colmap["merchant_category"]]
            .astype(str)
            .str.upper()
            .str.replace(" ", "_", regex=False)
        )
        rules["mcc_high_risk"] = categories.isin(high_risk)

    if all(k in colmap for k in ("customer_id", "timestamp", "amount")):
        cid, ts, amt = colmap["customer_id"], colmap["timestamp"], colmap["amount"]
        # Per-row total of that customer's amounts on the same calendar date.
        daily = df.groupby([cid, df[ts].dt.date])[amt].transform("sum")
        rules["daily_sum>50k"] = daily > 50000

    mask = None
    for hits in rules.values():
        mask = hits if mask is None else (mask | hits)
    flagged = df[mask] if mask is not None else pd.DataFrame()

    if not flagged.empty:
        # Alphabetical label order keeps the text identical to the previous
        # ", ".join(sorted(set(hits))) output.
        labels = sorted(rules)
        hit_columns = [rules[label][mask].to_numpy() for label in labels]
        reasons = [
            ", ".join(label for label, hit in zip(labels, row_hits) if hit) or "rule_hit"
            for row_hits in zip(*hit_columns)
        ]
        flagged = flagged.assign(risk_reason=reasons)

    stats = f"Transactions flagged: {len(flagged)} of {len(df)}."
    return flagged, stats