# Video-R1 / create_data.py
# import re
# from pathlib import Path
# from datasets import load_dataset, Dataset, DatasetDict, Features, Value, Image
# import re
# from typing import Dict, List, Optional
# from pathlib import Path
# from datasets import Dataset, DatasetDict, concatenate_datasets, Features, Value, Sequence
# # ------------------------------------------------------------------
# # 0) Load your JSON → `raw_ds` exactly as before
# # ------------------------------------------------------------------
# files = [
# "pool_multiple_choice_chunk_01.json",
# "pool_multiple_choice_chunk_02.json",
# "pool_multiple_choice_chunk_03.json",
# "pool_multiple_choice_chunk_04.json",
# "pool_numerical_chunk_01.json",
# "pool_numerical_chunk_02.json",
# "pool_numerical_chunk_03.json",
# "pool_regression_chunk_01.json",
# ]
# # ---- 1-4. load, trim, normalise ----------------------------------------
# def load_trim_normalise(fp, cap=10_000):
# ds = Dataset.from_json(fp)
# # a) truncate
# ds = ds.select(range(min(cap, len(ds))))
# # b) make sure `options` exists and is always list[str]
# if "options" not in ds.column_names:
# ds = ds.add_column("options", [[]] * len(ds))
# else:
# ds = ds.map(
# lambda ex: {"options": [str(o) for o in (ex["options"] or [])]},
# remove_columns=[], num_proc=4,
# )
# return ds
# ds_list = [load_trim_normalise(fp) for fp in files]
# # ---- 4. align feature schema explicitly (all files now identical) -------
# common_features = Features({
# "problem_id" : Value("int64"),
# "problem" : Value("string"),
# "data_type" : Value("string"),
# "problem_type": Value("string"),
# "options" : Sequence(Value("string")),
# "solution" : Value("string"),
# "path" : Value("string"),
# "data_source" : Value("string"),
# })
# ds_list = [d.cast(common_features) for d in ds_list]
# # ---- 5. concatenate -----------------------------------------------------
# raw_train = concatenate_datasets(ds_list)
# raw_ds = DatasetDict({"train": raw_train})
# # ------------------------------------------------------------------
# # 1) Build the question (unchanged)
# # ------------------------------------------------------------------
# def build_question(example):
# q = (
# example["problem"] + " Options:\n" + "\n".join(example["options"])
# if example["problem_type"] == "multiple choice"
# else example["problem"]
# )
# example["problem"] = q
# return example
# def extract_answer(predict: str) -> Optional[str]:
# """
# Extracts the content of the <answer>…</answer> block from `predict`.
# Returns the inner text (with leading/trailing whitespace stripped),
# or the original string if no <answer> tag is found.
# """
# match = re.search(r"<answer>([\s\S]*?)</answer>", predict, re.DOTALL)
# if not match:
# return predict
# return match.group(1).strip()
# def add_answer(example):
# # assumes the ground-truth answer (tagged) is in `solution`
# example["answer"] = extract_answer(example["solution"])
# return example
# # ------------------------------------------------------------------
# # 3) Embed image bytes (column name stays "images")
# # ------------------------------------------------------------------
# def to_embedded_image(example):
# if example["data_type"] != "image":
# example["images"] = None
# return example
# with open(example["path"], "rb") as f:
# img_bytes = f.read()
# example["images"] = {"bytes": img_bytes, "path": None}
# return example
# # ------------------------------------------------------------------
# # 4) Full pipeline
# # ------------------------------------------------------------------
# processed = (
# raw_ds["train"]
# .map(build_question, num_proc=4)
# .map(add_answer, num_proc=4)
# .map(to_embedded_image, num_proc=4)
# .remove_columns([
# "path", "data_type", "options", "problem_type", "solution",
# "problem_id", "data_source" # ← drop these too
# ])
# )
# # ------------------------------------------------------------------
# # 5) Schema must match the final column names
# # ------------------------------------------------------------------
# features = Features({
# "problem": Value("string"),
# "answer" : Value("string"),
# "images" : Image(), # keep plural name
# })
# processed = processed.cast(features)
# # ------------------------------------------------------------------
# # 6) Write Parquet shards (file prefix inside the folder)
# # ------------------------------------------------------------------
# out_dir = Path("qwen2.5_vl_portable")
# out_dir.mkdir(parents=True, exist_ok=True)
# # processed.to_parquet(str(out_dir / "train.parquet")) # → train-00000-of-00001.parquet
# processed.to_parquet(str("./hf_data/train.parquet"))
# print("βœ“ Dataset written with embedded images and answers β†’", out_dir.resolve())
# import re
# from pathlib import Path
# from typing import Dict, List, Optional
# from datasets import (
# Dataset,
# DatasetDict,
# concatenate_datasets,
# Features,
# Value,
# Sequence,
# Image,
# )
# # ------------------------------------------------------------------
# # 0) Inputs
# # ------------------------------------------------------------------
# files = [
# "pool_multiple_choice_chunk_01.json",
# "pool_multiple_choice_chunk_02.json",
# "pool_multiple_choice_chunk_03.json",
# "pool_multiple_choice_chunk_04.json",
# "pool_numerical_chunk_01.json",
# "pool_numerical_chunk_02.json",
# "pool_numerical_chunk_03.json",
# "pool_regression_chunk_01.json",
# ]
# # ------------------------------------------------------------------
# # 1) Define common meta schema (what you want to keep in the output)
# # ------------------------------------------------------------------
# common_features = Features({
# "problem_id" : Value("int64"),
# "problem" : Value("string"),
# "data_type" : Value("string"),
# "problem_type": Value("string"),
# "options" : Sequence(Value("string")),
# "solution" : Value("string"),
# "path" : Value("string"),
# "data_source" : Value("string"),
# })
# # Final (superset) schema to write: meta + new columns
# full_features = common_features.copy()
# full_features["answer"] = Value("string")
# full_features["images"] = Image() # plural name kept, binary-friendly
# # ------------------------------------------------------------------
# # 2) Load + normalize each JSON
# # ------------------------------------------------------------------
# def load_trim_normalise(fp: str, cap: int = 10_000) -> Dataset:
# ds = Dataset.from_json(fp)
# # truncate if desired
# ds = ds.select(range(min(cap, len(ds))))
# # ensure `options` exists and is always list[str]
# if "options" not in ds.column_names:
# ds = ds.add_column("options", [[]] * len(ds))
# else:
# ds = ds.map(
# lambda ex: {"options": [str(o) for o in (ex["options"] or [])]},
# remove_columns=[],
# num_proc=4,
# )
# # align to the common meta schema early (helps concat)
# # Some JSONs may not have all fields; add missing with defaults first.
# missing_cols = [k for k in common_features.keys() if k not in ds.column_names]
# for mc in missing_cols:
# # create sensible defaults
# if mc == "options":
# ds = ds.add_column(mc, [[]] * len(ds))
# elif common_features[mc].dtype == "int64":
# ds = ds.add_column(mc, [0] * len(ds))
# else:
# ds = ds.add_column(mc, [""] * len(ds))
# ds = ds.cast(common_features)
# return ds
# ds_list = [load_trim_normalise(fp) for fp in files]
# # Concatenate shards
# raw_train = concatenate_datasets(ds_list)
# raw_ds = DatasetDict({"train": raw_train})
# # ------------------------------------------------------------------
# # 3) Processing fns
# # ------------------------------------------------------------------
# def build_question(example: Dict) -> Dict:
# """
# If multiple-choice, append the options to the text.
# Overwrites the `problem` field in-place (kept in output).
# """
# if example["problem_type"] == "multiple choice":
# opts = example.get("options") or []
# q = example["problem"] + " Options:\n" + "\n".join(opts)
# example["problem"] = q
# return example
# def extract_answer(predict: str) -> Optional[str]:
# """
# Return inner text of <answer>...</answer>, stripped.
# If no tag is found, return the original string.
# """
# if predict is None:
# return None
# match = re.search(r"<answer>([\s\S]*?)</answer>", predict, re.DOTALL)
# if not match:
# return predict
# return match.group(1).strip()
# def add_answer(example: Dict) -> Dict:
# example["answer"] = extract_answer(example.get("solution", ""))
# return example
# def to_embedded_image(example: Dict) -> Dict:
# """
# If data_type == 'image', embed bytes for HF Image() feature.
# Otherwise leave as None.
# """
# if example.get("data_type") != "image":
# example["images"] = None
# return example
# path = example.get("path")
# if not path:
# example["images"] = None
# return example
# try:
# with open(path, "rb") as f:
# img_bytes = f.read()
# example["images"] = {"bytes": img_bytes, "path": None}
# except Exception:
# # If image is missing or unreadable, keep None so cast still works
# example["images"] = None
# return example
# # ------------------------------------------------------------------
# # 4) Apply pipeline (do NOT drop meta columns you want to keep)
# # ------------------------------------------------------------------
# processed = (
# raw_ds["train"]
# .map(build_question, num_proc=4)
# .map(add_answer, num_proc=4)
# .map(to_embedded_image, num_proc=4)
# .cast(full_features) # <- ensure final schema
# )
# # Optional: control output column ordering
# processed = processed.select_columns(list(full_features.keys()))
# # ------------------------------------------------------------------
# # 5) Write Parquet
# # ------------------------------------------------------------------
# out_dir = Path("./hf_data")
# out_dir.mkdir(parents=True, exist_ok=True)
# out_path = out_dir / "train.parquet"
# processed.to_parquet(str(out_path))
# print("βœ“ Wrote:", out_path.resolve())
# print("Columns:", list(processed.features.keys()))
# ------------------------------------------------------------------
# 4.1) Downsample to 30k, mainly reducing math-heavy sources
# ------------------------------------------------------------------
from collections import Counter
from typing import Optional

from datasets import concatenate_datasets
TARGET_SIZE = 30_000
MATH_SHARE = 0.20 # keep ~20% math (tweak if you want)
SEED = 2025
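# With TARGET_SIZE=30_000 and MATH_SHARE=0.20, the quotas below work out to
# at most 24_000 non-math + 6_000 math examples (before availability caps).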
# Define which sources are "mathy"
MATH_SOURCES = {
"Multimath-300k",
"TabMWP",
"Geometry3K",
"CLEVR-Math",
"DVQA",
"FigureQA",
"ChartQA",
"PlotQA",
"EXAMS-V-train/Mathematics",
"UniGeo",
"GeoQA+",
}
def is_math_source(name: Optional[str]) -> bool:
if not name:
return False
return name in MATH_SOURCES or ("math" in name.lower())
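# e.g. is_math_source("Geometry3K") -> True, is_math_source(None) -> False;
# any other source whose name contains "math" also counts as math.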
# Split
math_ds = processed.filter(lambda ex: is_math_source(ex.get("data_source")), num_proc=4)
non_math_ds = processed.filter(lambda ex: not is_math_source(ex.get("data_source")), num_proc=4)
# Decide quotas
non_math_quota = min(len(non_math_ds), int(TARGET_SIZE * (1 - MATH_SHARE)))
math_quota = TARGET_SIZE - non_math_quota
math_quota = min(math_quota, len(math_ds)) # guard if math is too small
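# Note: if the math pool is smaller than its quota, the final dataset ends up
# slightly below TARGET_SIZE (the non-math side is not topped up to compensate).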
# Sample deterministically
non_math_sample = non_math_ds.shuffle(seed=SEED).select(range(non_math_quota))
math_sample = math_ds.shuffle(seed=SEED).select(range(math_quota))
# Combine and shuffle
final = concatenate_datasets([non_math_sample, math_sample]).shuffle(seed=SEED)
# Quick sanity printout
cnt = Counter(final["data_source"])
total = len(final)
print(f"Final size: {total} (non-math {non_math_quota}, math {math_quota})")
for name, n in sorted(cnt.items(), key=lambda x: -x[1])[:25]:
pct = n / total
print(f"{name:30s} {n:6d} {pct:7.3%}")
# Use this 'final' dataset for writing
processed = final
out_path = out_dir / "train_30k.parquet"
processed.to_parquet(str(out_path))
print("βœ“ Wrote:", out_path.resolve())