Fix/enforce vp9 for webm (#805)

* Simple fix to enforce vp9 for webm

* Remove suggest methods from program helper

* Cleanup ffmpeg.py a bit
Henry Ruhs · 2024-11-03 14:14:22 +01:00 · committed by henryruhs
parent 965da98745
commit 4bffa0d183
4 changed files with 38 additions and 38 deletions
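The core of the fix: WebM containers only accept VP8, VP9 or AV1 video, so writing frames with the configured H.264/H.265 encoder into a .webm target makes ffmpeg fail. merge_video therefore detects a WebM target by MIME type and overrides the configured encoder with libvpx-vp9. Below is a minimal standalone sketch of that check, assuming the third-party filetype package already used in the diff; pick_video_encoder is a hypothetical helper name, the actual change inlines the check in merge_video.

import filetype

def pick_video_encoder(target_path : str, configured_encoder : str) -> str:
    # WebM only carries VP8/VP9/AV1 video, so e.g. libx264 would make ffmpeg
    # refuse to mux into the .webm container.
    if filetype.guess_mime(target_path) == 'video/webm':
        return 'libvpx-vp9'
    return configured_encoder

# pick_video_encoder('target.webm', 'libx264') -> 'libvpx-vp9'
# pick_video_encoder('target.mp4', 'libx264')  -> 'libx264'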

View File

@@ -14,13 +14,14 @@ from facefusion.vision import detect_video_duration, restrict_video_fps
 def run_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
+    log_level = state_manager.get_item('log_level')
     commands = [ shutil.which('ffmpeg'), '-hide_banner', '-loglevel', 'error' ]
     commands.extend(args)
     process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
     while process_manager.is_processing():
         try:
-            if state_manager.get_item('log_level') == 'debug':
+            if log_level == 'debug':
                 log_debug(process)
             process.wait(timeout = 0.5)
         except subprocess.TimeoutExpired:
@@ -66,30 +67,37 @@ def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fp
 def merge_video(target_path : str, output_video_resolution : str, output_video_fps : Fps) -> bool:
+    output_video_encoder = state_manager.get_item('output_video_encoder')
+    output_video_quality = state_manager.get_item('output_video_quality')
+    output_video_preset = state_manager.get_item('output_video_preset')
     temp_video_fps = restrict_video_fps(target_path, output_video_fps)
     temp_file_path = get_temp_file_path(target_path)
     temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
-    commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', state_manager.get_item('output_video_encoder') ]
-    if state_manager.get_item('output_video_encoder') in [ 'libx264', 'libx265' ]:
-        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
-        commands.extend([ '-crf', str(output_video_compression), '-preset', state_manager.get_item('output_video_preset') ])
-    if state_manager.get_item('output_video_encoder') in [ 'libvpx-vp9' ]:
-        output_video_compression = round(63 - (state_manager.get_item('output_video_quality') * 0.63))
+    is_webm = filetype.guess_mime(target_path) == 'video/webm'
+    if is_webm:
+        output_video_encoder = 'libvpx-vp9'
+    commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ]
+    if output_video_encoder in [ 'libx264', 'libx265' ]:
+        output_video_compression = round(51 - (output_video_quality * 0.51))
+        commands.extend([ '-crf', str(output_video_compression), '-preset', output_video_preset ])
+    if output_video_encoder in [ 'libvpx-vp9' ]:
+        output_video_compression = round(63 - (output_video_quality * 0.63))
         commands.extend([ '-crf', str(output_video_compression) ])
-    if state_manager.get_item('output_video_encoder') in [ 'h264_nvenc', 'hevc_nvenc' ]:
-        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
-        commands.extend([ '-cq', str(output_video_compression), '-preset', map_nvenc_preset(state_manager.get_item('output_video_preset')) ])
-    if state_manager.get_item('output_video_encoder') in [ 'h264_amf', 'hevc_amf' ]:
-        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
-        commands.extend([ '-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(state_manager.get_item('output_video_preset')) ])
-    if state_manager.get_item('output_video_encoder') in [ 'h264_videotoolbox', 'hevc_videotoolbox' ]:
-        commands.extend([ '-q:v', str(state_manager.get_item('output_video_quality')) ])
+    if output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
+        output_video_compression = round(51 - (output_video_quality * 0.51))
+        commands.extend([ '-cq', str(output_video_compression), '-preset', map_nvenc_preset(output_video_preset) ])
+    if output_video_encoder in [ 'h264_amf', 'hevc_amf' ]:
+        output_video_compression = round(51 - (output_video_quality * 0.51))
+        commands.extend([ '-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(output_video_preset) ])
+    if output_video_encoder in [ 'h264_videotoolbox', 'hevc_videotoolbox' ]:
+        commands.extend([ '-q:v', str(output_video_quality) ])
     commands.extend([ '-vf', 'framerate=fps=' + str(output_video_fps), '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_file_path ])
     return run_ffmpeg(commands).returncode == 0

 def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
+    output_audio_encoder = state_manager.get_item('output_audio_encoder')
     concat_video_path = tempfile.mktemp()
     with open(concat_video_path, 'w') as concat_video_file:
@@ -97,7 +105,7 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
             concat_video_file.write('file \'' + os.path.abspath(temp_output_path) + '\'' + os.linesep)
         concat_video_file.flush()
         concat_video_file.close()
-    commands = [ '-f', 'concat', '-safe', '0', '-i', concat_video_file.name, '-c:v', 'copy', '-c:a', state_manager.get_item('output_audio_encoder'), '-y', os.path.abspath(output_path) ]
+    commands = [ '-f', 'concat', '-safe', '0', '-i', concat_video_file.name, '-c:v', 'copy', '-c:a', output_audio_encoder, '-y', os.path.abspath(output_path) ]
     process = run_ffmpeg(commands)
     process.communicate()
     remove_file(concat_video_path)
@@ -112,8 +120,9 @@ def copy_image(target_path : str, temp_image_resolution : str) -> bool:
 def finalize_image(target_path : str, output_path : str, output_image_resolution : str) -> bool:
+    output_image_quality = state_manager.get_item('output_image_quality')
     temp_file_path = get_temp_file_path(target_path)
-    output_image_compression = calc_image_compression(target_path, state_manager.get_item('output_image_quality'))
+    output_image_compression = calc_image_compression(target_path, output_image_quality)
     commands = [ '-i', temp_file_path, '-s', str(output_image_resolution), '-q:v', str(output_image_compression), '-y', output_path ]
     return run_ffmpeg(commands).returncode == 0
@@ -137,6 +146,7 @@ def read_audio_buffer(target_path : str, sample_rate : int, channel_total : int)
 def restore_audio(target_path : str, output_path : str, output_video_fps : Fps) -> bool:
     trim_frame_start = state_manager.get_item('trim_frame_start')
     trim_frame_end = state_manager.get_item('trim_frame_end')
+    output_audio_encoder = state_manager.get_item('output_audio_encoder')
     temp_file_path = get_temp_file_path(target_path)
     temp_video_duration = detect_video_duration(temp_file_path)
     commands = [ '-i', temp_file_path ]
@@ -147,14 +157,15 @@ def restore_audio(target_path : str, output_path : str, output_video_fps : Fps)
     if isinstance(trim_frame_end, int):
         end_time = trim_frame_end / output_video_fps
         commands.extend([ '-to', str(end_time) ])
-    commands.extend([ '-i', target_path, '-c:v', 'copy', '-c:a', state_manager.get_item('output_audio_encoder'), '-map', '0:v:0', '-map', '1:a:0', '-t', str(temp_video_duration), '-y', output_path ])
+    commands.extend([ '-i', target_path, '-c:v', 'copy', '-c:a', output_audio_encoder, '-map', '0:v:0', '-map', '1:a:0', '-t', str(temp_video_duration), '-y', output_path ])
     return run_ffmpeg(commands).returncode == 0

 def replace_audio(target_path : str, audio_path : str, output_path : str) -> bool:
+    output_audio_encoder = state_manager.get_item('output_audio_encoder')
     temp_file_path = get_temp_file_path(target_path)
     temp_video_duration = detect_video_duration(temp_file_path)
-    commands = [ '-i', temp_file_path, '-i', audio_path, '-c:v', 'copy', '-c:a', state_manager.get_item('output_audio_encoder'), '-t', str(temp_video_duration), '-y', output_path ]
+    commands = [ '-i', temp_file_path, '-i', audio_path, '-c:v', 'copy', '-c:a', output_audio_encoder, '-t', str(temp_video_duration), '-y', output_path ]
     return run_ffmpeg(commands).returncode == 0
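As a worked example of the command construction above, assume a WebM target, output_video_quality = 80, an output fps of 25 and a 1280x720 resolution (all illustrative values, not defaults); the frame pattern and temp path are hypothetical placeholders:

commands = [
    '-r', '25', '-i', '/tmp/facefusion/<target>/%08d.png',
    '-s', '1280x720', '-c:v', 'libvpx-vp9',       # encoder forced because the target is WebM
    '-crf', '13',                                  # round(63 - 80 * 0.63) = 13
    '-vf', 'framerate=fps=25', '-pix_fmt', 'yuv420p', '-colorspace', 'bt709',
    '-y', '/tmp/facefusion/<target>/temp.webm'
]
# run_ffmpeg() prepends the ffmpeg binary plus '-hide_banner', '-loglevel', 'error'
# before spawning the subprocess (see the first hunk above).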

View File

@@ -20,7 +20,7 @@ from facefusion.inference_manager import get_static_model_initializer
 from facefusion.processors import choices as processors_choices
 from facefusion.processors.pixel_boost import explode_pixel_boost, implode_pixel_boost
 from facefusion.processors.typing import FaceSwapperInputs
-from facefusion.program_helper import find_argument_group, suggest_face_swapper_pixel_boost_choices
+from facefusion.program_helper import find_argument_group
 from facefusion.thread_helper import conditional_thread_semaphore
 from facefusion.typing import ApplyStateItem, Args, Embedding, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
 from facefusion.vision import read_image, read_static_image, read_static_images, unpack_resolution, write_image
@@ -351,7 +351,8 @@ def register_args(program : ArgumentParser) -> None:
     group_processors = find_argument_group(program, 'processors')
     if group_processors:
         group_processors.add_argument('--face-swapper-model', help = wording.get('help.face_swapper_model'), default = config.get_str_value('processors.face_swapper_model', 'inswapper_128_fp16'), choices = processors_choices.face_swapper_set.keys())
-        face_swapper_pixel_boost_choices = suggest_face_swapper_pixel_boost_choices(program)
+        known_args, _ = program.parse_known_args()
+        face_swapper_pixel_boost_choices = processors_choices.face_swapper_set.get(known_args.face_swapper_model)
         group_processors.add_argument('--face-swapper-pixel-boost', help = wording.get('help.face_swapper_pixel_boost'), default = config.get_str_value('processors.face_swapper_pixel_boost', get_first(face_swapper_pixel_boost_choices)), choices = face_swapper_pixel_boost_choices)
         facefusion.jobs.job_store.register_step_keys([ 'face_swapper_model', 'face_swapper_pixel_boost' ])
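With the suggest_* helpers gone from program_helper, the call sites use argparse's parse_known_args() directly: it parses the arguments registered so far (here --face-swapper-model), tolerates everything it does not know yet, and the parsed value then drives the choices and default of the dependent argument. The same pattern replaces suggest_face_detector_choices in the face detector program below. A self-contained sketch of the pattern, with made-up model names and sizes rather than FaceFusion's real choices:

from argparse import ArgumentParser

# Made-up mapping from swapper model to the pixel boost resolutions it supports.
swapper_set = { 'model_a' : [ '128x128', '256x256' ], 'model_b' : [ '512x512' ] }

program = ArgumentParser()
program.add_argument('--face-swapper-model', default = 'model_a', choices = swapper_set.keys())

# parse_known_args() reads sys.argv but tolerates arguments that are not
# registered yet, so the model chosen on the command line is available here.
known_args, _ = program.parse_known_args()
pixel_boost_choices = swapper_set.get(known_args.face_swapper_model)

program.add_argument('--face-swapper-pixel-boost', default = pixel_boost_choices[0], choices = pixel_boost_choices)
print(program.parse_args())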

View File

@@ -3,12 +3,11 @@ from argparse import ArgumentParser, HelpFormatter
 import facefusion.choices
 from facefusion import config, metadata, state_manager, wording
-from facefusion.common_helper import create_float_metavar, create_int_metavar
+from facefusion.common_helper import create_float_metavar, create_int_metavar, get_last
 from facefusion.execution import get_execution_provider_choices
 from facefusion.filesystem import list_directory
 from facefusion.jobs import job_store
 from facefusion.processors.core import get_processors_modules
-from facefusion.program_helper import suggest_face_detector_choices

 def create_help_formatter_small(prog : str) -> HelpFormatter:
@@ -72,7 +71,9 @@ def create_face_detector_program() -> ArgumentParser:
     program = ArgumentParser(add_help = False)
     group_face_detector = program.add_argument_group('face detector')
     group_face_detector.add_argument('--face-detector-model', help = wording.get('help.face_detector_model'), default = config.get_str_value('face_detector.face_detector_model', 'yoloface'), choices = facefusion.choices.face_detector_set.keys())
-    group_face_detector.add_argument('--face-detector-size', help = wording.get('help.face_detector_size'), default = config.get_str_value('face_detector.face_detector_size', '640x640'), choices = suggest_face_detector_choices(program))
+    known_args, _ = program.parse_known_args()
+    face_detector_size_choices = facefusion.choices.face_detector_set.get(known_args.face_detector_model)
+    group_face_detector.add_argument('--face-detector-size', help = wording.get('help.face_detector_size'), default = config.get_str_value('face_detector.face_detector_size', get_last(face_detector_size_choices)), choices = face_detector_size_choices)
     group_face_detector.add_argument('--face-detector-angles', help = wording.get('help.face_detector_angles'), type = int, default = config.get_int_list('face_detector.face_detector_angles', '0'), choices = facefusion.choices.face_detector_angles, nargs = '+', metavar = 'FACE_DETECTOR_ANGLES')
     group_face_detector.add_argument('--face-detector-score', help = wording.get('help.face_detector_score'), type = float, default = config.get_float_value('face_detector.face_detector_score', '0.5'), choices = facefusion.choices.face_detector_score_range, metavar = create_float_metavar(facefusion.choices.face_detector_score_range))
     job_store.register_step_keys([ 'face_detector_model', 'face_detector_angles', 'face_detector_size', 'face_detector_score' ])

View File

@@ -1,8 +1,5 @@
 from argparse import ArgumentParser, _ArgumentGroup, _SubParsersAction
-from typing import List, Optional
+from typing import Optional
-import facefusion.choices
-from facefusion.processors import choices as processors_choices

 def find_argument_group(program : ArgumentParser, group_name : str) -> Optional[_ArgumentGroup]:
@@ -32,13 +29,3 @@ def validate_actions(program : ArgumentParser) -> bool:
             elif action.default not in action.choices:
                 return False
     return True
-
-def suggest_face_detector_choices(program : ArgumentParser) -> List[str]:
-    known_args, _ = program.parse_known_args()
-    return facefusion.choices.face_detector_set.get(known_args.face_detector_model) #type:ignore[call-overload]
-
-def suggest_face_swapper_pixel_boost_choices(program : ArgumentParser) -> List[str]:
-    known_args, _ = program.parse_known_args()
-    return processors_choices.face_swapper_set.get(known_args.face_swapper_model) #type:ignore[call-overload]