# import re
# from pathlib import Path
# from typing import Optional
# from datasets import (
#     Dataset, DatasetDict, concatenate_datasets,
#     Features, Value, Sequence, Image,
# )
# # ------------------------------------------------------------------
# # 0) Load your JSON → `raw_ds` exactly as before
# # ------------------------------------------------------------------
# files = [
#     "pool_multiple_choice_chunk_01.json",
#     "pool_multiple_choice_chunk_02.json",
#     "pool_multiple_choice_chunk_03.json",
#     "pool_multiple_choice_chunk_04.json",
#     "pool_numerical_chunk_01.json",
#     "pool_numerical_chunk_02.json",
#     "pool_numerical_chunk_03.json",
#     "pool_regression_chunk_01.json",
# ]
# # ---- 1-4. load, trim, normalise ----------------------------------------
# def load_trim_normalise(fp, cap=10_000):
#     ds = Dataset.from_json(fp)
#     # a) truncate
#     ds = ds.select(range(min(cap, len(ds))))
#     # b) make sure `options` exists and is always list[str]
#     if "options" not in ds.column_names:
#         ds = ds.add_column("options", [[]] * len(ds))
#     else:
#         ds = ds.map(
#             lambda ex: {"options": [str(o) for o in (ex["options"] or [])]},
#             remove_columns=[], num_proc=4,
#         )
#     return ds
# ds_list = [load_trim_normalise(fp) for fp in files]
# # ---- 4. align feature schema explicitly (all files now identical) -------
# common_features = Features({
#     "problem_id"  : Value("int64"),
#     "problem"     : Value("string"),
#     "data_type"   : Value("string"),
#     "problem_type": Value("string"),
#     "options"     : Sequence(Value("string")),
#     "solution"    : Value("string"),
#     "path"        : Value("string"),
#     "data_source" : Value("string"),
# })
# ds_list = [d.cast(common_features) for d in ds_list]
# # ---- 5. concatenate -----------------------------------------------------
# raw_train = concatenate_datasets(ds_list)
# raw_ds = DatasetDict({"train": raw_train})
# # ------------------------------------------------------------------
# # 1) Build the question (unchanged)
# # ------------------------------------------------------------------
# def build_question(example):
#     q = (
#         example["problem"] + " Options:\n" + "\n".join(example["options"])
#         if example["problem_type"] == "multiple choice"
#         else example["problem"]
#     )
#     example["problem"] = q
#     return example
# def extract_answer(predict: str) -> Optional[str]:
#     """
#     Extract the content of the <answer>…</answer> block from `predict`.
#     Returns the inner text (with leading/trailing whitespace stripped),
#     or the original string if no <answer> tag is found.
#     """
#     match = re.search(r"<answer>([\s\S]*?)</answer>", predict, re.DOTALL)
#     if not match:
#         return predict
#     return match.group(1).strip()
# def add_answer(example):
#     # assumes the ground-truth answer (tagged) is in `solution`
#     example["answer"] = extract_answer(example["solution"])
#     return example
# # ------------------------------------------------------------------
# # 3) Embed image bytes (column name stays "images")
# # ------------------------------------------------------------------
# def to_embedded_image(example):
#     if example["data_type"] != "image":
#         example["images"] = None
#         return example
#     with open(example["path"], "rb") as f:
#         img_bytes = f.read()
#     example["images"] = {"bytes": img_bytes, "path": None}
#     return example
# # ------------------------------------------------------------------
# # 4) Full pipeline
# # ------------------------------------------------------------------
# processed = (
#     raw_ds["train"]
#     .map(build_question, num_proc=4)
#     .map(add_answer, num_proc=4)
#     .map(to_embedded_image, num_proc=4)
#     .remove_columns([
#         "path", "data_type", "options", "problem_type", "solution",
#         "problem_id", "data_source"  # drop these too
#     ])
# )
# # ------------------------------------------------------------------
# # 5) Schema must match the final column names
# # ------------------------------------------------------------------
# features = Features({
#     "problem": Value("string"),
#     "answer" : Value("string"),
#     "images" : Image(),  # keep plural name
# })
# processed = processed.cast(features)
# # ------------------------------------------------------------------
# # 6) Write the Parquet file
# # ------------------------------------------------------------------
# out_dir = Path("qwen2.5_vl_portable")
# out_dir.mkdir(parents=True, exist_ok=True)
# # processed.to_parquet(str(out_dir / "train.parquet"))
# out_path = Path("./hf_data/train.parquet")
# processed.to_parquet(str(out_path))
# print("Dataset written with embedded images and answers →", out_path.resolve())
# import re
# from pathlib import Path
# from typing import Dict, List, Optional
# from datasets import (
#     Dataset,
#     DatasetDict,
#     concatenate_datasets,
#     Features,
#     Value,
#     Sequence,
#     Image,
# )
# # ------------------------------------------------------------------
# # 0) Inputs
# # ------------------------------------------------------------------
# files = [
#     "pool_multiple_choice_chunk_01.json",
#     "pool_multiple_choice_chunk_02.json",
#     "pool_multiple_choice_chunk_03.json",
#     "pool_multiple_choice_chunk_04.json",
#     "pool_numerical_chunk_01.json",
#     "pool_numerical_chunk_02.json",
#     "pool_numerical_chunk_03.json",
#     "pool_regression_chunk_01.json",
# ]
# # ------------------------------------------------------------------
# # 1) Define common meta schema (what you want to keep in the output)
# # ------------------------------------------------------------------
# common_features = Features({
#     "problem_id"  : Value("int64"),
#     "problem"     : Value("string"),
#     "data_type"   : Value("string"),
#     "problem_type": Value("string"),
#     "options"     : Sequence(Value("string")),
#     "solution"    : Value("string"),
#     "path"        : Value("string"),
#     "data_source" : Value("string"),
# })
# # Final (superset) schema to write: meta + new columns
# full_features = common_features.copy()
# full_features["answer"] = Value("string")
# full_features["images"] = Image()  # plural name kept, binary-friendly
# # ------------------------------------------------------------------
# # 2) Load + normalize each JSON
# # ------------------------------------------------------------------
# def load_trim_normalise(fp: str, cap: int = 10_000) -> Dataset:
#     ds = Dataset.from_json(fp)
#     # truncate if desired
#     ds = ds.select(range(min(cap, len(ds))))
#     # ensure `options` exists and is always list[str]
#     if "options" not in ds.column_names:
#         ds = ds.add_column("options", [[]] * len(ds))
#     else:
#         ds = ds.map(
#             lambda ex: {"options": [str(o) for o in (ex["options"] or [])]},
#             remove_columns=[],
#             num_proc=4,
#         )
#     # align to the common meta schema early (helps concat)
#     # Some JSONs may not have all fields; add missing with defaults first.
#     missing_cols = [k for k in common_features.keys() if k not in ds.column_names]
#     for mc in missing_cols:
#         # create sensible defaults
#         if mc == "options":
#             ds = ds.add_column(mc, [[]] * len(ds))
#         elif common_features[mc].dtype == "int64":
#             ds = ds.add_column(mc, [0] * len(ds))
#         else:
#             ds = ds.add_column(mc, [""] * len(ds))
#     ds = ds.cast(common_features)
#     return ds
# ds_list = [load_trim_normalise(fp) for fp in files]
# # Concatenate shards
# raw_train = concatenate_datasets(ds_list)
# raw_ds = DatasetDict({"train": raw_train})
# # ------------------------------------------------------------------
# # 3) Processing fns
# # ------------------------------------------------------------------
# def build_question(example: Dict) -> Dict:
#     """
#     If multiple-choice, append the options to the text.
#     Overwrites the `problem` field in-place (kept in output).
#     """
#     if example["problem_type"] == "multiple choice":
#         opts = example.get("options") or []
#         q = example["problem"] + " Options:\n" + "\n".join(opts)
#         example["problem"] = q
#     return example
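# # Illustration only (hypothetical row values): a multiple-choice example
# # with problem "Which shape is largest?" and options ["A. circle", "B. square"]
# # is rewritten to:
# #   "Which shape is largest? Options:\nA. circle\nB. square"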
# def extract_answer(predict: str) -> Optional[str]:
#     """
#     Return inner text of <answer>...</answer>, stripped.
#     If no tag is found, return the original string.
#     """
#     if predict is None:
#         return None
#     match = re.search(r"<answer>([\s\S]*?)</answer>", predict, re.DOTALL)
#     if not match:
#         return predict
#     return match.group(1).strip()
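# # Behaviour sketch (hypothetical input strings):
# #   extract_answer("<answer> 42 </answer>")  ->  "42"
# #   extract_answer("bare text, no tags")     ->  "bare text, no tags"
# #   extract_answer(None)                     ->  None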
# def add_answer(example: Dict) -> Dict:
#     example["answer"] = extract_answer(example.get("solution", ""))
#     return example
# def to_embedded_image(example: Dict) -> Dict:
#     """
#     If data_type == 'image', embed bytes for HF Image() feature.
#     Otherwise leave as None.
#     """
#     if example.get("data_type") != "image":
#         example["images"] = None
#         return example
#     path = example.get("path")
#     if not path:
#         example["images"] = None
#         return example
#     try:
#         with open(path, "rb") as f:
#             img_bytes = f.read()
#         example["images"] = {"bytes": img_bytes, "path": None}
#     except Exception:
#         # If image is missing or unreadable, keep None so cast still works
#         example["images"] = None
#     return example
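# # Note: {"bytes": ..., "path": None} is the encoded form the Image()
# # feature stores (a bytes/path struct); when rows are read back, the
# # column is decoded to a PIL image (or None for non-image rows).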
# # ------------------------------------------------------------------
# # 4) Apply pipeline (do NOT drop meta columns you want to keep)
# # ------------------------------------------------------------------
# processed = (
#     raw_ds["train"]
#     .map(build_question, num_proc=4)
#     .map(add_answer, num_proc=4)
#     .map(to_embedded_image, num_proc=4)
#     .cast(full_features)  # <- ensure final schema
# )
# # Optional: control output column ordering
# processed = processed.select_columns(list(full_features.keys()))
# # ------------------------------------------------------------------
# # 5) Write Parquet
# # ------------------------------------------------------------------
# out_dir = Path("./hf_data")
# out_dir.mkdir(parents=True, exist_ok=True)
# out_path = out_dir / "train.parquet"
# processed.to_parquet(str(out_path))
# print("Wrote:", out_path.resolve())
# print("Columns:", list(processed.features.keys()))
# ------------------------------------------------------------------
# 4.1) Downsample to 30k, mainly reducing math-heavy sources
# ------------------------------------------------------------------
# Assumes `processed` and `out_dir` already exist from the pipeline above.
from collections import Counter
from typing import Optional

from datasets import concatenate_datasets

TARGET_SIZE = 30_000
MATH_SHARE = 0.20   # keep ~20% math (tweak if you want)
SEED = 2025
# Define which sources are "mathy"
MATH_SOURCES = {
    "Multimath-300k",
    "TabMWP",
    "Geometry3K",
    "CLEVR-Math",
    "DVQA",
    "FigureQA",
    "ChartQA",
    "PlotQA",
    "EXAMS-V-train/Mathematics",
    "UniGeo",
    "GeoQA+",
}

def is_math_source(name: Optional[str]) -> bool:
    if not name:
        return False
    return name in MATH_SOURCES or ("math" in name.lower())
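# Quick sanity checks on the heuristic (the dataset names below are only
# illustrative examples, not required to be in your pools):
assert is_math_source("Multimath-300k")     # explicit member of MATH_SOURCES
assert is_math_source("OpenMathInstruct")   # caught by the "math" substring
assert not is_math_source("ScienceQA")      # neither listed nor "math"-like
assert not is_math_source(None)             # missing source counts as non-math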
# Split
math_ds = processed.filter(lambda ex: is_math_source(ex.get("data_source")), num_proc=4)
non_math_ds = processed.filter(lambda ex: not is_math_source(ex.get("data_source")), num_proc=4)
# Decide quotas
non_math_quota = min(len(non_math_ds), int(TARGET_SIZE * (1 - MATH_SHARE)))
math_quota = TARGET_SIZE - non_math_quota
math_quota = min(math_quota, len(math_ds))  # guard if math is too small
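# With TARGET_SIZE = 30_000 and MATH_SHARE = 0.20 the nominal split is
# 24_000 non-math + 6_000 math. A short non-math pool is compensated by
# taking more math rows, but if the math pool itself is smaller than its
# quota the final dataset simply ends up below TARGET_SIZE.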
# Sample deterministically
non_math_sample = non_math_ds.shuffle(seed=SEED).select(range(non_math_quota))
math_sample = math_ds.shuffle(seed=SEED).select(range(math_quota))
# Combine and shuffle
final = concatenate_datasets([non_math_sample, math_sample]).shuffle(seed=SEED)
# Quick sanity printout
cnt = Counter(final["data_source"])
total = len(final)
print(f"Final size: {total} (non-math {non_math_quota}, math {math_quota})")
for name, n in sorted(cnt.items(), key=lambda x: -x[1])[:25]:
    pct = n / total
    print(f"{name:30s} {n:6d} {pct:7.3%}")
# Use this 'final' dataset for writing
processed = final
out_path = out_dir / "train_30k.parquet"
processed.to_parquet(str(out_path))
print("Wrote:", out_path.resolve())