adgw
/

Joblib
quality_classifier_pl / main_jsonl.py
adgw's picture
fix
aa2da0e verified
# -*- coding: utf-8 -*-
"""
Skrypt do masowego przetwarzania plik贸w JSONL w celu klasyfikacji jako艣ci tekstu.
Ten modu艂 jest przeznaczony do wydajnej analizy du偶ych zbior贸w danych.
Skanuje folder wej艣ciowy w poszukiwaniu plik贸w .jsonl, przetwarza ka偶dy z nich
r贸wnolegle z u偶yciem wielu proces贸w (`multiprocessing`), a nast臋pnie zapisuje
wyniki do nowego pliku w folderze wyj艣ciowym, zachowuj膮c oryginaln膮 struktur臋
danych i dodaj膮c wyniki klasyfikacji.
"""
# --- Importy bibliotek ---
import os
import glob
import time
import pickle
import joblib
import pandas as pd
import json
import numpy as np
from tqdm import tqdm
from typing import List
from text_analyzer.analyzer import TextAnalyzer
from text_analyzer import constants
# --- 艁adowanie modeli i konfiguracja ---
with open('models/scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
classifier = joblib.load("models/model.joblib")
text_analyzer = TextAnalyzer()
batch_size = 10
class NumpyJSONEncoder(json.JSONEncoder):
"""
Specjalny enkoder JSON do obs艂ugi typ贸w danych z NumPy,
kt贸re nie s膮 domy艣lnie serializowalne.
"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super(NumpyJSONEncoder, self).default(obj)
# --- Definicje funkcji ---
def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]:
"""
Przetwarza ca艂膮 list臋 tekst贸w wsadowo i zwraca list臋 predykcji.
"""
all_features = []
# Krok 1: Ekstrakcja cech dla wszystkich tekst贸w
feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size)
for features_dict in tqdm(feature_generator, total=len(texts), desc="Analiza cech"):
ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER]
all_features.append(ordered_features)
if not all_features:
return []
# Krok 2: Przygotowanie i skalowanie wszystkich wektor贸w naraz
features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER)
features_scaled = scaler_model.transform(features_df)
# Krok 3: Predykcja dla ca艂ej paczki
pred_probas = classifier_model.predict_proba(features_scaled)
# Krok 4: Przetworzenie wynik贸w
results = []
labels = ["LOW", "MEDIUM", "HIGH"]
for single_pred_proba in pred_probas:
category_prob = {
label: prob
for label, prob in zip(labels, single_pred_proba)
}
# Sortujemy, aby znale藕膰 kategori臋 z najwy偶szym prawdopodobie艅stwem
sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True)
most_probable_category, confidence = sorted_category_prob[0]
results.append((most_probable_category, round(float(confidence) * 100, 2)))
return results
def process_jsonl_file(input_file: str, output_file: str):
"""Orkiestruje proces przetwarzania pojedynczego pliku .jsonl wsadowo."""
original_data = []
texts_to_process = []
try:
with open(input_file, 'r', encoding='utf-8') as f:
for line in f:
json_object = json.loads(line)
original_data.append(json_object)
texts_to_process.append(json_object.get('text', ''))
except Exception as e:
print(f"Nie uda艂o si臋 wczyta膰 pliku {input_file}. B艂膮d: {e}")
return
print(f"Wczytano {len(texts_to_process)} wierszy. Rozpoczynam przetwarzanie wsadowe...")
# Wywo艂ujemy funkcj臋 wsadow膮
results = predict_batch(texts_to_process, text_analyzer, scaler, classifier)
# Zapisywanie wynik贸w
try:
with open(output_file, 'w', encoding='utf-8') as f:
for i, (category, confidence) in enumerate(results):
output_object = original_data[i]
output_object['quality_ai'] = category
output_object['confidence'] = confidence
json_line = json.dumps(output_object, ensure_ascii=False, cls=NumpyJSONEncoder)
f.write(json_line + '\n')
except Exception as e:
print(f"Nie uda艂o si臋 zapisa膰 pliku {output_file}. B艂膮d: {e}")
# --- G艂贸wny blok wykonawczy ---
if __name__ == '__main__':
print("Inicjalizacja skryptu przetwarzania wsadowego...")
INPUT_FOLDER = 'input_jsonl'
OUTPUT_FOLDER = 'output'
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# Skanowanie plik贸w
jsonl_files = glob.glob(os.path.join(INPUT_FOLDER, '*.jsonl'))
for file_path in jsonl_files:
start_time = time.time()
output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path))
if os.path.exists(output_file):
print(f"POMIJAM - plik ju偶 istnieje: {output_file}")
continue
print(f"\n--- Przetwarzanie pliku: {file_path} ---")
process_jsonl_file(file_path, output_file)
end_time = time.time()
print(f"Processing time: {end_time - start_time:.4f} seconds")
print("\nWszystkie pliki zosta艂y przetworzone!")