multimodalart HF Staff committed on
Commit
ae7dd23
·
verified ·
1 Parent(s): 72c229c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +33 -45
app.py CHANGED
@@ -2,7 +2,7 @@ import gradio as gr
2
  from gradio_client import Client, handle_file
3
  from google import genai
4
  import os
5
- from typing import Optional, List
6
  from huggingface_hub import whoami
7
  from PIL import Image
8
  from io import BytesIO
@@ -35,28 +35,35 @@ def _extract_image_data_from_response(response) -> Optional[bytes]:
35
  return part.inline_data.data
36
  return None
37
 
38
- def _get_framerate(video_path: str) -> float:
39
- """Instantly gets the framerate of a video using ffprobe."""
40
  probe = ffmpeg.probe(video_path)
41
- video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
42
- if video_stream is None:
43
- raise ValueError("Could not find video stream in the file.")
44
- return eval(video_stream['avg_frame_rate'])
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
  def _trim_first_frame_fast(video_path: str) -> str:
47
- """
48
- Removes exactly the first frame of a video without re-encoding.
49
- This is the frame-accurate and fast method.
50
- """
51
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
52
  output_path = tmp_output_file.name
53
-
54
  try:
55
- framerate = _get_framerate(video_path)
56
  if framerate == 0: raise ValueError("Framerate cannot be zero.")
57
  start_time = 1 / framerate
58
-
59
- # The key is placing -ss AFTER -i for accuracy, combined with -c copy for speed.
60
  (
61
  ffmpeg
62
  .input(video_path, ss=start_time)
@@ -68,17 +75,14 @@ def _trim_first_frame_fast(video_path: str) -> str:
68
  raise RuntimeError(f"FFmpeg trim error: {e}")
69
 
70
  def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
71
- """
72
- Combines two videos using the fast concat demuxer. Assumes video2 is already trimmed.
73
- """
74
  with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=".txt") as tmp_list_file:
75
  tmp_list_file.write(f"file '{os.path.abspath(video1_path)}'\n")
76
  tmp_list_file.write(f"file '{os.path.abspath(video2_path)}'\n")
77
  list_file_path = tmp_list_file.name
78
-
79
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
80
  output_path = tmp_output_file.name
81
-
82
  try:
83
  (
84
  ffmpeg
@@ -104,9 +108,6 @@ def _generate_video_segment(input_image_path: str, output_image_path: str, promp
104
  return result[0]["video"]
105
 
106
  def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], oauth_token: Optional[gr.OAuthToken]) -> tuple:
107
- """
108
- Handles image generation and determines the visibility of video creation buttons.
109
- """
110
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
111
  try:
112
  contents = [Image.open(image_path[0]) for image_path in images] if images else []
@@ -114,25 +115,16 @@ def unified_image_generator(prompt: str, images: Optional[List[str]], previous_v
114
  response = client.models.generate_content(model=GEMINI_MODEL_NAME, contents=contents)
115
  image_data = _extract_image_data_from_response(response)
116
  if not image_data: raise ValueError("No image data in response.")
117
-
118
  with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
119
  Image.open(BytesIO(image_data)).save(tmp.name)
120
  output_path = tmp.name
121
-
122
  can_create_video = bool(images and len(images) == 1)
123
  can_extend_video = can_create_video and bool(previous_video_path)
124
-
125
- return (
126
- output_path,
127
- gr.update(visible=can_create_video),
128
- gr.update(visible=can_extend_video),
129
- gr.update(visible=False)
130
- )
131
  except Exception as e:
132
  raise gr.Error(f"Image generation failed: {e}")
133
 
134
  def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
135
- """Starts a NEW video chain, overwriting any previous video state."""
136
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
137
  if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.")
138
  try:
@@ -142,12 +134,14 @@ def create_new_video(input_image_gallery: List[str], prompt_input: str, output_i
142
  raise gr.Error(f"Video creation failed: {e}")
143
 
144
  def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
145
- """Extends an existing video with a new segment."""
146
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
147
  if not previous_video_path: raise gr.Error("No previous video to extend.")
148
  if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.")
149
  try:
150
- new_segment_path = _generate_video_segment(input_image_gallery[0][0], output_image, prompt_input, oauth_token.token)
 
 
 
151
  trimmed_segment_path = _trim_first_frame_fast(new_segment_path)
152
  final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path)
153
  return final_video_path, final_video_path
@@ -189,8 +183,8 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
189
  output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
190
  use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
191
  with gr.Row():
192
- create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
193
- extend_video_button = gr.Button("Extend previous video with new scene 🎞️", variant="secondary", visible=False)
194
  with gr.Group(visible=False) as video_group:
195
  video_output = gr.Video(label="Generated Video", show_download_button=True, autoplay=True)
196
  gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad")
@@ -204,19 +198,14 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
204
  inputs=[prompt_input, image_input_gallery, previous_video_state],
205
  outputs=[output_image, create_video_button, extend_video_button, video_group]
206
  )
207
-
208
  use_image_button.click(
209
  fn=lambda img: (
210
- [img] if img else None,
211
- None,
212
- gr.update(visible=False),
213
- gr.update(visible=False),
214
- gr.update(visible=False)
215
  ),
216
  inputs=[output_image],
217
  outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group]
218
  )
219
-
220
  create_video_button.click(
221
  fn=lambda: gr.update(visible=True), outputs=[video_group]
222
  ).then(
@@ -224,7 +213,6 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
224
  inputs=[image_input_gallery, prompt_input, output_image],
225
  outputs=[video_output, previous_video_state],
226
  )
227
-
228
  extend_video_button.click(
229
  fn=lambda: gr.update(visible=True), outputs=[video_group]
230
  ).then(
 
2
  from gradio_client import Client, handle_file
3
  from google import genai
4
  import os
5
+ from typing import Optional, List, Tuple
6
  from huggingface_hub import whoami
7
  from PIL import Image
8
  from io import BytesIO
 
35
  return part.inline_data.data
36
  return None
37
 
38
def _get_video_info(video_path: str) -> Tuple[float, Tuple[int, int]]:
    """Instantly gets the framerate and (width, height) of a video using ffprobe.

    Args:
        video_path: Path to the video file to inspect.

    Returns:
        A ``(framerate, (width, height))`` tuple. The framerate is ``0.0``
        when ffprobe reports an undefined rate such as ``"0/0"`` — callers
        already treat a zero framerate as an error condition.

    Raises:
        ValueError: If the file contains no video stream.
    """
    probe = ffmpeg.probe(video_path)
    video_stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None)
    if not video_stream:
        raise ValueError("No video stream found in the file.")
    # Parse the "num/den" rational string safely instead of eval()-ing
    # ffprobe output; eval on external tool output is an injection risk and
    # raises an opaque ZeroDivisionError for the common "0/0" value.
    num_str, _, den_str = video_stream['avg_frame_rate'].partition('/')
    denominator = float(den_str) if den_str else 1.0
    framerate = float(num_str) / denominator if denominator else 0.0
    resolution = (int(video_stream['width']), int(video_stream['height']))
    return framerate, resolution
47
+
48
def _resize_image(image_path: str, target_size: Tuple[int, int]) -> str:
    """Resizes an image to a target size and saves it to a new temp file.

    Returns the original path untouched when the image already has the
    requested dimensions; otherwise writes the resized copy to a fresh
    temporary file (same extension as the input, defaulting to ``.png``)
    and returns that file's path.
    """
    with Image.open(image_path) as img:
        if img.size == target_size:
            # Already the right size — skip the temp-file round trip.
            return image_path
        scaled = img.resize(target_size, Image.Resampling.LANCZOS)
        ext = os.path.splitext(image_path)[1]
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext if ext else ".png")
        try:
            scaled.save(tmp_file.name)
        finally:
            tmp_file.close()
        return tmp_file.name
58
 
59
  def _trim_first_frame_fast(video_path: str) -> str:
60
+ """Removes exactly the first frame of a video without re-encoding."""
 
 
 
61
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
62
  output_path = tmp_output_file.name
 
63
  try:
64
+ framerate, _ = _get_video_info(video_path)
65
  if framerate == 0: raise ValueError("Framerate cannot be zero.")
66
  start_time = 1 / framerate
 
 
67
  (
68
  ffmpeg
69
  .input(video_path, ss=start_time)
 
75
  raise RuntimeError(f"FFmpeg trim error: {e}")
76
 
77
  def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
78
+ """Combines two videos using the fast concat demuxer."""
79
+
 
80
  with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=".txt") as tmp_list_file:
81
  tmp_list_file.write(f"file '{os.path.abspath(video1_path)}'\n")
82
  tmp_list_file.write(f"file '{os.path.abspath(video2_path)}'\n")
83
  list_file_path = tmp_list_file.name
 
84
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
85
  output_path = tmp_output_file.name
 
86
  try:
87
  (
88
  ffmpeg
 
108
  return result[0]["video"]
109
 
110
  def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], oauth_token: Optional[gr.OAuthToken]) -> tuple:
 
 
 
111
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
112
  try:
113
  contents = [Image.open(image_path[0]) for image_path in images] if images else []
 
115
  response = client.models.generate_content(model=GEMINI_MODEL_NAME, contents=contents)
116
  image_data = _extract_image_data_from_response(response)
117
  if not image_data: raise ValueError("No image data in response.")
 
118
  with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
119
  Image.open(BytesIO(image_data)).save(tmp.name)
120
  output_path = tmp.name
 
121
  can_create_video = bool(images and len(images) == 1)
122
  can_extend_video = can_create_video and bool(previous_video_path)
123
+ return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
 
 
 
 
 
 
124
  except Exception as e:
125
  raise gr.Error(f"Image generation failed: {e}")
126
 
127
  def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
 
128
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
129
  if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.")
130
  try:
 
134
  raise gr.Error(f"Video creation failed: {e}")
135
 
136
  def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
 
137
  if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.")
138
  if not previous_video_path: raise gr.Error("No previous video to extend.")
139
  if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.")
140
  try:
141
+ _, target_resolution = _get_video_info(previous_video_path)
142
+ resized_input_path = _resize_image(input_image_gallery[0][0], target_resolution)
143
+ resized_output_path = _resize_image(output_image, target_resolution)
144
+ new_segment_path = _generate_video_segment(resized_input_path, resized_output_path, prompt_input, oauth_token.token)
145
  trimmed_segment_path = _trim_first_frame_fast(new_segment_path)
146
  final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path)
147
  return final_video_path, final_video_path
 
183
  output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
184
  use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
185
  with gr.Row():
186
+ create_video_button = gr.Button("Create a video between the two images 🎥", variant="secondary", visible=False)
187
+ extend_video_button = gr.Button("Extend video with new scene 🎞️", variant="secondary", visible=False)
188
  with gr.Group(visible=False) as video_group:
189
  video_output = gr.Video(label="Generated Video", show_download_button=True, autoplay=True)
190
  gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad")
 
198
  inputs=[prompt_input, image_input_gallery, previous_video_state],
199
  outputs=[output_image, create_video_button, extend_video_button, video_group]
200
  )
 
201
  use_image_button.click(
202
  fn=lambda img: (
203
+ [img] if img else None, None, gr.update(visible=False),
204
+ gr.update(visible=False), gr.update(visible=False)
 
 
 
205
  ),
206
  inputs=[output_image],
207
  outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group]
208
  )
 
209
  create_video_button.click(
210
  fn=lambda: gr.update(visible=True), outputs=[video_group]
211
  ).then(
 
213
  inputs=[image_input_gallery, prompt_input, output_image],
214
  outputs=[video_output, previous_video_state],
215
  )
 
216
  extend_video_button.click(
217
  fn=lambda: gr.update(visible=True), outputs=[video_group]
218
  ).then(