Commit a4ab88e · 1 parent: a095ed4
Committed by liuyang

Add AudioJob integration to app.py with UI for running audio jobs and handling manifests. Updated requirements.txt to include webrtcvad and boto3.

Files changed (3)
  1. app.py +58 -0
  2. audiojob.py +1016 -0
  3. requirements.txt +3 -1
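The new tab surfaces the job manifest directly as JSON. For orientation, a trimmed, illustrative sketch of the manifest's top-level shape as `_init_manifest` in audiojob.py (below) builds it; field values here are placeholders, not real job output:

    # Illustrative sketch only; see _init_manifest in audiojob.py for the full field set.
    manifest_sketch = {
        "version": "2.0",
        "rev": 3,                          # bumped on every stage update
        "job_id": "<uuid4>",
        "source": {"uri": "/abs/path/to/file.wav", "duration_ms": None, "channels": None},
        "policy": {"chunk_target_ms": 1800000, "overlap_ms": 300, "channel_policy": "auto"},
        "stages": {
            "probe":      {"status": "pending", "progress": 0.0},
            "preprocess": {"status": "pending", "progress": 0.0},
            "split":      {"status": "pending", "progress": 0.0},
            "transcribe": {"status": "pending", "progress": 0.0},
        },
    }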
app.py CHANGED
@@ -2,6 +2,11 @@ import gradio as gr
  import requests
  import tempfile
  import os
+ import json
+ import traceback
+
+ # AudioJob integration
+ from audiojob import AudioJobRunner, LocalStorageAdapter
  from pydub import AudioSegment
  from typing import Optional, Tuple
  import logging
@@ -240,6 +245,59 @@ with gr.Blocks(title="Audio Editor", theme=gr.themes.Soft()) as demo:
          outputs=[audio_output, status_output]
      )
  
+     with gr.Tab("AudioJob Runner"):
+         gr.Markdown("### AudioJob: preprocess -> split (inspect manifest)")
+         with gr.Row():
+             with gr.Column():
+                 aj_source_input = gr.Textbox(
+                     label="Source URI",
+                     placeholder="e.g. /abs/path/to/file.wav or s3://bucket/key",
+                     info="Source URI for AudioJobRunner"
+                 )
+                 aj_manifest_input = gr.Textbox(
+                     label="Manifest JSON (optional)",
+                     placeholder="Paste existing manifest JSON to resume (optional)",
+                     lines=10
+                 )
+                 aj_s3_prefix = gr.Textbox(
+                     label="S3 Prefix",
+                     placeholder="Optional prefix for uploaded working copies (e.g. jobs/)",
+                     info="Uploaded keys will be prefixed with this value",
+                 )
+                 aj_run_button = gr.Button("Run AudioJob", variant="primary")
+             with gr.Column():
+                 aj_output = gr.Textbox(label="AudioJob Output (manifest)", lines=30, interactive=False)
+
+     def run_audiojob_ui(source_uri: str, manifest_json: str, s3_prefix: str) -> str:
+         try:
+             manifest = None
+             if manifest_json and manifest_json.strip():
+                 manifest = json.loads(manifest_json)
+
+             work_root = tempfile.mkdtemp(prefix="audiojob_")
+             storage = LocalStorageAdapter()
+             # allow presets from top-level presets if desired; using defaults here
+             runner = AudioJobRunner(
+                 manifest=manifest,
+                 source_uri=None if manifest else source_uri,
+                 work_root=work_root,
+                 storage=storage,
+                 presets={
+                     # Read bucket and endpoint from environment where possible
+                     "s3_bucket": os.environ.get("S3_BUCKET"),
+                     "s3_region": "auto",
+                     "s3_prefix": s3_prefix or "",
+                     "s3_endpoint": os.environ.get("S3_ENDPOINT", "")
+                 }
+             )
+             out_manifest = runner.run_until_split()
+             return json.dumps(out_manifest, ensure_ascii=False, indent=2)
+         except Exception as e:
+             tb = traceback.format_exc()
+             return f"Error: {e}\n\n{tb}"
+
+     aj_run_button.click(fn=run_audiojob_ui, inputs=[aj_source_input, aj_manifest_input, aj_s3_prefix], outputs=[aj_output])
+
  # Launch the app
  if __name__ == "__main__":
      demo.launch()
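For context, the click handler registered above is a thin wrapper around AudioJobRunner. A minimal headless sketch of the same flow, assuming audiojob.py is importable, a hypothetical local source path, and the optional S3_BUCKET / S3_ENDPOINT environment variables the handler reads:

    import json, os, tempfile
    from audiojob import AudioJobRunner, LocalStorageAdapter

    source = "/abs/path/to/file.wav"   # hypothetical local file or a URI your adapter understands

    runner = AudioJobRunner(
        manifest=None,                            # pass a previously saved manifest dict to resume
        source_uri=source,
        work_root=tempfile.mkdtemp(prefix="audiojob_"),
        storage=LocalStorageAdapter(),
        presets={
            "s3_bucket": os.environ.get("S3_BUCKET"),        # optional: upload working copies if set
            "s3_endpoint": os.environ.get("S3_ENDPOINT", ""),
            "s3_region": "auto",
        },
    )
    manifest = runner.run_until_split()           # probe -> preprocess -> split
    print(json.dumps(manifest, ensure_ascii=False, indent=2))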
audiojob.py ADDED
@@ -0,0 +1,1016 @@
+ #!/usr/bin/env python3
+ # -*- coding: utf-8 -*-
+ """
+ Audio preprocess runner v2 (until split), checkpointable & resumable.
+
+ Key upgrades:
+ - Robust subprocess with retries/backoff/timeout
+ - Content-addressed cache for preprocess outputs
+ - Unique temp files + cleanup
+ - Explicit stream mapping (-map 0:a:0)
+ - Filter availability detection with graceful fallbacks
+ - VAD streams PCM from ffmpeg (no giant temp WAV), with progress updates
+ - Split plan exposes per-channel source_uris (for virtual slicing)
+ - Stage error breadcrumbs + atomic manifest rev bumps
+ - Pluggable StorageAdapter (LocalStorageAdapter provided)
+
+ Author: you
+ """
+
+ import os
+ import re
+ import io
+ import sys
+ import json
+ import math
+ import time
+ import shutil
+ import hashlib
+ import tempfile
+ import datetime as dt
+ import subprocess
+ import uuid
+ from typing import Optional, Dict, Any, List, Tuple, BinaryIO
+
+ # ============================================================
+ # Storage Adapters
+ # ============================================================
+
+ class StorageAdapter:
+     """Abstract interface for reading/writing blobs and metadata."""
+     def exists(self, uri: str) -> bool: raise NotImplementedError
+     def open_read(self, uri: str) -> BinaryIO: raise NotImplementedError
+     def open_write(self, uri: str) -> BinaryIO: raise NotImplementedError
+     def save_json(self, uri: str, obj: dict) -> None:
+         with self.open_write(uri) as f:
+             f.write(json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8"))
+     def load_json(self, uri: str) -> dict:
+         with self.open_read(uri) as f:
+             return json.loads(f.read().decode("utf-8"))
+     def stat(self, uri: str) -> Dict[str, Any]:
+         """Return {'bytes': int|None, 'sha256': str|None, 'etag': str|None} where possible."""
+         return {"bytes": None, "sha256": None, "etag": None}
+     def presign(self, uri: str, method: str = "GET", ttl: int = 3600) -> str:
+         """Return a URL suitable for HTTP reads/writes. Local adapter may return a file:// path."""
+         return uri
+
+ class LocalStorageAdapter(StorageAdapter):
+     """Treats absolute paths as 'uris' (file system)."""
+     def exists(self, uri: str) -> bool:
+         return os.path.exists(uri)
+     def open_read(self, uri: str) -> BinaryIO:
+         return open(uri, "rb")
+     def open_write(self, uri: str) -> BinaryIO:
+         os.makedirs(os.path.dirname(uri), exist_ok=True)
+         return open(uri, "wb")
+     def stat(self, uri: str) -> Dict[str, Any]:
+         if not os.path.exists(uri):
+             return {"bytes": None, "sha256": None, "etag": None}
+         st = os.stat(uri)
+         return {"bytes": st.st_size, "sha256": None, "etag": None}
+     def presign(self, uri: str, method: str = "GET", ttl: int = 3600) -> str:
+         return uri  # consumers should handle file paths
+
+ # Stub you can implement for S3/R2 later:
+ class S3LikeStorageAdapter(StorageAdapter):
+     """S3-like storage adapter using boto3. Exposes simple operations for
+     checking existence, reading, writing (via temp files) and presigning URLs.
+
+     Usage:
+         adapter = S3LikeStorageAdapter(bucket="my-bucket", region_name="us-east-1")
+         adapter.upload_file(local_path, key)
+         url = adapter.presign(key, "GET", ttl=3600)
+     """
+     def __init__(self, bucket: str, region_name: Optional[str] = None,
+                  aws_access_key_id: Optional[str] = None,
+                  aws_secret_access_key: Optional[str] = None,
+                  aws_session_token: Optional[str] = None,
+                  endpoint_url: Optional[str] = None):
+         try:
+             import boto3
+         except Exception:
+             raise RuntimeError("boto3 is required for S3LikeStorageAdapter but is not installed")
+         session_kwargs = {}
+         if aws_access_key_id and aws_secret_access_key:
+             session_kwargs.update({
+                 "aws_access_key_id": aws_access_key_id,
+                 "aws_secret_access_key": aws_secret_access_key,
+             })
+         if aws_session_token:
+             session_kwargs["aws_session_token"] = aws_session_token
+
+         session = boto3.session.Session(**session_kwargs)
+         client_kwargs = {"region_name": region_name}
+         if endpoint_url:
+             client_kwargs["endpoint_url"] = endpoint_url
+         self.s3 = session.client("s3", **client_kwargs)
+         self.bucket = bucket
+
+     def exists(self, key: str) -> bool:
+         try:
+             self.s3.head_object(Bucket=self.bucket, Key=key)
+             return True
+         except Exception:
+             return False
+
+     def open_read(self, key: str) -> BinaryIO:
+         # Download into memory (caller should avoid very large files via this API)
+         obj = self.s3.get_object(Bucket=self.bucket, Key=key)
+         body = obj["Body"].read()
+         return io.BytesIO(body)
+
+     def open_write(self, key: str) -> BinaryIO:
+         # Provide a temp file that will be uploaded on close
+         tmp = tempfile.NamedTemporaryFile(delete=False)
+
+         class _S3Writer(io.BufferedWriter):
+             def __init__(self, tmp_file_path: str, outer: "S3LikeStorageAdapter", key: str):
+                 self._tmp_path = tmp_file_path
+                 self._outer = outer
+                 self._key = key
+                 f = open(tmp_file_path, "r+b")
+                 super().__init__(f)
+
+             def close(self):
+                 try:
+                     super().close()
+                 finally:
+                     # Upload using boto3's upload_file which handles multipart for large files
+                     try:
+                         self._outer.s3.upload_file(self._tmp_path, self._outer.bucket, self._key)
+                     except Exception as e:
+                         raise
+                     finally:
+                         try:
+                             os.remove(self._tmp_path)
+                         except Exception:
+                             pass
+
+         tmp_path = tmp.name
+         tmp.close()
+         return _S3Writer(tmp_path, self, key)
+
+     def stat(self, key: str) -> Dict[str, Any]:
+         try:
+             r = self.s3.head_object(Bucket=self.bucket, Key=key)
+             return {"bytes": int(r.get("ContentLength", 0)), "sha256": None, "etag": r.get("ETag")}
+         except Exception:
+             return {"bytes": None, "sha256": None, "etag": None}
+
+     def presign(self, key: str, method: str = "GET", ttl: int = 3600) -> str:
+         # Return a presigned URL for the given key
+         params = {"Bucket": self.bucket, "Key": key}
+         http_method = method.upper()
+         return self.s3.generate_presigned_url(
+             ClientMethod="get_object" if http_method == "GET" else "put_object",
+             Params=params,
+             ExpiresIn=int(ttl),
+         )
+
+     def upload_file(self, local_path: str, key: str) -> str:
+         """Upload a local file to the S3 bucket under `key`.
+         Uses boto3's managed uploader which supports multipart uploads for large files.
+         Returns the uploaded key on success.
+         """
+         self.s3.upload_file(local_path, self.bucket, key)
+         return key
+
+ # ============================================================
+ # Utilities
+ # ============================================================
+
+ def utc_now_iso() -> str:
+     return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
+
+ def sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
+     h = hashlib.sha256()
+     with open(path, "rb") as f:
+         for chunk in iter(lambda: f.read(chunk_size), b""):
+             h.update(chunk)
+     return h.hexdigest()
+
+ def clamp(v: float, lo: float, hi: float) -> float:
+     return max(lo, min(hi, v))
+
+ def sec_to_hms(seconds: float) -> str:
+     s = max(0.0, float(seconds))
+     h = int(s // 3600)
+     m = int((s % 3600) // 60)
+     ss = s - h * 3600 - m * 60
+     return f"{h:02d}:{m:02d}:{ss:06.3f}"
+
+ def float_or_none(x: Any) -> Optional[float]:
+     try: return float(x)
+     except Exception: return None
+
+ # Robust subprocess with retries/backoff/timeout
+ def run(cmd: List[str], timeout: Optional[int] = None) -> Tuple[int, str, str]:
+     proc = subprocess.Popen(
+         cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
+     )
+     try:
+         out, err = proc.communicate(timeout=timeout)
+     except subprocess.TimeoutExpired:
+         proc.kill()
+         out, err = proc.communicate()
+         return 124, out, err + "\nTIMEOUT"
+     return proc.returncode, out, err
+
+ def run_with_retry(cmd: List[str], retries: int = 3, timeout: Optional[int] = None, backoff: float = 1.5) -> str:
+     last = None
+     for i in range(retries):
+         code, out, err = run(cmd, timeout)
+         if code == 0:
+             return out
+         last = (code, err)
+         time.sleep(backoff ** i)
+     raise RuntimeError(f"Command failed after {retries} attempts: {' '.join(cmd)}\n{last}")
+
+ # ============================================================
+ # Defaults / presets
+ # ============================================================
+
+ DEFAULT_PRESETS = {
+     "materialize_chunks": False,  # virtual slicing by default
+     "sample_rate_target": 16000,
+     "container_target": "flac",
+     "channel_policy": "auto",  # auto | split | downmix | keep
+     "normalize": "light",  # none | light | r128
+     "denoise": "auto",  # none | light | auto
+     "chunk_policy": "vad_fallback_fixed",
+     "chunk_target_ms": 1800000,
+     "overlap_ms": 300,
+     "vad_aggressiveness": 2,  # 0..3 if webrtcvad available
+     "highpass_hz": 60,
+     "max_gain_db": 6.0,
+     "min_mean_dbfs_for_gain": -30.0,
+     "stereo_side_mid_threshold_db": 20.0,  # side <= mid - 20 dB => mono OK
+     "ff_timeout_sec": 600,  # per ffmpeg/ffprobe call
+     "ff_retries": 3,
+     # Optional S3 uploader settings (for working copy uploads only)
+     "s3_bucket": None,
+     "s3_region": None,
+     "s3_prefix": "",
+ }
+
+ # ============================================================
+ # Main runner
+ # ============================================================
+
+ class AudioJobRunner:
+     """
+     Drives the workflow until 'split' is complete (transcription not included).
+     Now with retries, content-addressed cache, streaming VAD, and StorageAdapter.
+
+     Usage:
+         storage = LocalStorageAdapter()
+         runner = AudioJobRunner(
+             manifest=None,
+             source_uri="/abs/path/to/audio.wav",  # or r2://bucket/key if your adapter supports it
+             work_root="/tmp/jobwork",
+             storage=storage,
+             presets={"chunk_target_ms": 45000}
+         )
+         manifest = runner.run_until_split()
+     """
+
+     def __init__(
+         self,
+         manifest: Optional[Dict[str, Any]],
+         source_uri: Optional[str],  # should always be a url, upstream should determine and convert the filekey to url with proper domain
+         work_root: str,
+         storage: StorageAdapter,
+         presets: Optional[Dict[str, Any]] = None,
+     ):
+         self.storage = storage
+         self.work_root = os.path.abspath(work_root)
+         os.makedirs(self.work_root, exist_ok=True)
+
+         self.presets = dict(DEFAULT_PRESETS)
+         if presets:
+             self.presets.update(presets)
+
+         # Detect tools & filters
+         self.tool_versions = self._detect_tool_versions()
+         self.filter_caps = self._detect_filter_caps()
+
+         # Initialize or load manifest
+         if manifest is None:
+             if not source_uri:
+                 raise ValueError("source_uri is required for a new job.")
+             self.manifest = self._init_manifest(source_uri)
+         else:
+             self.manifest = manifest
+             self.manifest.setdefault("tool_versions", self.tool_versions)
+
+         self.manifest.setdefault("version", "2.0")
+         self.manifest.setdefault("rev", 0)
+         self._touch_updated()
+
+     # --------------------------------------------------------
+     # Public API
+     # --------------------------------------------------------
+
+     def run_until_split(self) -> Dict[str, Any]:
+         try:
+             if self._stage_status("probe") != "done":
+                 self._run_probe()
+
+             if self._stage_status("preprocess") != "done":
+                 self._run_preprocess()
+
+             if self._stage_status("split") != "done":
+                 self._run_split_plan()
+
+             return self.manifest
+
+         except Exception as e:
+             # Leave breadcrumb on the current running stage if any
+             for stage in ("split", "preprocess", "probe"):
+                 if self._stage_status(stage) == "running":
+                     self._set_stage(stage, "failed", 0.0, {"last_error": str(e), "ended_at": utc_now_iso()})
+                     break
+             raise
+
+     # --------------------------------------------------------
+     # Manifest helpers
+     # --------------------------------------------------------
+
+     def _init_manifest(self, source_uri: str) -> Dict[str, Any]:
+         job_id = str(uuid.uuid4())
+         m = {
+             "version": "2.0",
+             "rev": 0,
+             "job_id": job_id,
+             "created_at": utc_now_iso(),
+             "updated_at": utc_now_iso(),
+             "source": {
+                 "uri": source_uri,
+                 "sha256": None,
+                 "etag": None,
+                 "bytes": None,
+                 "container": None,
+                 "codec": None,
+                 "duration_ms": None,
+                 "sample_rate": None,
+                 "channels": None,
+             },
+             "tool_versions": self.tool_versions,
+             "policy": dict(self.presets),
+             "stages": {
+                 "probe": {"status": "pending", "progress": 0.0},
+                 "preprocess": {"status": "pending", "progress": 0.0},
+                 "split": {"status": "pending", "progress": 0.0},
+                 "transcribe": {"status": "pending", "progress": 0.0},
+             },
+             "stitch": {"status": "pending", "progress": 0.0},
+             "outputs": {
+                 "transcript_uri": None,
+                 "srt_uri": None,
+                 "vtt_uri": None,
+                 "txt_uri": None,
+                 "qc": {"passed": None, "issues": []},
+             },
+         }
+         return m
+
+     def _touch_updated(self):
+         self.manifest["updated_at"] = utc_now_iso()
+         self.manifest["rev"] = int(self.manifest.get("rev", 0)) + 1
+
+     def _stage_status(self, name: str) -> str:
+         return self.manifest.get("stages", {}).get(name, {}).get("status", "pending")
+
+     def _set_stage(self, name: str, status: str, progress: float, extra: Dict[str, Any] = None):
+         st = self.manifest["stages"].setdefault(name, {})
+         st["status"] = status
+         st["progress"] = clamp(progress, 0.0, 1.0)
+         if extra:
+             st.update(extra)
+         self._touch_updated()
+
+     # --------------------------------------------------------
+     # Tool/Filter detection
+     # --------------------------------------------------------
+
+     def _detect_tool_versions(self) -> Dict[str, str]:
+         vers = {}
+         for tool in ("ffmpeg", "ffprobe"):
+             try:
+                 out = run_with_retry([tool, "-version"], retries=1, timeout=10)
+                 first = out.splitlines()[0]
+                 m = re.search(r"version\s+([^\s]+)", first)
+                 vers[tool] = m.group(1) if m else first
+             except Exception:
+                 vers[tool] = "unknown"
+         try:
+             import webrtcvad  # noqa
+             vers["webrtcvad"] = "installed"
+         except Exception:
+             vers["webrtcvad"] = "missing"
+         return vers
+
+     def _detect_filter_caps(self) -> Dict[str, bool]:
+         caps = {"arnndn": False, "adeclip": False, "highpass": True}
+         try:
+             out = run_with_retry(["ffmpeg", "-hide_banner", "-filters"], retries=1, timeout=10)
+             txt = "\n".join(out.splitlines())
+             for name in list(caps.keys()):
+                 if f" {name} " in txt:
+                     caps[name] = True
+         except Exception:
+             pass
+         return caps
+
+     # --------------------------------------------------------
+     # Optional S3 uploader for working copies only
+     # --------------------------------------------------------
+
+     def _get_s3_uploader(self) -> Optional[S3LikeStorageAdapter]:
+         bucket = self.presets.get("s3_bucket")
+         if not bucket:
+             return None
+         region = self.presets.get("s3_region")
+         prefix = self.presets.get("s3_prefix", "")  # kept in presets for key generation
+         endpoint = self.presets.get("s3_endpoint")
+         try:
+             return S3LikeStorageAdapter(bucket=bucket, region_name=region, endpoint_url=endpoint)
+         except Exception:
+             return None
+
+     def _maybe_upload_working_to_s3(self, working: Dict[str, Any], local_map: Dict[str, str]) -> None:
+         uploader = self._get_s3_uploader()
+         if not uploader:
+             return
+         prefix = str(self.presets.get("s3_prefix", "")).strip()
+         jobid = self.manifest.get("job_id", "")
+         for chan, local_path in local_map.items():
+             ext = os.path.splitext(local_path)[1].lstrip(".") or working.get("format", "flac")
+             key = f"{jobid}_{'main' if chan == 'mono' else ('ch1' if chan == 'L' else 'ch2')}.{ext}"
+             if prefix:
+                 key = os.path.join(prefix, key)
+             try:
+                 uploader.upload_file(local_path, key)
+                 working["uris_remote"][chan] = key
+             except Exception:
+                 # best-effort; continue without failing the job
+                 pass
+
+     # --------------------------------------------------------
+     # Stage: Probe
+     # --------------------------------------------------------
+
+     def _run_probe(self):
+         self._set_stage("probe", "running", 0.05, {"started_at": utc_now_iso()})
+
+         src = self.manifest["source"]["uri"]
+
+         # Try to stat source (size/etag); SHA for local files
+         st = self.storage.stat(src)
+         self.manifest["source"]["bytes"] = st.get("bytes")
+         self.manifest["source"]["etag"] = st.get("etag")
+
+         if os.path.isabs(src) and os.path.isfile(src):
+             try:
+                 self.manifest["source"]["sha256"] = sha256_file(src)
+             except Exception:
+                 self.manifest["source"]["sha256"] = None
+
+         info = self._ffprobe_streams(src)
+         fmt = info.get("format", {})
+         streams = info.get("streams", [])
+         audio = next((s for s in streams if s.get("codec_type") == "audio"), {})
+
+         self.manifest["source"].update({
+             "container": fmt.get("format_name"),
+             "codec": audio.get("codec_name"),
+             "duration_ms": int(float(fmt.get("duration", 0)) * 1000) if fmt.get("duration") else None,
+             "sample_rate": int(audio.get("sample_rate", 0)) if audio.get("sample_rate") else None,
+             "channels": int(audio.get("channels", 0)) if audio.get("channels") else None,
+         })
+         self._set_stage("probe", "running", 0.6)
+
+         # Stereo assessment (if 2ch)
+         if self.manifest["source"]["channels"] == 2:
+             stereo_metrics = self._stereo_metrics(src)
+         else:
+             stereo_metrics = {
+                 "rms_L": None, "rms_R": None,
+                 "mid_rms_db": None, "side_rms_db": None,
+                 "max_dbfs": None, "clipping_pct": None,
+                 "near_silent_channel": False, "corr": None,
+                 "recommended_mode": "mono" if self.manifest["source"]["channels"] == 1 else "as_is"
+             }
+
+         actions = self._decide_actions(stereo_metrics)
+         self.manifest["stages"]["probe"].update({
+             "metrics": {
+                 "rms_dbfs_L": stereo_metrics.get("rms_L"),
+                 "rms_dbfs_R": stereo_metrics.get("rms_R"),
+                 "max_dbfs": stereo_metrics.get("max_dbfs"),
+                 "clipping_pct": stereo_metrics.get("clipping_pct"),
+                 "stereo": stereo_metrics,
+             },
+             "actions": actions,
+         })
+         self._set_stage("probe", "done", 1.0, {"ended_at": utc_now_iso()})
+
+     def _ffprobe_streams(self, uri: str) -> Dict[str, Any]:
+         cmd = ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_streams",
+                "-show_format", "-of", "json", uri]
+         out = run_with_retry(cmd, retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+         return json.loads(out)
+
+     def _stereo_metrics(self, uri: str) -> Dict[str, Any]:
+         # Unique temp files to avoid collisions
+         base = os.path.join(tempfile.gettempdir(), f"stmetrics_{uuid.uuid4().hex}")
+         L_txt, R_txt, MID_txt, SIDE_txt = [base + s for s in (".L.txt", ".R.txt", ".MID.txt", ".SIDE.txt")]
+
+         try:
+             # astats + mid/side
+             cmd = [
+                 "ffmpeg", "-nostdin", "-hide_banner", "-v", "error",
+                 "-i", uri, "-map", "0:a:0",
+                 "-filter_complex",
+                 (
+                     "channelsplit=channel_layout=stereo[chl][chr];"
+                     "[chl]astats=metadata=1:reset=1,ametadata=print:file={L};"
+                     "[chr]astats=metadata=1:reset=1,ametadata=print:file={R};"
+                     "pan=stereo|c0=0.5*c0+0.5*c1|c1=0.5*c0-0.5*c1[mid][side];"
+                     "[mid]astats=metadata=1:reset=1,ametadata=print:file={MID};"
+                     "[side]astats=metadata=1:reset=1,ametadata=print:file={SIDE}"
+                 ).format(L=L_txt, R=R_txt, MID=MID_txt, SIDE=SIDE_txt),
+                 "-f", "null", "-"
+             ]
+             run_with_retry(cmd, retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+
+             # volumedetect
+             vd = run_with_retry([
+                 "ffmpeg", "-nostdin", "-hide_banner", "-v", "error",
+                 "-i", uri, "-map", "0:a:0", "-af", "volumedetect", "-f", "null", "-"
+             ], retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+
+             def parse_overall_rms(txt_path: str) -> Optional[float]:
+                 if not os.path.exists(txt_path): return None
+                 with open(txt_path, "r", encoding="utf-8", errors="ignore") as f:
+                     data = f.read()
+                 m = re.findall(r"Overall RMS level:\s*([-\d\.]+)\s*dB", data)
+                 return float(m[-1]) if m else None
+
+             def parse_max_dbfs(vol_text: str) -> Optional[float]:
+                 m = re.findall(r"max_volume:\s*([-\d\.]+)\s*dB", vol_text)
+                 return float(m[-1]) if m else None
+
+             def parse_clipping(vol_text: str) -> Optional[float]:
+                 m = re.findall(r"number of clipped samples:\s*(\d+)", vol_text)
+                 return 100.0 if (m and int(m[-1]) > 0) else 0.0
+
+             rms_L = parse_overall_rms(L_txt)
+             rms_R = parse_overall_rms(R_txt)
+             mid_rms = parse_overall_rms(MID_txt)
+             side_rms = parse_overall_rms(SIDE_txt)
+
+             # Decide near-silent channel
+             near_silent = False
+             if rms_L is not None and rms_R is not None:
+                 if (rms_L < -45.0 and (rms_R - rms_L) > 15.0) or (rms_R < -45.0 and (rms_L - rms_R) > 15.0):
+                     near_silent = True
+
+             # Recommended mode
+             rec_mode = "split"
+             thr = float(self.presets["stereo_side_mid_threshold_db"])
+             if (mid_rms is not None and side_rms is not None and (side_rms <= (mid_rms - thr))) or near_silent:
+                 rec_mode = "mono"
+
+             return {
+                 "rms_L": rms_L, "rms_R": rms_R,
+                 "mid_rms_db": mid_rms, "side_rms_db": side_rms,
+                 "max_dbfs": parse_max_dbfs(vd), "clipping_pct": parse_clipping(vd),
+                 "near_silent_channel": near_silent, "corr": None,
+                 "recommended_mode": rec_mode
+             }
+         finally:
+             for p in (L_txt, R_txt, MID_txt, SIDE_txt):
+                 try:
+                     if os.path.exists(p): os.remove(p)
+                 except Exception:
+                     pass
+
+     def _decide_actions(self, stereo_metrics: Dict[str, Any]) -> Dict[str, Any]:
+         src_ch = self.manifest["source"]["channels"] or 1
+         policy = self.manifest["policy"]
+
+         # Channel policy
+         if policy.get("channel_policy") == "auto":
+             if src_ch == 1:
+                 ch_pol = "downmix"
+             else:
+                 ch_pol = "split" if stereo_metrics.get("recommended_mode") == "split" else "downmix"
+         else:
+             ch_pol = policy.get("channel_policy")
+
+         # Denoise: auto -> light if very quiet
+         denoise = policy.get("denoise", "auto")
+         if denoise == "auto":
+             rms_L = stereo_metrics.get("rms_L")
+             rms_R = stereo_metrics.get("rms_R")
+             denoise_flag = bool((rms_L and rms_L < -35.0) or (rms_R and rms_R < -35.0))
+             denoise = "light" if denoise_flag else "none"
+
+         # Normalize
+         normalize = policy.get("normalize", "light")
+
+         return {
+             "will_resample": True,
+             "will_split_stereo": (ch_pol == "split"),
+             "will_downmix": (ch_pol == "downmix"),
+             "will_denoise": (denoise == "light" and self.filter_caps.get("arnndn", False)),
+             "will_normalize": (normalize != "none"),
+             "channel_policy": ch_pol,
+             "normalize_mode": normalize,
+             "denoise_mode": denoise if self.filter_caps.get("arnndn", False) else "none",
+         }
+
+     # --------------------------------------------------------
+     # Stage: Preprocess (with content-addressed cache)
+     # --------------------------------------------------------
+
+     def _run_preprocess(self):
+         self._set_stage("preprocess", "running", 0.05, {"started_at": utc_now_iso()})
+
+         src = self.manifest["source"]["uri"]
+         actions = self.manifest["stages"]["probe"]["actions"]
+         policy = self.manifest["policy"]
+
+         # Build filtergraph with soft-fallbacks
+         filters = []
+         if self.filter_caps.get("highpass", True):
+             filters.append(f"highpass=f={int(policy.get('highpass_hz', 60))}")
+         if self.filter_caps.get("adeclip", False):
+             filters.append("adeclip")
+         if actions["will_denoise"] and self.filter_caps.get("arnndn", False):
+             filters.append("arnndn")
+
+         # Gentle gain if needed
+         metrics = self.manifest["stages"]["probe"].get("metrics", {})
+         mean_db = None
+         if metrics.get("stereo", {}).get("mid_rms_db") is not None:
+             mean_db = metrics["stereo"]["mid_rms_db"]
+         elif metrics.get("rms_dbfs_L") is not None and metrics.get("rms_dbfs_R") is not None:
+             mean_db = (metrics["rms_dbfs_L"] + metrics["rms_dbfs_R"]) / 2.0
+         if mean_db is not None and mean_db < float(self.presets["min_mean_dbfs_for_gain"]):
+             target_boost = min(float(self.presets["max_gain_db"]),
+                                abs(float(self.presets["min_mean_dbfs_for_gain"]) - mean_db))
+             filters.append(f"volume={target_boost:.1f}dB")
+
+         filtergraph = ",".join(filters) if filters else "anull"
+         sr = int(policy["sample_rate_target"])
+         target_container = policy["container_target"].lower()
+         ch_policy = actions["channel_policy"]
+
+         # Compute idempotency key BEFORE encoding (source hash/etag + params)
+         idem_src = self.manifest["source"].get("sha256") or self.manifest["source"].get("etag") or self.manifest["source"]["uri"]
+         idem_payload = json.dumps({
+             "src": idem_src, "filter": filtergraph, "sr": sr,
+             "fmt": target_container, "ch_policy": ch_policy,
+             "tools": self.tool_versions
+         }, sort_keys=True).encode("utf-8")
+         idem_key = hashlib.sha256(idem_payload).hexdigest()
+
+         # Content-addressed working dir
+         base_dir = os.path.join(self.work_root, self.manifest["job_id"], idem_key)
+         os.makedirs(base_dir, exist_ok=True)
+
+         def out_path(name: str) -> str:
+             return os.path.join(base_dir, f"{name}.{target_container}")
+
+         # Note: Do not store local paths in manifest. Only store remote keys.
+         working = {"format": target_container, "sample_rate": sr, "channel_map": [], "uris_remote": {}, "filtergraph": filtergraph}
+         produced_any = False
+
+         try:
+             if ch_policy == "split" and (self.manifest["source"]["channels"] == 2):
+                 # L/R mono outputs
+                 outL, outR = out_path("ch1"), out_path("ch2")
+                 if not (self.storage.exists(outL) and self.storage.exists(outR)):
+                     cmd = [
+                         "ffmpeg", "-nostdin", "-hide_banner", "-y", "-v", "error",
+                         "-i", self.storage.presign(src), "-map", "0:a:0",
+                         "-map_channel", "0.0.0", "-ac", "1", "-ar", str(sr), "-af", filtergraph, outL,
+                         "-map_channel", "0.0.1", "-ac", "1", "-ar", str(sr), "-af", filtergraph, outR
+                     ]
+                     run_with_retry(cmd, retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+                     produced_any = True
+                 working["channel_map"] = ["L", "R"]
+                 # Upload to S3/R2 if configured; keep local files but do not store local paths in manifest
+                 self._maybe_upload_working_to_s3(working, {"L": outL, "R": outR})
+
+             else:
+                 # Single mono output
+                 outM = out_path("main")
+                 if not self.storage.exists(outM):
+                     cmd = [
+                         "ffmpeg", "-nostdin", "-hide_banner", "-y", "-v", "error",
+                         "-i", self.storage.presign(src), "-map", "0:a:0",
+                         "-ac", "1", "-ar", str(sr), "-af", filtergraph, outM
+                     ]
+                     run_with_retry(cmd, retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+                     produced_any = True
+                 working["channel_map"] = ["mono"]
+                 self._maybe_upload_working_to_s3(working, {"mono": outM})
+
+             self.manifest["stages"]["preprocess"].update({
+                 "idempotency_key": idem_key, "working": working, "ended_at": utc_now_iso()
+             })
+             self._set_stage("preprocess", "done", 1.0)
+
+         except Exception as e:
+             self._set_stage("preprocess", "failed", 0.0, {"last_error": str(e), "ended_at": utc_now_iso()})
+             raise
+
+     # --------------------------------------------------------
+     # Stage: Split plan (virtual by default, VAD streaming)
+     # --------------------------------------------------------
+
+     def _run_split_plan(self):
+         self._set_stage("split", "running", 0.05, {"started_at": utc_now_iso()})
+
+         policy = self.manifest["policy"]
+         chunk_target = int(policy["chunk_target_ms"])
+         overlap = int(policy["overlap_ms"])
+         materialize = bool(policy["materialize_chunks"])
+
+         work = self.manifest["stages"]["preprocess"]["working"]
+         channels = work["channel_map"]
+
+         chunks: List[Dict[str, Any]] = []
+         total_chunks = 0
+         plan_source_uris = {}
+         proc_source: Dict[str, str] = {}
+
+         try:
+             for chan in channels:
+                 # Decide source for processing: local file if exists, else remote if available
+                 local_candidate = None
+                 # Build the expected local path based on idempotent working dir
+                 base_dir = os.path.join(self.work_root, self.manifest["job_id"], self.manifest["stages"]["preprocess"]["idempotency_key"])
+                 fname = "main" if chan == "mono" else ("ch1" if chan == "L" else "ch2")
+                 local_candidate = os.path.join(base_dir, f"{fname}.{work['format']}")
+                 remote_key = work.get("uris_remote", {}).get(chan)
+
+                 # Expose preferred source to downstream: presigned remote if available, else local path
+                 if remote_key:
+                     try:
+                         uploader = self._get_s3_uploader()
+                         plan_source_uris[chan] = uploader.presign(remote_key, "GET") if uploader else local_candidate
+                     except Exception:
+                         plan_source_uris[chan] = local_candidate
+                 else:
+                     plan_source_uris[chan] = local_candidate
+
+                 # Choose ffmpeg input: local if present, else presigned remote, else raw key (ffmpeg may support s3/http)
+                 if os.path.exists(local_candidate):
+                     ffmpeg_src = local_candidate
+                 elif remote_key:
+                     try:
+                         uploader = self._get_s3_uploader()
+                         ffmpeg_src = uploader.presign(remote_key, "GET") if uploader else remote_key
+                     except Exception:
+                         ffmpeg_src = remote_key
+                 else:
+                     ffmpeg_src = local_candidate  # may not exist; will fail predictably
+
+                 # cache processing source for materialization stage
+                 proc_source[chan] = ffmpeg_src
+
+                 info = self._ffprobe_streams(ffmpeg_src)
+                 dur_ms = int(float(info.get("format", {}).get("duration", 0.0)) * 1000)
+
+                 # Build ranges via streaming VAD if requested/available
+                 ranges = self._build_chunks_vad_or_fixed_streaming(ffmpeg_src, dur_ms, chunk_target, overlap)
+                 for idx, (start_ms, dur) in enumerate(ranges):
+                     chunks.append({"idx": idx if len(channels) == 1 else f"{idx}{chan}",
+                                    "chan": chan, "start_ms": int(start_ms), "dur_ms": int(dur),
+                                    "status": "queued"})
+                 total_chunks += len(ranges)
+                 # Progress rough update per channel
+                 self._set_stage("split", "running", clamp(0.05 + 0.4 * (total_chunks / max(1, total_chunks)), 0.05, 0.9))
+
+             plan = {
+                 "mode": "materialized" if materialize else "virtual",
+                 "channels": channels,
+                 "source_uris": plan_source_uris,  # <--- expose per-channel sources
+                 "chunk_policy": policy["chunk_policy"],
+                 "chunk_target_ms": chunk_target,
+                 "overlap_ms": overlap,
+                 "total_chunks": total_chunks,
+                 "chunks": chunks[:2000] if total_chunks <= 2000 else [],  # avoid bloating manifest
+             }
+
+             if materialize:
+                 # Accurate-seek pattern for better boundaries:
+                 # - Fast seek before -i
+                 # - Fine seek after -i (optional) + atrim
+                 out_dir = os.path.join(self.work_root, self.manifest["job_id"], "chunks")
+                 os.makedirs(out_dir, exist_ok=True)
+                 for c in chunks:
+                     st_s = c["start_ms"] / 1000.0
+                     du_s = c["dur_ms"] / 1000.0
+                     chan = c["chan"]
+                     outp = os.path.join(out_dir, f"chunk_{c['idx']}.flac")
+                     inp = proc_source.get(chan, None) or plan_source_uris.get(chan)
+                     # fast seek near the start, then fine trim with atrim to be exact
+                     cmd = [
+                         "ffmpeg", "-nostdin", "-hide_banner", "-y", "-v", "error",
+                         "-ss", sec_to_hms(max(0.0, st_s - 0.05)),  # fast seek slightly earlier
+                         "-i", inp, "-map", "0:a:0",
+                         "-ss", "0.05", "-t", f"{du_s:.3f}",
+                         "-af", f"atrim=start=0:end={du_s:.3f}",
+                         "-ac", "1", "-ar", str(self.presets["sample_rate_target"]),
+                         outp
+                     ]
+                     run_with_retry(cmd, retries=self.presets["ff_retries"], timeout=self.presets["ff_timeout_sec"])
+                     c["uri"] = outp
+                     c["status"] = "ready"
+
+             # Save plan
+             self.manifest["stages"]["split"]["plan"] = plan
+             self._set_stage("split", "done", 1.0, {"ended_at": utc_now_iso()})
+
+             # Initialize downstream counters
+             self.manifest["stages"]["transcribe"].update({
+                 "status": "pending",
+                 "progress": 0.0,
+                 "chunks": {"total": total_chunks, "done": 0, "running": 0, "failed": 0, "queued": total_chunks},
+                 "per_chunk": []
+             })
+
+         except Exception as e:
+             self._set_stage("split", "failed", 0.0, {"last_error": str(e), "ended_at": utc_now_iso()})
+             raise
+
+     # --- VAD (streaming from ffmpeg to avoid big temp files) ---
+
+     def _build_chunks_vad_or_fixed_streaming(self, src_uri: str, dur_ms: int, target_ms: int, overlap_ms: int) -> List[Tuple[int, int]]:
+         use_vad = (self.tool_versions.get("webrtcvad") == "installed") and \
+                   (self.manifest["policy"].get("chunk_policy", "").startswith("vad"))
+         if not use_vad:
+             return self._fixed_chunks(dur_ms, target_ms, overlap_ms)
+
+         # Decode via ffmpeg to s16le PCM @16k mono on stdout, feed WebRTC VAD
+         try:
+             import webrtcvad
+             vad = webrtcvad.Vad(int(self.presets["vad_aggressiveness"]))
+             frame_ms = 30
+             bytes_per_frame = int(16000 * 2 * frame_ms / 1000)
+
+             # Start ffmpeg process
+             cmd = [
+                 "ffmpeg", "-nostdin", "-hide_banner", "-v", "error",
+                 "-i", src_uri, "-map", "0:a:0", "-ac", "1", "-ar", "16000",
+                 "-f", "s16le", "-"  # raw PCM to stdout
+             ]
+             proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+             speech_regions: List[Tuple[int, int]] = []
+             in_speech = False
+             seg_start = 0
+             frames_read = 0
+             last_progress_emit = time.time()
+
+             while True:
+                 chunk = proc.stdout.read(bytes_per_frame)
+                 if chunk is None or len(chunk) == 0:
+                     break
+                 if len(chunk) < bytes_per_frame:
+                     break  # tail
+
+                 t_ms = frames_read * frame_ms
+                 frames_read += 1
+                 is_speech = vad.is_speech(chunk, 16000)
+                 if is_speech and not in_speech:
+                     in_speech = True
+                     seg_start = t_ms
+                 elif not is_speech and in_speech:
+                     in_speech = False
+                     speech_regions.append((seg_start, t_ms - seg_start))
+
+                 # emit progress occasionally (decode-based approximation)
+                 if time.time() - last_progress_emit > 0.5 and dur_ms:
+                     prog = clamp(0.05 + 0.8 * (t_ms / dur_ms), 0.05, 0.95)
+                     self._set_stage("split", "running", prog)
+                     last_progress_emit = time.time()
+
+             # finalize region if still in speech
+             if in_speech:
+                 speech_regions.append((seg_start, max(0, dur_ms - seg_start)))
+
+             # Build chunks by packing islands to ~target_ms
+             chunks: List[Tuple[int, int]] = []
+             if not speech_regions:
+                 return self._fixed_chunks(dur_ms, target_ms, overlap_ms)
+
+             cur_start = None
+             cur_end = None
+             max_len = target_ms + 500
+             gap_allow = 300
+             # Pack complete speech islands into chunks. We never split an
+             # individual speech island; max_len is only used to decide when
+             # to merge adjacent islands. If a single island exceeds max_len
+             # it will remain intact in its own chunk.
+             for s, d in speech_regions:
+                 e = s + d
+                 if cur_start is None:
+                     cur_start, cur_end = s, e
+                     continue
+                 # merge only if small gap and combined length stays within max_len
+                 if (s - cur_end) <= gap_allow and (e - cur_start) <= max_len:
+                     cur_end = e
+                 else:
+                     # finalize current chunk as the full span of islands
+                     chunks.append((cur_start, cur_end - cur_start))
+                     cur_start = s
+                     cur_end = e
+
+             # finalize last chunk
+             if cur_start is not None and (cur_end - cur_start) > 250:
+                 chunks.append((cur_start, cur_end - cur_start))
+
+             # Normalize with overlap shift but preserve complete islands.
+             # We shift the start earlier by overlap_ms for non-first chunks but
+             # keep the full island coverage in the duration.
+             normalized: List[Tuple[int, int]] = []
+             for i, (s, d) in enumerate(chunks):
+                 if i == 0:
+                     normalized.append((max(0, s), d))
+                 else:
+                     s2 = max(0, s - overlap_ms)
+                     normalized.append((s2, (s + d) - s2))
+
+             return self._cap_chunks(normalized, dur_ms)
+
+         except Exception:
+             # If anything fails in VAD pipeline → fallback to fixed
+             return self._fixed_chunks(dur_ms, target_ms, overlap_ms)
+
+     def _fixed_chunks(self, dur_ms: int, target_ms: int, overlap_ms: int) -> List[Tuple[int, int]]:
+         chunks: List[Tuple[int, int]] = []
+         if dur_ms <= 0: return chunks
+         step = max(1, target_ms - overlap_ms)
+         start = 0
+         while start < dur_ms:
+             length = min(target_ms, dur_ms - start)
+             chunks.append((start, length))
+             if length < target_ms: break
+             start += step
+         return chunks
+
+     def _cap_chunks(self, chunks: List[Tuple[int, int]], dur_ms: int) -> List[Tuple[int, int]]:
+         capped = []
+         for s, d in chunks:
+             s2 = clamp(s, 0, max(0, dur_ms - 1))
+             d2 = clamp(d, 1, dur_ms - s2)
+             capped.append((int(s2), int(d2)))
+         return capped
+
+
+ # ------------------------------------------------------
+ # Example CLI (optional)
+ # ------------------------------------------------------
+ if __name__ == "__main__":
+     import argparse
+     ap = argparse.ArgumentParser(description="Audio preprocess runner v2 (until split).")
+     ap.add_argument("source", help="Local path or URL/storage URI to audio")
+     ap.add_argument("work_root", help="Working root directory")
+     ap.add_argument("--manifest", help="Path to existing manifest.json to resume", default=None)
+     ap.add_argument("--chunk_ms", type=int, default=60000)
+     ap.add_argument("--overlap_ms", type=int, default=300)
+     ap.add_argument("--materialize", action="store_true")
+     args = ap.parse_args()
+
+     presets = {
+         "chunk_target_ms": args.chunk_ms,
+         "overlap_ms": args.overlap_ms,
+         "materialize_chunks": args.materialize
+     }
+
+     storage = LocalStorageAdapter()
+
+     manifest = None
+     if args.manifest and os.path.exists(args.manifest):
+         with open(args.manifest, "r", encoding="utf-8") as f:
+             manifest = json.load(f)
+
+     runner = AudioJobRunner(
+         manifest=manifest,
+         source_uri=None if manifest else args.source,
+         work_root=args.work_root,
+         storage=storage,
+         presets=presets
+     )
+     out_manifest = runner.run_until_split()
+
+     out_path = os.path.join(args.work_root, "manifest.json")
+     os.makedirs(args.work_root, exist_ok=True)
+     with open(out_path, "w", encoding="utf-8") as f:
+         json.dump(out_manifest, f, ensure_ascii=False, indent=2)
+     print(f"Saved manifest -> {out_path}")
requirements.txt CHANGED
@@ -1,4 +1,6 @@
  gradio>=5.39.0
  requests>=2.31.0
  pydub>=0.25.1
- ffmpeg-python>=0.2.0
+ ffmpeg-python>=0.2.0
+ webrtcvad
+ boto3>=1.34.0