Introduce bulk-run (#810)

* Introduce bulk-run

* Make bulk run bullet proof

* Integration test for bulk-run
This commit is contained in:
Henry Ruhs 2024-11-12 10:41:25 +01:00 committed by GitHub
parent f53f959510
commit 8385e199f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 120 additions and 1 deletions

View File

@ -5,6 +5,11 @@ source_paths =
target_path =
output_path =
[patterns]
source_pattern =
target_pattern =
output_pattern =
[face_detector]
face_detector_model =
face_detector_size =

View File

@ -15,6 +15,14 @@ def reduce_step_args(args : Args) -> Args:
return step_args
def reduce_job_args(args : Args) -> Args:
job_args =\
{
key: args[key] for key in args if key in job_store.get_job_keys()
}
return job_args
def collect_step_args() -> Args:
step_args =\
{
@ -40,6 +48,10 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
apply_state_item('source_paths', args.get('source_paths'))
apply_state_item('target_path', args.get('target_path'))
apply_state_item('output_path', args.get('output_path'))
# patterns
apply_state_item('source_pattern', args.get('source_pattern'))
apply_state_item('target_pattern', args.get('target_pattern'))
apply_state_item('output_pattern', args.get('output_pattern'))
# face detector
apply_state_item('face_detector_model', args.get('face_detector_model'))
apply_state_item('face_detector_size', args.get('face_detector_size'))

View File

@ -1,3 +1,5 @@
import glob
import itertools
import shutil
import signal
import sys
@ -6,7 +8,7 @@ from time import time
import numpy
from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, logger, process_manager, state_manager, voice_extractor, wording
from facefusion.args import apply_args, collect_job_args, reduce_step_args
from facefusion.args import apply_args, collect_job_args, reduce_job_args, reduce_step_args
from facefusion.common_helper import get_first
from facefusion.content_analyser import analyse_image, analyse_video
from facefusion.download import conditional_download_hashes, conditional_download_sources
@ -74,6 +76,11 @@ def route(args : Args) -> None:
hard_exit(1)
error_core = process_headless(args)
hard_exit(error_core)
if state_manager.get_item('command') == 'bulk-run':
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
error_core = process_bulk(args)
hard_exit(error_core)
if state_manager.get_item('command') in [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ]:
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
@ -304,6 +311,27 @@ def process_headless(args : Args) -> ErrorCode:
return 1
def process_bulk(args : Args) -> ErrorCode:
job_id = job_helper.suggest_job_id('bulk')
step_args = reduce_step_args(args)
job_args = reduce_job_args(args)
source_paths = sorted(glob.glob(job_args.get('source_pattern')))
target_paths = sorted(glob.glob(job_args.get('target_pattern')))
if job_manager.create_job(job_id):
for index, (source_path, target_path) in enumerate(itertools.product(source_paths, target_paths)):
step_args['source_paths'] = [ source_path ]
step_args['target_path'] = target_path
step_args['output_path'] = job_args.get('output_pattern').format(index = index)
if not job_manager.add_step(job_id, step_args):
return 1
if job_manager.submit_job(job_id) and job_runner.run_job(job_id, process_step):
return 0
return 1
def process_image(start_time : float) -> ErrorCode:
if analyse_image(state_manager.get_item('target_path')):
return 3

View File

@ -67,6 +67,30 @@ def create_output_path_program() -> ArgumentParser:
return program
def create_source_pattern_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_patterns = program.add_argument_group('patterns')
group_patterns.add_argument('-s', '--source-pattern', help = wording.get('help.source_pattern'), default = config.get_str_value('patterns.source_pattern'))
job_store.register_job_keys([ 'source_pattern' ])
return program
def create_target_pattern_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_patterns = program.add_argument_group('patterns')
group_patterns.add_argument('-t', '--target-pattern', help = wording.get('help.target_pattern'), default = config.get_str_value('patterns.target_pattern'))
job_store.register_job_keys([ 'target_pattern' ])
return program
def create_output_pattern_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_patterns = program.add_argument_group('patterns')
group_patterns.add_argument('-o', '--output-pattern', help = wording.get('help.output_pattern'), default = config.get_str_value('patterns.output_pattern'))
job_store.register_job_keys([ 'output_pattern' ])
return program
def create_face_detector_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_face_detector = program.add_argument_group('face detector')
@ -236,6 +260,7 @@ def create_program() -> ArgumentParser:
# general
sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('bulk-run', help = wording.get('help.bulk_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_pattern_program(), create_target_pattern_program(), create_output_pattern_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_log_level_program() ], formatter_class = create_help_formatter_large)
# job manager
sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)

View File

@ -199,6 +199,9 @@ StateKey = Literal\
'source_paths',
'target_path',
'output_path',
'source_pattern',
'target_pattern',
'output_pattern',
'face_detector_model',
'face_detector_size',
'face_detector_angles',
@ -257,6 +260,9 @@ State = TypedDict('State',
'source_paths' : List[str],
'target_path' : str,
'output_path' : str,
'source_pattern' : str,
'target_pattern' : str,
'output_pattern' : str,
'face_detector_model' : FaceDetectorModel,
'face_detector_size' : str,
'face_detector_angles' : List[Angle],

View File

@ -0,0 +1,43 @@
import subprocess
import sys
import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p-a.jpg') ])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '2', get_test_example_file('target-240p-b.jpg') ])
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
clear_jobs(get_test_jobs_directory())
init_jobs(get_test_jobs_directory())
prepare_test_output_directory()
def test_bulk_run_image_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'bulk-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-s', get_test_example_file('target-240p-*.jpg'), '-t', get_test_example_file('target-240p-*.jpg'), '-o', get_test_output_file('test-bulk-run-image-to-image-{index}.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-bulk-run-image-to-image-0.jpg') is True
assert is_test_output_file('test-bulk-run-image-to-image-1.jpg') is True
assert is_test_output_file('test-bulk-run-image-to-image-2.jpg') is True
assert is_test_output_file('test-bulk-run-image-to-image-3.jpg') is True
def test_bulk_run_image_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'bulk-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-s', get_test_example_file('target-240p-*.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-bulk-run-image-to-video-{index}.mp4') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-bulk-run-image-to-video-0.mp4') is True
assert is_test_output_file('test-bulk-run-image-to-video-1.mp4') is True