* Cosmetic changes

* Cosmetic changes

* Run single warm-up for the benchmark suite

* Use latest version of Gradio

* More testing

* Introduce basic installer

* Fix typo

* Move more to installer file

* Fix the installer with the uninstall all trick

* Adjust wording

* Fix coreml in installer

* Allow Python 3.9

* Add VENV to installer

* Just some cosmetics

* Just some cosmetics

* Dedicated headless mode, Refine API of UI layouts

* Use --headless for pytest

* Fix testing for Windows

* Normalize output path that lacks extension

* Fix CI for Windows

* Fix CI for Windows

* UI to change output path

* Add conda support for the installer

* Improve installer quite a bit

* Drop conda support

* Install community wheels for coreml silicon

* Improve output video component

* Fix silicon wheel downloading

* Remove venv from installer as we cannot activate via subprocess

* Use join to create wheel name

* Refine the output path normalization

* Refine the output path normalization

* Introduce ProcessMode and rename some methods

* Introduce ProcessMode and rename some methods

* Basic webcam integration and open_ffmpeg()

* Basic webcam integration part2

* Benchmark resolutions now selectable

* Rename benchmark resolution back to benchmark runs

* Fix repeating output path in UI

* Keep output_path untouched if not resolvable

* Add more cases to normalize output path

* None for those tests that don't take source path into account

* Finish basic webcam integration, UI layout now with custom run()

* Fix CI and hide link in webcam UI

* Cosmetics on webcam UI

* Move get_device to utilities

* Fix CI

* Introduce output-image-quality, Show and hide UI according to target media type

* Benchmark with partial result updates

* fix: trim frame sliders not appearing after dragging video

* fix: output and temp frame setting inputs not appearing

* Fix: set increased update delay to 250ms to let Gradio update conditional inputs properly

* Reverted .gitignore

* Adjust timings

* Remove timeout hacks and get fully event driven

* Update dependencies

* Update dependencies

* Revert NSFW library, Conditionally unset trim args

* Face selector works better on preview slider release

* Add limit resources to UI

* Introduce vision.py for all CV2 operations, Rename some methods

* Add restoring audio failed message

* Decouple updates for preview image and preview frame slider, Move reduce_preview_frame to vision

* Refactor detect_fps based on JSON output

* Only webcam when open

* More conditions to vision.py

* Add udp and v4l2 streaming to webcam UI

* Detect v4l2 device to be used

* Refactor code a bit

* Use static max memory for UI

* Fix CI

* Looks stable to me

* Update preview

* Update preview

---------

Co-authored-by: Sumit <vizsumit@gmail.com>
Henry Ruhs 2023-09-06 00:25:18 +02:00, committed by GitHub
parent 4ffae94bac
commit 82eaf76da8
39 changed files with 788 additions and 282 deletions

View File

@ -1,3 +1,3 @@
[flake8]
select = E3, E4, F
per-file-ignores = facefusion/core.py:E402,F401
per-file-ignores = facefusion/core.py:E402, facefusion/installer.py:E402

BIN .github/preview.png vendored: binary file not shown (1.0 MiB before, 1.0 MiB after)

.gitignore vendored (1 line changed)
View File

@ -1,4 +1,3 @@
.assets
.idea
.vscode

View File

@ -18,15 +18,13 @@ Installation
Be aware, the installation needs technical skills and is not for beginners. Please do not open platform and installation related issues on GitHub. We have a very helpful [Discord](https://join.facefusion.io) community that will guide you to install FaceFusion.
[Basic](https://docs.facefusion.io/installation/basic) - It is more likely to work on your computer, but will be quite slow
[Acceleration](https://docs.facefusion.io/installation/acceleration) - Unleash the full potential of your CPU and GPU
Read the [installation](https://docs.facefusion.io/installation) now.
Usage
-----
Start the program with arguments:
Run the program as needed.
```
python run.py [options]
@ -36,7 +34,7 @@ python run.py [options]
-t TARGET_PATH, --target TARGET_PATH select a target image or video
-o OUTPUT_PATH, --output OUTPUT_PATH specify the output file or directory
--frame-processors FRAME_PROCESSORS [FRAME_PROCESSORS ...] choose from the available frame processors (choices: face_enhancer, face_swapper, frame_enhancer, ...)
--ui-layouts UI_LAYOUTS [UI_LAYOUTS ...] choose from the available ui layouts (choices: benchmark, default, ...)
--ui-layouts UI_LAYOUTS [UI_LAYOUTS ...] choose from the available ui layouts (choices: benchmark, webcam, default, ...)
--keep-fps preserve the frames per second (fps) of the target
--keep-temp retain temporary frames after processing
--skip-audio omit audio from the target
@ -51,17 +49,17 @@ python run.py [options]
--trim-frame-end TRIM_FRAME_END specify the end frame for extraction
--temp-frame-format {jpg,png} specify the image format used for frame extraction
--temp-frame-quality [0-100] specify the image quality used for frame extraction
--output-image-quality [0-100] specify the quality used for the output image
--output-video-encoder {libx264,libx265,libvpx-vp9,h264_nvenc,hevc_nvenc} specify the encoder used for the output video
--output-video-quality [0-100] specify the quality used for the output video
--max-memory MAX_MEMORY specify the maximum amount of ram to be used (in gb)
--execution-providers {cpu} [{cpu} ...] choose from the available execution providers (choices: cpu, ...)
--execution-thread-count EXECUTION_THREAD_COUNT specify the number of execution threads
--execution-queue-count EXECUTION_QUEUE_COUNT specify the number of execution queries
--headless run the program in headless mode
-v, --version show program's version number and exit
```
Using the `-s/--source`, `-t/--target` and `-o/--output` arguments will run the program in headless mode.
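With this change, headless runs are requested via the explicit `--headless` flag rather than being implied by the path arguments. As an illustration (file names are placeholders):
```
python run.py -s source.jpg -t target.mp4 -o output.mp4 --headless
```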
Disclaimer
----------

View File

@ -1,22 +0,0 @@
from typing import Optional
import cv2
from facefusion.typing import Frame
def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[Frame]:
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return frame
return None
def get_video_frame_total(video_path : str) -> int:
capture = cv2.VideoCapture(video_path)
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total
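These helpers move into the new facefusion/vision.py, whose diff is not part of this excerpt; get_video_frame_total also becomes count_video_frame_total. A sketch of the relocated module under those assumptions, taking normalize_frame_color to be the usual BGR-to-RGB conversion for Gradio and resize_frame_dimension to stand in for the removed reduce_preview_frame:
```
from typing import Optional
import cv2
from facefusion.typing import Frame

def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[Frame]:
    capture = cv2.VideoCapture(video_path)
    frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
    capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
    has_frame, frame = capture.read()
    capture.release()
    if has_frame:
        return frame
    return None

def count_video_frame_total(video_path : str) -> int:
    capture = cv2.VideoCapture(video_path)
    video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()
    return video_frame_total

def normalize_frame_color(frame : Frame) -> Frame:
    # assumption: convert OpenCV BGR frames to RGB for display
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

def resize_frame_dimension(frame : Frame, max_height : int) -> Frame:
    # assumption: same downscale logic as the removed reduce_preview_frame
    height, width = frame.shape[:2]
    if height > max_height:
        scale = max_height / height
        return cv2.resize(frame, (int(width * scale), max_height))
    return frame
```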

View File

@ -3,7 +3,7 @@ from typing import List
from facefusion.typing import FaceRecognition, FaceAnalyserDirection, FaceAnalyserAge, FaceAnalyserGender, TempFrameFormat, OutputVideoEncoder
face_recognition : List[FaceRecognition] = [ 'reference', 'many' ]
face_analyser_direction : List[FaceAnalyserDirection] = [ 'left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small']
face_analyser_direction : List[FaceAnalyserDirection] = [ 'left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small' ]
face_analyser_age : List[FaceAnalyserAge] = [ 'child', 'teen', 'adult', 'senior' ]
face_analyser_gender : List[FaceAnalyserGender] = [ 'male', 'female' ]
temp_frame_format : List[TempFrameFormat] = [ 'jpg', 'png' ]

View File

@ -20,7 +20,7 @@ import facefusion.globals
from facefusion import wording, metadata
from facefusion.predictor import predict_image, predict_video
from facefusion.processors.frame.core import get_frame_processors_modules
from facefusion.utilities import is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clear_temp, normalize_output_path, list_module_names, decode_execution_providers, encode_execution_providers
from facefusion.utilities import is_image, is_video, detect_fps, compress_image, merge_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clear_temp, normalize_output_path, list_module_names, decode_execution_providers, encode_execution_providers
warnings.filterwarnings('ignore', category = FutureWarning, module = 'insightface')
warnings.filterwarnings('ignore', category = UserWarning, module = 'torchvision')
@ -32,11 +32,11 @@ def parse_args() -> None:
program.add_argument('-s', '--source', help = wording.get('source_help'), dest = 'source_path')
program.add_argument('-t', '--target', help = wording.get('target_help'), dest = 'target_path')
program.add_argument('-o', '--output', help = wording.get('output_help'), dest = 'output_path')
program.add_argument('--frame-processors', help = wording.get('frame_processors_help').format(choices = ', '.join(list_module_names('facefusion/processors/frame/modules'))), dest = 'frame_processors', default = ['face_swapper'], nargs='+')
program.add_argument('--ui-layouts', help = wording.get('ui_layouts_help').format(choices = ', '.join(list_module_names('facefusion/uis/layouts'))), dest = 'ui_layouts', default = ['default'], nargs='+')
program.add_argument('--keep-fps', help = wording.get('keep_fps_help'), dest = 'keep_fps', action='store_true')
program.add_argument('--keep-temp', help = wording.get('keep_temp_help'), dest = 'keep_temp', action='store_true')
program.add_argument('--skip-audio', help = wording.get('skip_audio_help'), dest = 'skip_audio', action='store_true')
program.add_argument('--frame-processors', help = wording.get('frame_processors_help').format(choices = ', '.join(list_module_names('facefusion/processors/frame/modules'))), dest = 'frame_processors', default = ['face_swapper'], nargs = '+')
program.add_argument('--ui-layouts', help = wording.get('ui_layouts_help').format(choices = ', '.join(list_module_names('facefusion/uis/layouts'))), dest = 'ui_layouts', default = ['default'], nargs = '+')
program.add_argument('--keep-fps', help = wording.get('keep_fps_help'), dest = 'keep_fps', action = 'store_true')
program.add_argument('--keep-temp', help = wording.get('keep_temp_help'), dest = 'keep_temp', action = 'store_true')
program.add_argument('--skip-audio', help = wording.get('skip_audio_help'), dest = 'skip_audio', action = 'store_true')
program.add_argument('--face-recognition', help = wording.get('face_recognition_help'), dest = 'face_recognition', default = 'reference', choices = facefusion.choices.face_recognition)
program.add_argument('--face-analyser-direction', help = wording.get('face_analyser_direction_help'), dest = 'face_analyser_direction', default = 'left-right', choices = facefusion.choices.face_analyser_direction)
program.add_argument('--face-analyser-age', help = wording.get('face_analyser_age_help'), dest = 'face_analyser_age', choices = facefusion.choices.face_analyser_age)
@ -48,20 +48,21 @@ def parse_args() -> None:
program.add_argument('--trim-frame-end', help = wording.get('trim_frame_end_help'), dest = 'trim_frame_end', type = int)
program.add_argument('--temp-frame-format', help = wording.get('temp_frame_format_help'), dest = 'temp_frame_format', default = 'jpg', choices = facefusion.choices.temp_frame_format)
program.add_argument('--temp-frame-quality', help = wording.get('temp_frame_quality_help'), dest = 'temp_frame_quality', type = int, default = 100, choices = range(101), metavar = '[0-100]')
program.add_argument('--output-image-quality', help = wording.get('output_image_quality_help'), dest = 'output_image_quality', type = int, default = 90, choices = range(101), metavar = '[0-100]')
program.add_argument('--output-video-encoder', help = wording.get('output_video_encoder_help'), dest = 'output_video_encoder', default = 'libx264', choices = facefusion.choices.output_video_encoder)
program.add_argument('--output-video-quality', help = wording.get('output_video_quality_help'), dest = 'output_video_quality', type = int, default = 90, choices = range(101), metavar = '[0-100]')
program.add_argument('--max-memory', help = wording.get('max_memory_help'), dest = 'max_memory', type = int)
program.add_argument('--execution-providers', help = wording.get('execution_providers_help').format(choices = 'cpu'), dest = 'execution_providers', default = ['cpu'], choices = suggest_execution_providers_choices(), nargs='+')
program.add_argument('--execution-providers', help = wording.get('execution_providers_help').format(choices = 'cpu'), dest = 'execution_providers', default = ['cpu'], choices = suggest_execution_providers_choices(), nargs = '+')
program.add_argument('--execution-thread-count', help = wording.get('execution_thread_count_help'), dest = 'execution_thread_count', type = int, default = suggest_execution_thread_count_default())
program.add_argument('--execution-queue-count', help = wording.get('execution_queue_count_help'), dest = 'execution_queue_count', type = int, default = 1)
program.add_argument('-v', '--version', action='version', version = metadata.get('name') + ' ' + metadata.get('version'))
program.add_argument('--headless', help = wording.get('headless_help'), dest = 'headless', action = 'store_true')
program.add_argument('-v', '--version', version = metadata.get('name') + ' ' + metadata.get('version'), action = 'version')
args = program.parse_args()
facefusion.globals.source_path = args.source_path
facefusion.globals.target_path = args.target_path
facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, args.output_path)
facefusion.globals.headless = facefusion.globals.source_path is not None and facefusion.globals.target_path is not None and facefusion.globals.output_path is not None
facefusion.globals.frame_processors = args.frame_processors
facefusion.globals.ui_layouts = args.ui_layouts
facefusion.globals.keep_fps = args.keep_fps
@ -78,12 +79,14 @@ def parse_args() -> None:
facefusion.globals.trim_frame_end = args.trim_frame_end
facefusion.globals.temp_frame_format = args.temp_frame_format
facefusion.globals.temp_frame_quality = args.temp_frame_quality
facefusion.globals.output_image_quality = args.output_image_quality
facefusion.globals.output_video_encoder = args.output_video_encoder
facefusion.globals.output_video_quality = args.output_video_quality
facefusion.globals.max_memory = args.max_memory
facefusion.globals.execution_providers = decode_execution_providers(args.execution_providers)
facefusion.globals.execution_thread_count = args.execution_thread_count
facefusion.globals.execution_queue_count = args.execution_queue_count
facefusion.globals.headless = args.headless
def suggest_execution_providers_choices() -> List[str]:
@ -122,8 +125,8 @@ def update_status(message : str, scope : str = 'FACEFUSION.CORE') -> None:
def pre_check() -> bool:
if sys.version_info < (3, 10):
update_status(wording.get('python_not_supported').format(version = '3.10'))
if sys.version_info < (3, 9):
update_status(wording.get('python_not_supported').format(version = '3.9'))
return False
if not shutil.which('ffmpeg'):
update_status(wording.get('ffmpeg_not_installed'))
@ -140,6 +143,10 @@ def process_image() -> None:
update_status(wording.get('processing'), frame_processor_module.NAME)
frame_processor_module.process_image(facefusion.globals.source_path, facefusion.globals.output_path, facefusion.globals.output_path)
frame_processor_module.post_process()
# compress image
update_status(wording.get('compressing_image'))
if not compress_image(facefusion.globals.output_path):
update_status(wording.get('compressing_image_failed'))
# validate image
if is_image(facefusion.globals.target_path):
update_status(wording.get('processing_image_succeed'))
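compress_image() lives in facefusion/utilities.py, outside these hunks. A hypothetical stand-in that honors the new output_image_quality global; the real implementation may differ (it could, for example, shell out to ffmpeg):
```
import cv2
import facefusion.globals

def compress_image(output_path : str) -> bool:
    # hypothetical: recompress the written image in place at the configured quality
    frame = cv2.imread(output_path)
    if frame is None:
        return False
    quality = facefusion.globals.output_image_quality
    return cv2.imwrite(output_path, frame, [ cv2.IMWRITE_JPEG_QUALITY, quality ])
```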
@ -166,10 +173,10 @@ def process_video() -> None:
else:
update_status(wording.get('temp_frames_not_found'))
return
# create video
update_status(wording.get('creating_video_fps').format(fps = fps))
if not create_video(facefusion.globals.target_path, fps):
update_status(wording.get('creating_video_failed'))
# merge video
update_status(wording.get('merging_video_fps').format(fps = fps))
if not merge_video(facefusion.globals.target_path, fps):
update_status(wording.get('merging_video_failed'))
return
# handle audio
if facefusion.globals.skip_audio:
@ -177,7 +184,9 @@ def process_video() -> None:
move_temp(facefusion.globals.target_path, facefusion.globals.output_path)
else:
update_status(wording.get('restoring_audio'))
restore_audio(facefusion.globals.target_path, facefusion.globals.output_path)
if not restore_audio(facefusion.globals.target_path, facefusion.globals.output_path):
update_status(wording.get('restoring_audio_failed'))
move_temp(facefusion.globals.target_path, facefusion.globals.output_path)
# clear temp
update_status(wording.get('clearing_temp'))
clear_temp(facefusion.globals.target_path)
@ -190,7 +199,7 @@ def process_video() -> None:
def conditional_process() -> None:
for frame_processor_module in get_frame_processors_modules(facefusion.globals.frame_processors):
if not frame_processor_module.pre_process():
if not frame_processor_module.pre_process('output'):
return
if is_image(facefusion.globals.target_path):
process_image()
@ -207,12 +216,16 @@ def run() -> None:
for frame_processor in get_frame_processors_modules(facefusion.globals.frame_processors):
if not frame_processor.pre_check():
return
# process or launch
# headless or ui
if facefusion.globals.headless:
conditional_process()
else:
import facefusion.uis.core as ui
# pre check
for ui_layout in ui.get_ui_layouts_modules(facefusion.globals.ui_layouts):
if not ui_layout.pre_check():
return
ui.launch()

View File

@ -22,6 +22,7 @@ trim_frame_start : Optional[int] = None
trim_frame_end : Optional[int] = None
temp_frame_format : Optional[TempFrameFormat] = None
temp_frame_quality : Optional[int] = None
output_image_quality : Optional[int] = None
output_video_encoder : Optional[str] = None
output_video_quality : Optional[int] = None
max_memory : Optional[int] = None

facefusion/installer.py Normal file (49 lines)
View File

@ -0,0 +1,49 @@
from typing import Dict, Tuple
import os
import sys
import subprocess
import tempfile
subprocess.call([ 'pip', 'install', 'inquirer', '-q' ])
import inquirer
from facefusion import wording
ONNXRUNTIMES : Dict[str, Tuple[str, str]] =\
{
'cpu': ('onnxruntime', '1.15.1'),
'cuda': ('onnxruntime-gpu', '1.15.1'),
'coreml-legacy': ('onnxruntime-coreml', '1.13.1'),
'coreml-silicon': ('onnxruntime-silicon', '1.14.2'),
'directml': ('onnxruntime-directml', '1.15.1'),
'openvino': ('onnxruntime-openvino', '1.15.0')
}
def run() -> None:
answers : Dict[str, str] = inquirer.prompt(
[
inquirer.List(
'onnxruntime_key',
message = wording.get('select_onnxruntime_install'),
choices = list(ONNXRUNTIMES.keys())
)
])
if answers is not None:
onnxruntime_key = answers['onnxruntime_key']
onnxruntime_name, onnxruntime_version = ONNXRUNTIMES[onnxruntime_key]
python_id = 'cp' + str(sys.version_info.major) + str(sys.version_info.minor)
subprocess.call([ 'pip', 'install', '-r', 'requirements.txt' ])
if onnxruntime_key != 'cpu':
subprocess.call([ 'pip', 'uninstall', 'onnxruntime', onnxruntime_name, '-y' ])
if onnxruntime_key != 'coreml-silicon':
subprocess.call([ 'pip', 'install', onnxruntime_name + '==' + onnxruntime_version ])
elif python_id in [ 'cp39', 'cp310', 'cp311' ]:
wheel_name = '-'.join([ 'onnxruntime_silicon', onnxruntime_version, python_id, python_id, 'macosx_12_0_arm64.whl' ])
wheel_path = os.path.join(tempfile.gettempdir(), wheel_name)
wheel_url = 'https://github.com/cansik/onnxruntime-silicon/releases/download/v' + onnxruntime_version + '/' + wheel_name
subprocess.call([ 'curl', wheel_url, '-o', wheel_path, '-L' ])
subprocess.call([ 'pip', 'install', wheel_path ])
os.remove(wheel_path)
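To make the join concrete: assuming Python 3.10 is running and coreml-silicon is selected, the script resolves the following wheel name and release URL:
```
python_id = 'cp310'
onnxruntime_version = '1.14.2'
wheel_name = '-'.join([ 'onnxruntime_silicon', onnxruntime_version, python_id, python_id, 'macosx_12_0_arm64.whl' ])
# wheel_name == 'onnxruntime_silicon-1.14.2-cp310-cp310-macosx_12_0_arm64.whl'
# fetched from https://github.com/cansik/onnxruntime-silicon/releases/download/v1.14.2/<wheel_name>
```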

View File

@ -2,7 +2,7 @@ METADATA =\
{
'name': 'FaceFusion',
'description': 'Next generation face swapper and enhancer',
'version': '1.0.0',
'version': '1.1.0',
'license': 'MIT',
'author': 'Henry Ruhs',
'url': 'https://facefusion.io'

View File

@ -70,7 +70,7 @@ def multi_process_frame(source_path : str, temp_frame_paths : List[str], process
def create_queue(temp_frame_paths : List[str]) -> Queue[str]:
queue: Queue[str] = Queue()
queue : Queue[str] = Queue()
for frame_path in temp_frame_paths:
queue.put(frame_path)
return queue
@ -103,11 +103,3 @@ def update_progress(progress : Any = None) -> None:
})
progress.refresh()
progress.update(1)
def get_device() -> str:
if 'CUDAExecutionProvider' in facefusion.globals.execution_providers:
return 'cuda'
if 'CoreMLExecutionProvider' in facefusion.globals.execution_providers:
return 'mps'
return 'cpu'
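Per the commit message above, get_device() moves to facefusion/utilities.py; the new call sites (utilities.get_device(facefusion.globals.execution_providers)) suggest it now takes the providers list as a parameter. A likely shape, mirroring the removed body:
```
from typing import List

def get_device(execution_providers : List[str]) -> str:
    if 'CUDAExecutionProvider' in execution_providers:
        return 'cuda'
    if 'CoreMLExecutionProvider' in execution_providers:
        return 'mps'
    return 'cpu'
```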

View File

@ -4,11 +4,10 @@ import threading
from gfpgan.utils import GFPGANer
import facefusion.globals
import facefusion.processors.frame.core as frame_processors
from facefusion import wording
from facefusion import wording, utilities
from facefusion.core import update_status
from facefusion.face_analyser import get_many_faces
from facefusion.typing import Frame, Face
from facefusion.typing import Frame, Face, ProcessMode
from facefusion.utilities import conditional_download, resolve_relative_path, is_image, is_video
FRAME_PROCESSOR = None
@ -26,7 +25,7 @@ def get_frame_processor() -> Any:
FRAME_PROCESSOR = GFPGANer(
model_path = model_path,
upscale = 1,
device = frame_processors.get_device()
device = utilities.get_device(facefusion.globals.execution_providers)
)
return FRAME_PROCESSOR
@ -39,14 +38,17 @@ def clear_frame_processor() -> None:
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../.assets/models')
conditional_download(download_directory_path, ['https://github.com/facefusion/facefusion-assets/releases/download/models/GFPGANv1.4.pth'])
conditional_download(download_directory_path, [ 'https://github.com/facefusion/facefusion-assets/releases/download/models/GFPGANv1.4.pth' ])
return True
def pre_process() -> bool:
if not is_image(facefusion.globals.target_path) and not is_video(facefusion.globals.target_path):
def pre_process(mode : ProcessMode) -> bool:
if mode in [ 'output', 'preview' ] and not is_image(facefusion.globals.target_path) and not is_video(facefusion.globals.target_path):
update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
return False
if mode == 'output' and not facefusion.globals.output_path:
update_status(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
return False
return True

View File

@ -9,7 +9,7 @@ from facefusion import wording
from facefusion.core import update_status
from facefusion.face_analyser import get_one_face, get_many_faces, find_similar_faces
from facefusion.face_reference import get_face_reference, set_face_reference
from facefusion.typing import Face, Frame
from facefusion.typing import Face, Frame, ProcessMode
from facefusion.utilities import conditional_download, resolve_relative_path, is_image, is_video
FRAME_PROCESSOR = None
@ -35,19 +35,21 @@ def clear_frame_processor() -> None:
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../.assets/models')
conditional_download(download_directory_path, ['https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx'])
conditional_download(download_directory_path, [ 'https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx' ])
return True
def pre_process() -> bool:
def pre_process(mode : ProcessMode) -> bool:
if not is_image(facefusion.globals.source_path):
update_status(wording.get('select_image_source') + wording.get('exclamation_mark'), NAME)
return False
elif not get_one_face(cv2.imread(facefusion.globals.source_path)):
update_status(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), NAME)
return False
if not is_image(facefusion.globals.target_path) and not is_video(facefusion.globals.target_path):
if mode in [ 'output', 'preview' ] and not is_image(facefusion.globals.target_path) and not is_video(facefusion.globals.target_path):
update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
if mode == 'output' and not facefusion.globals.output_path:
update_status(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
return False
return True

View File

@ -4,8 +4,11 @@ import threading
from basicsr.archs.rrdbnet_arch import RRDBNet
from realesrgan import RealESRGANer
import facefusion
import facefusion.processors.frame.core as frame_processors
from facefusion.typing import Frame, Face
from facefusion import wording, utilities
from facefusion.core import update_status
from facefusion.typing import Frame, Face, ProcessMode
from facefusion.utilities import conditional_download, resolve_relative_path
FRAME_PROCESSOR = None
@ -30,7 +33,7 @@ def get_frame_processor() -> Any:
num_grow_ch = 32,
scale = 4
),
device = frame_processors.get_device(),
device = utilities.get_device(facefusion.globals.execution_providers),
tile = 512,
tile_pad = 32,
pre_pad = 0,
@ -47,11 +50,14 @@ def clear_frame_processor() -> None:
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../.assets/models')
conditional_download(download_directory_path, ['https://github.com/facefusion/facefusion-assets/releases/download/models/RealESRGAN_x4plus.pth'])
conditional_download(download_directory_path, [ 'https://github.com/facefusion/facefusion-assets/releases/download/models/RealESRGAN_x4plus.pth' ])
return True
def pre_process() -> bool:
def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not facefusion.globals.output_path:
update_status(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
return False
return True

View File

@ -5,6 +5,7 @@ import numpy
Face = Face
Frame = numpy.ndarray[Any, Any]
ProcessMode = Literal[ 'output', 'preview', 'stream' ]
FaceRecognition = Literal[ 'reference', 'many' ]
FaceAnalyserDirection = Literal[ 'left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small' ]
FaceAnalyserAge = Literal[ 'child', 'teen', 'adult', 'senior' ]
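Every frame processor's pre_process() now receives a ProcessMode, so each mode can skip checks that do not apply; a webcam stream, for instance, has neither a target nor an output path. A self-contained sketch of the gating logic (hypothetical helper, not the actual module code):
```
from typing import Literal

ProcessMode = Literal[ 'output', 'preview', 'stream' ]

def pre_process(mode : ProcessMode, target_path : str = '', output_path : str = '') -> bool:
    # 'output' and 'preview' need a target, only 'output' needs an output path
    if mode in [ 'output', 'preview' ] and not target_path:
        return False
    if mode == 'output' and not output_path:
        return False
    return True

assert pre_process('stream')
assert pre_process('preview', 'target.mp4')
assert not pre_process('output', 'target.mp4')
```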

View File

@ -1,4 +1,4 @@
from typing import Any, Optional, List
from typing import Any, Optional, List, Dict, Generator
import time
import tempfile
import statistics
@ -6,26 +6,38 @@ import gradio
import facefusion.globals
from facefusion import wording
from facefusion.capturer import get_video_frame_total
from facefusion.core import conditional_process
from facefusion.vision import count_video_frame_total
from facefusion.core import limit_resources, conditional_process
from facefusion.uis.typing import Update
from facefusion.utilities import normalize_output_path, clear_temp
BENCHMARK_RESULT_DATAFRAME : Optional[gradio.Dataframe] = None
BENCHMARK_RESULTS_DATAFRAME : Optional[gradio.Dataframe] = None
BENCHMARK_RUNS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None
BENCHMARK_CYCLES_SLIDER : Optional[gradio.Button] = None
BENCHMARK_START_BUTTON : Optional[gradio.Button] = None
BENCHMARK_CLEAR_BUTTON : Optional[gradio.Button] = None
BENCHMARKS : Dict[str, str] = \
{
'240p': '.assets/examples/target-240p.mp4',
'360p': '.assets/examples/target-360p.mp4',
'540p': '.assets/examples/target-540p.mp4',
'720p': '.assets/examples/target-720p.mp4',
'1080p': '.assets/examples/target-1080p.mp4',
'1440p': '.assets/examples/target-1440p.mp4',
'2160p': '.assets/examples/target-2160p.mp4'
}
def render() -> None:
global BENCHMARK_RESULT_DATAFRAME
global BENCHMARK_RESULTS_DATAFRAME
global BENCHMARK_RUNS_CHECKBOX_GROUP
global BENCHMARK_CYCLES_SLIDER
global BENCHMARK_START_BUTTON
global BENCHMARK_CLEAR_BUTTON
with gradio.Box():
BENCHMARK_RESULT_DATAFRAME = gradio.Dataframe(
label = wording.get('benchmark_result_dataframe_label'),
BENCHMARK_RESULTS_DATAFRAME = gradio.Dataframe(
label = wording.get('benchmark_results_dataframe_label'),
headers =
[
'target_path',
@ -35,8 +47,7 @@ def render() -> None:
'slowest_run',
'relative_fps'
],
col_count = (6, 'fixed'),
row_count = (7, 'fixed'),
row_count = len(BENCHMARKS),
datatype =
[
'str',
@ -47,54 +58,65 @@ def render() -> None:
'number'
]
)
BENCHMARK_CYCLES_SLIDER = gradio.Slider(
label = wording.get('benchmark_cycles_slider_label'),
minimum = 1,
step = 1,
value = 3,
maximum = 10
)
with gradio.Box():
BENCHMARK_RUNS_CHECKBOX_GROUP = gradio.CheckboxGroup(
label = wording.get('benchmark_runs_checkbox_group_label'),
value = list(BENCHMARKS.keys()),
choices = list(BENCHMARKS.keys())
)
BENCHMARK_CYCLES_SLIDER = gradio.Slider(
label = wording.get('benchmark_cycles_slider_label'),
minimum = 1,
step = 1,
value = 3,
maximum = 10
)
with gradio.Row():
BENCHMARK_START_BUTTON = gradio.Button(wording.get('start_button_label'))
BENCHMARK_CLEAR_BUTTON = gradio.Button(wording.get('clear_button_label'))
def listen() -> None:
BENCHMARK_START_BUTTON.click(update, inputs = BENCHMARK_CYCLES_SLIDER, outputs = BENCHMARK_RESULT_DATAFRAME)
BENCHMARK_CLEAR_BUTTON.click(clear, outputs = BENCHMARK_RESULT_DATAFRAME)
BENCHMARK_RUNS_CHECKBOX_GROUP.change(update_benchmark_runs, inputs = BENCHMARK_RUNS_CHECKBOX_GROUP, outputs = BENCHMARK_RUNS_CHECKBOX_GROUP)
BENCHMARK_START_BUTTON.click(start, inputs = [ BENCHMARK_RUNS_CHECKBOX_GROUP, BENCHMARK_CYCLES_SLIDER ], outputs = BENCHMARK_RESULTS_DATAFRAME)
BENCHMARK_CLEAR_BUTTON.click(clear, outputs = BENCHMARK_RESULTS_DATAFRAME)
def update(benchmark_cycles : int) -> Update:
def update_benchmark_runs(benchmark_runs : List[str]) -> Update:
return gradio.update(value = benchmark_runs)
def start(benchmark_runs : List[str], benchmark_cycles : int) -> Generator[List[Any], None, None]:
facefusion.globals.source_path = '.assets/examples/source.jpg'
target_paths =\
[
'.assets/examples/target-240p.mp4',
'.assets/examples/target-360p.mp4',
'.assets/examples/target-540p.mp4',
'.assets/examples/target-720p.mp4',
'.assets/examples/target-1080p.mp4',
'.assets/examples/target-1440p.mp4',
'.assets/examples/target-2160p.mp4'
]
value = [ benchmark(target_path, benchmark_cycles) for target_path in target_paths ]
return gradio.update(value = value)
target_paths = [ BENCHMARKS[benchmark_run] for benchmark_run in benchmark_runs if benchmark_run in BENCHMARKS ]
benchmark_results = []
if target_paths:
warm_up(BENCHMARKS['240p'])
for target_path in target_paths:
benchmark_results.append(benchmark(target_path, benchmark_cycles))
yield benchmark_results
def warm_up(target_path : str) -> None:
facefusion.globals.target_path = target_path
facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, tempfile.gettempdir())
conditional_process()
def benchmark(target_path : str, benchmark_cycles : int) -> List[Any]:
process_times = []
total_fps = 0.0
for i in range(benchmark_cycles + 1):
for i in range(benchmark_cycles):
facefusion.globals.target_path = target_path
facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, tempfile.gettempdir())
video_frame_total = get_video_frame_total(facefusion.globals.target_path)
video_frame_total = count_video_frame_total(facefusion.globals.target_path)
start_time = time.perf_counter()
limit_resources()
conditional_process()
end_time = time.perf_counter()
process_time = end_time - start_time
fps = video_frame_total / process_time
if i > 0:
process_times.append(process_time)
total_fps += fps
total_fps += video_frame_total / process_time
process_times.append(process_time)
average_run = round(statistics.mean(process_times), 2)
fastest_run = round(min(process_times), 2)
slowest_run = round(max(process_times), 2)
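Because start() is now a generator, Gradio streams every yield to the dataframe, which is what delivers the partial result updates. A standalone toy version of the pattern, assuming a Gradio 3.x environment with queuing enabled:
```
import time
import gradio

def slow_rows():
    rows = []
    for index in range(3):
        time.sleep(1)
        rows.append([ 'cycle-' + str(index), index ])
        yield rows  # each yield repaints the dataframe with the rows finished so far

with gradio.Blocks() as demo:
    dataframe = gradio.Dataframe(headers = [ 'name', 'value' ])
    start_button = gradio.Button('start')
    start_button.click(slow_rows, outputs = dataframe)

demo.queue().launch()
```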

View File

@ -1,5 +1,4 @@
from typing import List, Optional, Tuple, Any, Dict
from time import sleep
import cv2
import gradio
@ -7,7 +6,7 @@ import gradio
import facefusion.choices
import facefusion.globals
from facefusion import wording
from facefusion.capturer import get_video_frame
from facefusion.vision import get_video_frame, normalize_frame_color
from facefusion.face_analyser import get_many_faces
from facefusion.face_reference import clear_face_reference
from facefusion.typing import Frame, FaceRecognition
@ -26,7 +25,8 @@ def render() -> None:
global REFERENCE_FACE_DISTANCE_SLIDER
with gradio.Box():
reference_face_gallery_args: Dict[str, Any] = {
reference_face_gallery_args: Dict[str, Any] =\
{
'label': wording.get('reference_face_gallery_label'),
'height': 120,
'object_fit': 'cover',
@ -62,15 +62,17 @@ def listen() -> None:
FACE_RECOGNITION_DROPDOWN.select(update_face_recognition, inputs = FACE_RECOGNITION_DROPDOWN, outputs = [ REFERENCE_FACE_POSITION_GALLERY, REFERENCE_FACE_DISTANCE_SLIDER ])
REFERENCE_FACE_POSITION_GALLERY.select(clear_and_update_face_reference_position)
REFERENCE_FACE_DISTANCE_SLIDER.change(update_reference_face_distance, inputs = REFERENCE_FACE_DISTANCE_SLIDER)
update_component_names : List[ComponentName] =\
multi_component_names : List[ComponentName] =\
[
'target_file',
'preview_frame_slider'
'source_image',
'target_image',
'target_video'
]
for component_name in update_component_names:
for component_name in multi_component_names:
component = ui.get_component(component_name)
if component:
component.change(update_face_reference_position, outputs = REFERENCE_FACE_POSITION_GALLERY)
for method in [ 'upload', 'change', 'clear' ]:
getattr(component, method)(update_face_reference_position, outputs = REFERENCE_FACE_POSITION_GALLERY)
select_component_names : List[ComponentName] =\
[
'face_analyser_direction_dropdown',
@ -81,6 +83,9 @@ def listen() -> None:
component = ui.get_component(component_name)
if component:
component.select(update_face_reference_position, outputs = REFERENCE_FACE_POSITION_GALLERY)
preview_frame_slider = ui.get_component('preview_frame_slider')
if preview_frame_slider:
preview_frame_slider.release(update_face_reference_position, outputs = REFERENCE_FACE_POSITION_GALLERY)
def update_face_recognition(face_recognition : FaceRecognition) -> Tuple[Update, Update]:
@ -98,7 +103,6 @@ def clear_and_update_face_reference_position(event: gradio.SelectData) -> Update
def update_face_reference_position(reference_face_position : int = 0) -> Update:
sleep(0.2)
gallery_frames = []
facefusion.globals.reference_face_position = reference_face_position
if is_image(facefusion.globals.target_path):
@ -129,5 +133,6 @@ def extract_gallery_frames(reference_frame : Frame) -> List[Frame]:
end_x = max(0, end_x + padding_x)
end_y = max(0, end_y + padding_y)
crop_frame = reference_frame[start_y:end_y, start_x:end_x]
crop_frames.append(ui.normalize_frame(crop_frame))
crop_frame = normalize_frame_color(crop_frame)
crop_frames.append(crop_frame)
return crop_frames

View File

@ -0,0 +1,29 @@
from typing import Optional
import gradio
import facefusion.globals
from facefusion import wording
from facefusion.uis.typing import Update
MAX_MEMORY_SLIDER : Optional[gradio.Slider] = None
def render() -> None:
global MAX_MEMORY_SLIDER
with gradio.Box():
MAX_MEMORY_SLIDER = gradio.Slider(
label = wording.get('max_memory_slider_label'),
minimum = 0,
maximum = 128,
step = 1
)
def listen() -> None:
MAX_MEMORY_SLIDER.change(update_max_memory, inputs = MAX_MEMORY_SLIDER, outputs = MAX_MEMORY_SLIDER)
def update_max_memory(max_memory : int) -> Update:
facefusion.globals.max_memory = max_memory if max_memory > 0 else None
return gradio.update(value = max_memory)
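The slider writes facefusion.globals.max_memory, which core.limit_resources() (not shown in these hunks) consumes before processing starts. A hypothetical POSIX-only sketch of that consumer:
```
import sys
import facefusion.globals

def limit_resources() -> None:
    # hypothetical: cap process memory according to --max-memory (in gigabytes)
    if facefusion.globals.max_memory:
        memory = facefusion.globals.max_memory * 1024 ** 3
        if sys.platform != 'win32':
            import resource
            resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
```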

View File

@ -1,23 +1,26 @@
import tempfile
from typing import Tuple, Optional
import gradio
import facefusion.globals
from facefusion import wording
from facefusion.core import conditional_process
from facefusion.core import limit_resources, conditional_process
from facefusion.uis.typing import Update
from facefusion.utilities import is_image, is_video, normalize_output_path, clear_temp
OUTPUT_START_BUTTON : Optional[gradio.Button] = None
OUTPUT_CLEAR_BUTTON : Optional[gradio.Button] = None
OUTPUT_IMAGE : Optional[gradio.Image] = None
OUTPUT_VIDEO : Optional[gradio.Video] = None
OUTPUT_PATH_TEXTBOX : Optional[gradio.Textbox] = None
OUTPUT_START_BUTTON : Optional[gradio.Button] = None
OUTPUT_CLEAR_BUTTON : Optional[gradio.Button] = None
def render() -> None:
global OUTPUT_START_BUTTON
global OUTPUT_CLEAR_BUTTON
global OUTPUT_IMAGE
global OUTPUT_VIDEO
global OUTPUT_PATH_TEXTBOX
global OUTPUT_START_BUTTON
global OUTPUT_CLEAR_BUTTON
with gradio.Row():
with gradio.Box():
@ -28,25 +31,36 @@ def render() -> None:
OUTPUT_VIDEO = gradio.Video(
label = wording.get('output_image_or_video_label')
)
OUTPUT_PATH_TEXTBOX = gradio.Textbox(
label = wording.get('output_path_textbox_label'),
value = facefusion.globals.output_path or tempfile.gettempdir(),
max_lines = 1
)
with gradio.Row():
OUTPUT_START_BUTTON = gradio.Button(wording.get('start_button_label'))
OUTPUT_CLEAR_BUTTON = gradio.Button(wording.get('clear_button_label'))
def listen() -> None:
OUTPUT_START_BUTTON.click(update, outputs = [ OUTPUT_IMAGE, OUTPUT_VIDEO ])
OUTPUT_PATH_TEXTBOX.change(update_output_path, inputs = OUTPUT_PATH_TEXTBOX, outputs = OUTPUT_PATH_TEXTBOX)
OUTPUT_START_BUTTON.click(start, inputs = OUTPUT_PATH_TEXTBOX, outputs = [ OUTPUT_IMAGE, OUTPUT_VIDEO ])
OUTPUT_CLEAR_BUTTON.click(clear, outputs = [ OUTPUT_IMAGE, OUTPUT_VIDEO ])
def update() -> Tuple[Update, Update]:
facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, '.')
if facefusion.globals.output_path:
conditional_process()
if is_image(facefusion.globals.output_path):
return gradio.update(value = facefusion.globals.output_path, visible = True), gradio.update(value = None, visible = False)
if is_video(facefusion.globals.output_path):
return gradio.update(value = None, visible = False), gradio.update(value = facefusion.globals.output_path, visible = True)
return gradio.update(value = None, visible = False), gradio.update(value = None, visible = False)
def start(output_path : str) -> Tuple[Update, Update]:
facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, output_path)
limit_resources()
conditional_process()
if is_image(facefusion.globals.output_path):
return gradio.update(value = facefusion.globals.output_path, visible = True), gradio.update(value = None, visible = False)
if is_video(facefusion.globals.output_path):
return gradio.update(value = None, visible = False), gradio.update(value = facefusion.globals.output_path, visible = True)
return gradio.update(), gradio.update()
def update_output_path(output_path : str) -> Update:
facefusion.globals.output_path = output_path
return gradio.update(value = output_path)
def clear() -> Tuple[Update, Update]:

View File

@ -1,36 +1,73 @@
from typing import Optional
from typing import Optional, Tuple, List
import gradio
import facefusion.choices
import facefusion.globals
from facefusion import wording
from facefusion.typing import OutputVideoEncoder
from facefusion.uis.typing import Update
from facefusion.uis import core as ui
from facefusion.uis.typing import Update, ComponentName
from facefusion.utilities import is_image, is_video
OUTPUT_IMAGE_QUALITY_SLIDER : Optional[gradio.Slider] = None
OUTPUT_VIDEO_ENCODER_DROPDOWN : Optional[gradio.Dropdown] = None
OUTPUT_VIDEO_QUALITY_SLIDER : Optional[gradio.Slider] = None
def render() -> None:
global OUTPUT_IMAGE_QUALITY_SLIDER
global OUTPUT_VIDEO_ENCODER_DROPDOWN
global OUTPUT_VIDEO_QUALITY_SLIDER
with gradio.Box():
OUTPUT_IMAGE_QUALITY_SLIDER = gradio.Slider(
label = wording.get('output_image_quality_slider_label'),
value = facefusion.globals.output_image_quality,
step = 1,
visible = is_image(facefusion.globals.target_path)
)
OUTPUT_VIDEO_ENCODER_DROPDOWN = gradio.Dropdown(
label = wording.get('output_video_encoder_dropdown_label'),
choices = facefusion.choices.output_video_encoder,
value = facefusion.globals.output_video_encoder
value = facefusion.globals.output_video_encoder,
visible = is_video(facefusion.globals.target_path)
)
OUTPUT_VIDEO_QUALITY_SLIDER = gradio.Slider(
label = wording.get('output_video_quality_slider_label'),
value = facefusion.globals.output_video_quality,
step = 1
step = 1,
visible = is_video(facefusion.globals.target_path)
)
def listen() -> None:
OUTPUT_IMAGE_QUALITY_SLIDER.change(update_output_image_quality, inputs = OUTPUT_IMAGE_QUALITY_SLIDER, outputs = OUTPUT_IMAGE_QUALITY_SLIDER)
OUTPUT_VIDEO_ENCODER_DROPDOWN.select(update_output_video_encoder, inputs = OUTPUT_VIDEO_ENCODER_DROPDOWN, outputs = OUTPUT_VIDEO_ENCODER_DROPDOWN)
OUTPUT_VIDEO_QUALITY_SLIDER.change(update_output_video_quality, inputs = OUTPUT_VIDEO_QUALITY_SLIDER, outputs = OUTPUT_VIDEO_QUALITY_SLIDER)
multi_component_names : List[ComponentName] =\
[
'source_image',
'target_image',
'target_video'
]
for component_name in multi_component_names:
component = ui.get_component(component_name)
if component:
for method in [ 'upload', 'change', 'clear' ]:
getattr(component, method)(remote_update, outputs = [ OUTPUT_IMAGE_QUALITY_SLIDER, OUTPUT_VIDEO_ENCODER_DROPDOWN, OUTPUT_VIDEO_QUALITY_SLIDER ])
def remote_update() -> Tuple[Update, Update, Update]:
if is_image(facefusion.globals.target_path):
return gradio.update(visible = True), gradio.update(visible = False), gradio.update(visible = False)
if is_video(facefusion.globals.target_path):
return gradio.update(visible = False), gradio.update(visible = True), gradio.update(visible = True)
return gradio.update(visible = False), gradio.update(visible = False), gradio.update(visible = False)
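This remote_update() implements the "Show and hide UI according to target media type" commit: image controls appear for image targets, video controls for video targets. The pattern in isolation, with hypothetical components and a naive stand-in for the is_image/is_video checks:
```
import gradio

def remote_update(path : str):
    is_video = path.endswith('.mp4')  # stand-in for utilities.is_video()
    return gradio.update(visible = not is_video), gradio.update(visible = is_video)

with gradio.Blocks() as demo:
    path_textbox = gradio.Textbox(label = 'target path')
    image_quality = gradio.Slider(label = 'output image quality', visible = False)
    video_quality = gradio.Slider(label = 'output video quality', visible = False)
    path_textbox.change(remote_update, inputs = path_textbox, outputs = [ image_quality, video_quality ])

demo.launch()
```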
def update_output_image_quality(output_image_quality : int) -> Update:
facefusion.globals.output_image_quality = output_image_quality
return gradio.update(value = output_image_quality)
def update_output_video_encoder(output_video_encoder: OutputVideoEncoder) -> Update:

View File

@ -1,11 +1,10 @@
from time import sleep
from typing import Any, Dict, Tuple, List, Optional
from typing import Any, Dict, List, Optional
import cv2
import gradio
import facefusion.globals
from facefusion import wording
from facefusion.capturer import get_video_frame, get_video_frame_total
from facefusion.vision import get_video_frame, count_video_frame_total, normalize_frame_color, resize_frame_dimension
from facefusion.face_analyser import get_one_face
from facefusion.face_reference import get_face_reference, set_face_reference
from facefusion.predictor import predict_frame
@ -24,25 +23,27 @@ def render() -> None:
global PREVIEW_FRAME_SLIDER
with gradio.Box():
preview_image_args: Dict[str, Any] = {
preview_image_args: Dict[str, Any] =\
{
'label': wording.get('preview_image_label')
}
preview_frame_slider_args: Dict[str, Any] = {
preview_frame_slider_args: Dict[str, Any] =\
{
'label': wording.get('preview_frame_slider_label'),
'step': 1,
'visible': False
}
if is_image(facefusion.globals.target_path):
target_frame = cv2.imread(facefusion.globals.target_path)
preview_frame = extract_preview_frame(target_frame)
preview_image_args['value'] = ui.normalize_frame(preview_frame)
preview_frame = process_preview_frame(target_frame)
preview_image_args['value'] = normalize_frame_color(preview_frame)
if is_video(facefusion.globals.target_path):
temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
preview_frame = extract_preview_frame(temp_frame)
preview_image_args['value'] = ui.normalize_frame(preview_frame)
preview_frame = process_preview_frame(temp_frame)
preview_image_args['value'] = normalize_frame_color(preview_frame)
preview_image_args['visible'] = True
preview_frame_slider_args['value'] = facefusion.globals.reference_frame_number
preview_frame_slider_args['maximum'] = get_video_frame_total(facefusion.globals.target_path)
preview_frame_slider_args['maximum'] = count_video_frame_total(facefusion.globals.target_path)
preview_frame_slider_args['visible'] = True
PREVIEW_IMAGE = gradio.Image(**preview_image_args)
PREVIEW_FRAME_SLIDER = gradio.Slider(**preview_frame_slider_args)
@ -50,19 +51,28 @@ def render() -> None:
def listen() -> None:
PREVIEW_FRAME_SLIDER.change(update, inputs = PREVIEW_FRAME_SLIDER, outputs = [ PREVIEW_IMAGE, PREVIEW_FRAME_SLIDER ])
PREVIEW_FRAME_SLIDER.change(update_preview_image, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_IMAGE)
multi_component_names : List[ComponentName] =\
[
'source_image',
'target_image',
'target_video'
]
for component_name in multi_component_names:
component = ui.get_component(component_name)
if component:
for method in [ 'upload', 'change', 'clear' ]:
getattr(component, method)(update_preview_image, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_IMAGE)
getattr(component, method)(update_preview_frame_slider, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_FRAME_SLIDER)
update_component_names : List[ComponentName] =\
[
'source_file',
'target_file',
'face_recognition_dropdown',
'reference_face_distance_slider',
'frame_processors_checkbox_group'
]
for component_name in update_component_names:
component = ui.get_component(component_name)
if component:
component.change(update, inputs = PREVIEW_FRAME_SLIDER, outputs = [ PREVIEW_IMAGE, PREVIEW_FRAME_SLIDER ])
component.change(update_preview_image, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_IMAGE)
select_component_names : List[ComponentName] =\
[
'reference_face_position_gallery',
@ -73,37 +83,48 @@ def listen() -> None:
for component_name in select_component_names:
component = ui.get_component(component_name)
if component:
component.select(update, inputs = PREVIEW_FRAME_SLIDER, outputs = [ PREVIEW_IMAGE, PREVIEW_FRAME_SLIDER ])
component.select(update_preview_image, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_IMAGE)
reference_face_distance_slider = ui.get_component('reference_face_distance_slider')
if reference_face_distance_slider:
reference_face_distance_slider.change(update_preview_image, inputs = PREVIEW_FRAME_SLIDER, outputs = PREVIEW_IMAGE)
def update(frame_number : int = 0) -> Tuple[Update, Update]:
sleep(0.1)
def update_preview_image(frame_number : int = 0) -> Update:
if is_image(facefusion.globals.target_path):
conditional_set_face_reference()
target_frame = cv2.imread(facefusion.globals.target_path)
preview_frame = extract_preview_frame(target_frame)
return gradio.update(value = ui.normalize_frame(preview_frame)), gradio.update(value = None, maximum = None, visible = False)
preview_frame = process_preview_frame(target_frame)
preview_frame = normalize_frame_color(preview_frame)
return gradio.update(value = preview_frame)
if is_video(facefusion.globals.target_path):
conditional_set_face_reference()
facefusion.globals.reference_frame_number = frame_number
temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
preview_frame = process_preview_frame(temp_frame)
preview_frame = normalize_frame_color(preview_frame)
return gradio.update(value = preview_frame)
return gradio.update(value = None)
def update_preview_frame_slider(frame_number : int = 0) -> Update:
if is_image(facefusion.globals.target_path):
return gradio.update(value = None, maximum = None, visible = False)
if is_video(facefusion.globals.target_path):
facefusion.globals.reference_frame_number = frame_number
video_frame_total = get_video_frame_total(facefusion.globals.target_path)
temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
preview_frame = extract_preview_frame(temp_frame)
return gradio.update(value = ui.normalize_frame(preview_frame)), gradio.update(maximum = video_frame_total, visible = True)
return gradio.update(value = None), gradio.update(value = None, maximum = None, visible = False)
video_frame_total = count_video_frame_total(facefusion.globals.target_path)
return gradio.update(maximum = video_frame_total, visible = True)
return gradio.update(value = None, maximum = None, visible = False)
def extract_preview_frame(temp_frame : Frame) -> Frame:
def process_preview_frame(temp_frame : Frame) -> Frame:
if predict_frame(temp_frame):
return cv2.GaussianBlur(temp_frame, (99, 99), 0)
source_face = get_one_face(cv2.imread(facefusion.globals.source_path)) if facefusion.globals.source_path else None
temp_frame = reduce_preview_frame(temp_frame)
if 'reference' in facefusion.globals.face_recognition and not get_face_reference():
reference_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
reference_face = get_one_face(reference_frame, facefusion.globals.reference_face_position)
set_face_reference(reference_face)
reference_face = get_face_reference() if 'reference' in facefusion.globals.face_recognition else None
temp_frame = resize_frame_dimension(temp_frame, 480)
for frame_processor in facefusion.globals.frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
if frame_processor_module.pre_process():
if frame_processor_module.pre_process('preview'):
temp_frame = frame_processor_module.process_frame(
source_face,
reference_face,
@ -112,10 +133,8 @@ def extract_preview_frame(temp_frame : Frame) -> Frame:
return temp_frame
def reduce_preview_frame(temp_frame : Frame, max_height : int = 480) -> Frame:
height, width = temp_frame.shape[:2]
if height > max_height:
scale = max_height / height
max_width = int(width * scale)
temp_frame = cv2.resize(temp_frame, (max_width, max_height))
return temp_frame
def conditional_set_face_reference() -> None:
if 'reference' in facefusion.globals.face_recognition and not get_face_reference():
reference_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
reference_face = get_one_face(reference_frame, facefusion.globals.reference_face_position)
set_face_reference(reference_face)

View File

@ -19,7 +19,7 @@ def render() -> None:
is_source_image = is_image(facefusion.globals.source_path)
SOURCE_FILE = gradio.File(
file_count = 'single',
file_types=
file_types =
[
'.png',
'.jpg',
@ -28,12 +28,12 @@ def render() -> None:
label = wording.get('source_file_label'),
value = facefusion.globals.source_path if is_source_image else None
)
ui.register_component('source_file', SOURCE_FILE)
SOURCE_IMAGE = gradio.Image(
value = SOURCE_FILE.value['name'] if is_source_image else None,
visible = is_source_image,
show_label = False
)
ui.register_component('source_image', SOURCE_IMAGE)
def listen() -> None:

View File

@ -43,7 +43,8 @@ def render() -> None:
visible = is_target_video,
show_label = False
)
ui.register_component('target_file', TARGET_FILE)
ui.register_component('target_image', TARGET_IMAGE)
ui.register_component('target_video', TARGET_VIDEO)
def listen() -> None:

View File

@ -1,12 +1,13 @@
from typing import Optional
from typing import Optional, Tuple
import gradio
import facefusion.choices
import facefusion.globals
from facefusion import wording
from facefusion.typing import TempFrameFormat
from facefusion.uis import core as ui
from facefusion.uis.typing import Update
from facefusion.utilities import is_video
TEMP_FRAME_FORMAT_DROPDOWN : Optional[gradio.Dropdown] = None
TEMP_FRAME_QUALITY_SLIDER : Optional[gradio.Slider] = None
@ -20,18 +21,30 @@ def render() -> None:
TEMP_FRAME_FORMAT_DROPDOWN = gradio.Dropdown(
label = wording.get('temp_frame_format_dropdown_label'),
choices = facefusion.choices.temp_frame_format,
value = facefusion.globals.temp_frame_format
value = facefusion.globals.temp_frame_format,
visible = is_video(facefusion.globals.target_path)
)
TEMP_FRAME_QUALITY_SLIDER = gradio.Slider(
label = wording.get('temp_frame_quality_slider_label'),
value = facefusion.globals.temp_frame_quality,
step = 1
step = 1,
visible = is_video(facefusion.globals.target_path)
)
def listen() -> None:
TEMP_FRAME_FORMAT_DROPDOWN.select(update_temp_frame_format, inputs = TEMP_FRAME_FORMAT_DROPDOWN, outputs = TEMP_FRAME_FORMAT_DROPDOWN)
TEMP_FRAME_QUALITY_SLIDER.change(update_temp_frame_quality, inputs = TEMP_FRAME_QUALITY_SLIDER, outputs = TEMP_FRAME_QUALITY_SLIDER)
target_video = ui.get_component('target_video')
if target_video:
for method in [ 'upload', 'change', 'clear' ]:
getattr(target_video, method)(remote_update, outputs = [ TEMP_FRAME_FORMAT_DROPDOWN, TEMP_FRAME_QUALITY_SLIDER ])
def remote_update() -> Tuple[Update, Update]:
if is_video(facefusion.globals.target_path):
return gradio.update(visible = True), gradio.update(visible = True)
return gradio.update(visible = False), gradio.update(visible = False)
def update_temp_frame_format(temp_frame_format : TempFrameFormat) -> Update:

View File

@ -1,11 +1,9 @@
from time import sleep
from typing import Any, Dict, Tuple, Optional
import gradio
import facefusion.globals
from facefusion import wording
from facefusion.capturer import get_video_frame_total
from facefusion.vision import count_video_frame_total
from facefusion.uis import core as ui
from facefusion.uis.typing import Update
from facefusion.utilities import is_video
@ -19,18 +17,20 @@ def render() -> None:
global TRIM_FRAME_END_SLIDER
with gradio.Box():
trim_frame_start_slider_args : Dict[str, Any] = {
trim_frame_start_slider_args : Dict[str, Any] =\
{
'label': wording.get('trim_frame_start_slider_label'),
'step': 1,
'visible': False
}
trim_frame_end_slider_args : Dict[str, Any] = {
trim_frame_end_slider_args : Dict[str, Any] =\
{
'label': wording.get('trim_frame_end_slider_label'),
'step': 1,
'visible': False
}
if is_video(facefusion.globals.target_path):
video_frame_total = get_video_frame_total(facefusion.globals.target_path)
video_frame_total = count_video_frame_total(facefusion.globals.target_path)
trim_frame_start_slider_args['value'] = facefusion.globals.trim_frame_start or 0
trim_frame_start_slider_args['maximum'] = video_frame_total
trim_frame_start_slider_args['visible'] = True
@ -43,23 +43,29 @@ def render() -> None:
def listen() -> None:
target_file = ui.get_component('target_file')
if target_file:
target_file.change(remote_update, outputs = [ TRIM_FRAME_START_SLIDER, TRIM_FRAME_END_SLIDER ])
TRIM_FRAME_START_SLIDER.change(lambda value : update_number('trim_frame_start', int(value)), inputs = TRIM_FRAME_START_SLIDER, outputs = TRIM_FRAME_START_SLIDER)
TRIM_FRAME_END_SLIDER.change(lambda value : update_number('trim_frame_end', int(value)), inputs = TRIM_FRAME_END_SLIDER, outputs = TRIM_FRAME_END_SLIDER)
TRIM_FRAME_START_SLIDER.change(update_trim_frame_start, inputs = TRIM_FRAME_START_SLIDER, outputs = TRIM_FRAME_START_SLIDER)
TRIM_FRAME_END_SLIDER.change(update_trim_frame_end, inputs = TRIM_FRAME_END_SLIDER, outputs = TRIM_FRAME_END_SLIDER)
target_video = ui.get_component('target_video')
if target_video:
for method in [ 'upload', 'change', 'clear' ]:
getattr(target_video, method)(remote_update, outputs = [ TRIM_FRAME_START_SLIDER, TRIM_FRAME_END_SLIDER ])
def remote_update() -> Tuple[Update, Update]:
sleep(0.1)
if is_video(facefusion.globals.target_path):
video_frame_total = get_video_frame_total(facefusion.globals.target_path)
facefusion.globals.trim_frame_start = 0
facefusion.globals.trim_frame_end = video_frame_total
video_frame_total = count_video_frame_total(facefusion.globals.target_path)
facefusion.globals.trim_frame_start = None
facefusion.globals.trim_frame_end = None
return gradio.update(value = 0, maximum = video_frame_total, visible = True), gradio.update(value = video_frame_total, maximum = video_frame_total, visible = True)
return gradio.update(value = None, maximum = None, visible = False), gradio.update(value = None, maximum = None, visible = False)
def update_number(name : str, value : int) -> Update:
setattr(facefusion.globals, name, value)
return gradio.update(value = value)
def update_trim_frame_start(trim_frame_start : int) -> Update:
facefusion.globals.trim_frame_start = trim_frame_start if trim_frame_start > 0 else None
return gradio.update(value = trim_frame_start)
def update_trim_frame_end(trim_frame_end : int) -> Update:
video_frame_total = count_video_frame_total(facefusion.globals.target_path)
facefusion.globals.trim_frame_end = trim_frame_end if trim_frame_end < video_frame_total else None
return gradio.update(value = trim_frame_end)

View File

@ -0,0 +1,103 @@
from typing import Optional, Generator
import os
import subprocess
import cv2
import gradio
import facefusion.globals
from facefusion import wording
from facefusion.typing import Frame
from facefusion.face_analyser import get_one_face
from facefusion.processors.frame.core import load_frame_processor_module
from facefusion.uis.typing import StreamMode, WebcamMode, Update
from facefusion.utilities import open_ffmpeg
from facefusion.vision import normalize_frame_color
WEBCAM_IMAGE : Optional[gradio.Image] = None
WEBCAM_MODE_RADIO : Optional[gradio.Radio] = None
WEBCAM_START_BUTTON : Optional[gradio.Button] = None
WEBCAM_STOP_BUTTON : Optional[gradio.Button] = None
def render() -> None:
global WEBCAM_IMAGE
global WEBCAM_MODE_RADIO
global WEBCAM_START_BUTTON
global WEBCAM_STOP_BUTTON
WEBCAM_IMAGE = gradio.Image(
label = wording.get('webcam_image_label')
)
WEBCAM_MODE_RADIO = gradio.Radio(
label = wording.get('webcam_mode_radio_label'),
choices = [ 'inline', 'stream_udp', 'stream_v4l2' ],
value = 'inline'
)
WEBCAM_START_BUTTON = gradio.Button(wording.get('start_button_label'))
WEBCAM_STOP_BUTTON = gradio.Button(wording.get('stop_button_label'))
def listen() -> None:
start_event = WEBCAM_START_BUTTON.click(start, inputs = WEBCAM_MODE_RADIO, outputs = WEBCAM_IMAGE)
WEBCAM_MODE_RADIO.change(update, outputs = WEBCAM_IMAGE, cancels = start_event)
WEBCAM_STOP_BUTTON.click(None, cancels = start_event)
def update() -> Update:
return gradio.update(value = None)
def start(webcam_mode : WebcamMode) -> Generator[Frame, None, None]:
if webcam_mode == 'inline':
yield from start_inline()
if webcam_mode == 'stream_udp':
yield from start_stream('udp')
if webcam_mode == 'stream_v4l2':
yield from start_stream('v4l2')
def start_inline() -> Generator[Frame, None, None]:
facefusion.globals.face_recognition = 'many'
capture = cv2.VideoCapture(0)
if capture.isOpened():
while True:
_, temp_frame = capture.read()
temp_frame = process_stream_frame(temp_frame)
if temp_frame is not None:
yield normalize_frame_color(temp_frame)
def start_stream(mode : StreamMode) -> Generator[None, None, None]:
facefusion.globals.face_recognition = 'many'
capture = cv2.VideoCapture(0)
ffmpeg_process = open_stream(mode)
if capture.isOpened():
while True:
_, frame = capture.read()
temp_frame = process_stream_frame(frame)
if temp_frame is not None:
ffmpeg_process.stdin.write(temp_frame.tobytes())
yield normalize_frame_color(temp_frame)
def process_stream_frame(temp_frame : Frame) -> Frame:
source_face = get_one_face(cv2.imread(facefusion.globals.source_path)) if facefusion.globals.source_path else None
for frame_processor in facefusion.globals.frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
if frame_processor_module.pre_process('stream'):
temp_frame = frame_processor_module.process_frame(
source_face,
None,
temp_frame
)
return temp_frame
def open_stream(mode : StreamMode) -> subprocess.Popen[bytes]:
commands = [ '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-s', '640x480', '-r', '30', '-i', '-' ]
if mode == 'udp':
commands.extend([ '-b:v', '2000k', '-f', 'mpegts', 'udp://localhost:27000' ])
if mode == 'v4l2':
device_name = os.listdir('/sys/devices/virtual/video4linux')[0]
commands.extend([ '-f', 'v4l2', '/dev/' + device_name ])
return open_ffmpeg(commands)

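For reference, the mpegts stream that start_stream('udp') pipes to udp://localhost:27000 can be previewed with any FFmpeg-backed player; a minimal OpenCV receiver might look like this (a sketch, assuming an OpenCV build with FFmpeg support):

```python
# Minimal sketch of a receiver for the UDP stream produced by open_stream('udp').
import cv2

capture = cv2.VideoCapture('udp://localhost:27000', cv2.CAP_FFMPEG)
while capture.isOpened():
    has_frame, frame = capture.read()
    if not has_frame:
        break
    cv2.imshow('facefusion stream', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
capture.release()
cv2.destroyAllWindows()
```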

@@ -1,33 +1,25 @@
from typing import Dict, Optional, Any
from types import ModuleType
from typing import Dict, Optional, Any, List
import importlib
import sys
import cv2
import gradio
import facefusion.globals
from facefusion import metadata, wording
from facefusion.typing import Frame
from facefusion.uis.typing import Component, ComponentName
COMPONENTS: Dict[ComponentName, Component] = {}
UI_LAYOUT_MODULES : List[ModuleType] = []
UI_LAYOUT_METHODS =\
[
'pre_check',
'pre_render',
'render',
'listen'
'listen',
'run'
]
def launch() -> None:
with gradio.Blocks(theme = get_theme(), title = metadata.get('name') + ' ' + metadata.get('version')) as ui:
for ui_layout in facefusion.globals.ui_layouts:
ui_layout_module = load_ui_layout_module(ui_layout)
ui_layout_module.pre_check()
ui_layout_module.render()
ui_layout_module.listen()
ui.launch(show_api = False)
def load_ui_layout_module(ui_layout : str) -> Any:
try:
ui_layout_module = importlib.import_module('facefusion.uis.layouts.' + ui_layout)
@@ -41,6 +33,29 @@ def load_ui_layout_module(ui_layout : str) -> Any:
return ui_layout_module
def get_ui_layouts_modules(ui_layouts : List[str]) -> List[ModuleType]:
global UI_LAYOUT_MODULES
if not UI_LAYOUT_MODULES:
for ui_layout in ui_layouts:
ui_layout_module = load_ui_layout_module(ui_layout)
UI_LAYOUT_MODULES.append(ui_layout_module)
return UI_LAYOUT_MODULES
def launch() -> None:
with gradio.Blocks(theme = get_theme(), title = metadata.get('name') + ' ' + metadata.get('version')) as ui:
for ui_layout in facefusion.globals.ui_layouts:
ui_layout_module = load_ui_layout_module(ui_layout)
if ui_layout_module.pre_render():
ui_layout_module.render()
ui_layout_module.listen()
for ui_layout in facefusion.globals.ui_layouts:
ui_layout_module = load_ui_layout_module(ui_layout)
ui_layout_module.run(ui)
def get_theme() -> gradio.Theme:
return gradio.themes.Soft(
primary_hue = gradio.themes.colors.red,
@@ -61,7 +76,3 @@ def get_component(name: ComponentName) -> Optional[Component]:
def register_component(name: ComponentName, component: Component) -> None:
COMPONENTS[name] = component
def normalize_frame(frame : Frame) -> Frame:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

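With run() added to UI_LAYOUT_METHODS, every layout module now owns its launch behaviour instead of core.py launching a single shared Blocks instance. A hypothetical bare-bones layout that satisfies the five-method contract (module name and body are illustrative):

```python
# Hypothetical facefusion/uis/layouts/minimal.py satisfying the new contract.
import gradio

from facefusion.uis.components import about

def pre_check() -> bool:
    return True  # download required assets here if needed

def pre_render() -> bool:
    return True  # decide whether this layout should render at all

def render() -> gradio.Blocks:
    with gradio.Blocks() as layout:
        about.render()
    return layout

def listen() -> None:
    pass  # wire up component events here

def run(ui : gradio.Blocks) -> None:
    ui.launch(show_api = False)  # each layout controls queueing and launch
```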

@@ -1,6 +1,6 @@
import gradio
from facefusion.uis.components import about, processors, execution, benchmark
from facefusion.uis.components import about, processors, execution, limit_resources, benchmark
from facefusion.utilities import conditional_download
@@ -19,6 +19,10 @@ def pre_check() -> bool:
return True
def pre_render() -> bool:
return True
def render() -> gradio.Blocks:
with gradio.Blocks() as layout:
with gradio.Row():
@@ -26,6 +30,7 @@ def render() -> gradio.Blocks:
about.render()
processors.render()
execution.render()
limit_resources.render()
with gradio.Column(scale = 5):
benchmark.render()
return layout
@@ -34,4 +39,10 @@ def render() -> gradio.Blocks:
def listen() -> None:
processors.listen()
execution.listen()
limit_resources.listen()
benchmark.listen()
def run(ui : gradio.Blocks) -> None:
ui.queue(concurrency_count = 2, api_open = False)
ui.launch(show_api = False)

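The queue(concurrency_count = 2) call is what enables the partial benchmark updates: in Gradio 3.x, generator-based event handlers only stream intermediate results when the Blocks instance is queued. A stripped-down illustration (handler and component names are made up):

```python
# Sketch: generator handlers need queue() to stream partial updates (Gradio 3.x).
import time
import gradio

def count_up():
    for index in range(5):
        time.sleep(1)
        yield str(index)  # each yield pushes a partial update to the client

with gradio.Blocks() as demo:
    button = gradio.Button('START')
    output = gradio.Textbox()
    button.click(count_up, outputs = output)

demo.queue(concurrency_count = 2)
demo.launch()
```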

@@ -1,12 +1,16 @@
import gradio
from facefusion.uis.components import about, processors, execution, temp_frame, settings, source, target, preview, trim_frame, face_analyser, face_selector, output_settings, output
from facefusion.uis.components import about, processors, execution, limit_resources, temp_frame, output_settings, settings, source, target, preview, trim_frame, face_analyser, face_selector, output
def pre_check() -> bool:
return True
def pre_render() -> bool:
return True
def render() -> gradio.Blocks:
with gradio.Blocks() as layout:
with gradio.Row():
@@ -14,12 +18,13 @@ def render() -> gradio.Blocks:
about.render()
processors.render()
execution.render()
limit_resources.render()
temp_frame.render()
output_settings.render()
settings.render()
with gradio.Column(scale = 2):
source.render()
target.render()
output_settings.render()
output.render()
with gradio.Column(scale = 3):
preview.render()
@@ -32,13 +37,18 @@ def render() -> gradio.Blocks:
def listen() -> None:
processors.listen()
execution.listen()
settings.listen()
limit_resources.listen()
temp_frame.listen()
output_settings.listen()
settings.listen()
source.listen()
target.listen()
preview.listen()
trim_frame.listen()
face_selector.listen()
face_analyser.listen()
output_settings.listen()
output.listen()
def run(ui : gradio.Blocks) -> None:
ui.launch(show_api = False)


@@ -0,0 +1,38 @@
import gradio
from facefusion.uis.components import about, processors, execution, limit_resources, source, webcam
def pre_check() -> bool:
return True
def pre_render() -> bool:
return True
def render() -> gradio.Blocks:
with gradio.Blocks() as layout:
with gradio.Row():
with gradio.Column(scale = 2):
about.render()
processors.render()
execution.render()
limit_resources.render()
source.render()
with gradio.Column(scale = 5):
webcam.render()
return layout
def listen() -> None:
processors.listen()
execution.listen()
limit_resources.listen()
source.listen()
webcam.listen()
def run(ui : gradio.Blocks) -> None:
ui.queue(concurrency_count = 2, api_open = False)
ui.launch(show_api = False)


@@ -4,8 +4,9 @@ import gradio
Component = gradio.File or gradio.Image or gradio.Video or gradio.Slider
ComponentName = Literal\
[
'source_file',
'target_file',
'source_image',
'target_image',
'target_video',
'preview_frame_slider',
'face_recognition_dropdown',
'reference_face_position_gallery',
@@ -15,4 +16,6 @@ ComponentName = Literal\
'face_analyser_gender_dropdown',
'frame_processors_checkbox_group'
]
WebcamMode = Literal[ 'inline', 'stream_udp', 'stream_v4l2' ]
StreamMode = Literal[ 'udp', 'v4l2' ]
Update = Dict[Any, Any]


@@ -1,3 +1,7 @@
import json
from typing import List, Optional
from pathlib import Path
from tqdm import tqdm
import glob
import mimetypes
import os
@@ -7,11 +11,7 @@ import ssl
import subprocess
import tempfile
import urllib
from pathlib import Path
from typing import List, Optional
import onnxruntime
from tqdm import tqdm
import facefusion.globals
from facefusion import wording
@@ -28,28 +28,37 @@ def run_ffmpeg(args : List[str]) -> bool:
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
commands.extend(args)
try:
subprocess.check_output(commands, stderr = subprocess.STDOUT)
subprocess.run(commands, stderr = subprocess.PIPE, check = True)
return True
except subprocess.CalledProcessError:
return False
def open_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
commands.extend(args)
return subprocess.Popen(commands, stdin = subprocess.PIPE)
def detect_fps(target_path : str) -> Optional[float]:
commands = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path ]
output = subprocess.check_output(commands).decode().strip().split('/')
commands = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'json', target_path ]
output = subprocess.check_output(commands).decode().strip()
try:
numerator, denominator = map(int, output)
return numerator / denominator
entries = json.loads(output)
for stream in entries.get('streams'):
numerator, denominator = map(int, stream.get('r_frame_rate').split('/'))
return numerator / denominator
return None
except (ValueError, ZeroDivisionError):
return None
def extract_frames(target_path : str, fps : float) -> bool:
temp_directory_path = get_temp_directory_path(target_path)
temp_frame_quality = round(31 - (facefusion.globals.temp_frame_quality * 0.31))
temp_frame_compression = round(31 - (facefusion.globals.temp_frame_quality * 0.31))
trim_frame_start = facefusion.globals.trim_frame_start
trim_frame_end = facefusion.globals.trim_frame_end
commands = [ '-hwaccel', 'auto', '-i', target_path, '-q:v', str(temp_frame_quality), '-pix_fmt', 'rgb24', ]
commands = [ '-hwaccel', 'auto', '-i', target_path, '-q:v', str(temp_frame_compression), '-pix_fmt', 'rgb24' ]
if trim_frame_start is not None and trim_frame_end is not None:
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(fps) ])
elif trim_frame_start is not None:
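The detect_fps() rewrite above parses structured JSON instead of splitting raw ffprobe text. For a 25 FPS clip, ffprobe emits something like the payload below, and the fraction in r_frame_rate is evaluated (values are illustrative):

```python
# Example of the ffprobe JSON that detect_fps() now consumes.
import json

output = '{ "streams": [ { "r_frame_rate": "25/1" } ] }'
entries = json.loads(output)
for stream in entries.get('streams'):
    numerator, denominator = map(int, stream.get('r_frame_rate').split('/'))
    print(numerator / denominator)  # 25.0
```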
@@ -62,20 +71,30 @@ def extract_frames(target_path : str, fps : float) -> bool:
return run_ffmpeg(commands)
def create_video(target_path : str, fps : float) -> bool:
def compress_image(output_path : str) -> bool:
output_image_compression = round(31 - (facefusion.globals.output_image_quality * 0.31))
commands = [ '-hwaccel', 'auto', '-i', output_path, '-q:v', str(output_image_compression), '-y', output_path ]
return run_ffmpeg(commands)
def merge_video(target_path : str, fps : float) -> bool:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
output_video_quality = round(51 - (facefusion.globals.output_video_quality * 0.5))
commands = [ '-hwaccel', 'auto', '-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.' + facefusion.globals.temp_frame_format), '-c:v', facefusion.globals.output_video_encoder ]
if facefusion.globals.output_video_encoder in [ 'libx264', 'libx265', 'libvpx' ]:
commands.extend([ '-crf', str(output_video_quality) ])
if facefusion.globals.output_video_encoder in [ 'libx264', 'libx265' ]:
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.5))
commands.extend([ '-crf', str(output_video_compression) ])
if facefusion.globals.output_video_encoder in [ 'libvpx' ]:
output_video_compression = round(63 - (facefusion.globals.output_video_quality * 0.5))
commands.extend([ '-crf', str(output_video_compression) ])
if facefusion.globals.output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
commands.extend([ '-cq', str(output_video_quality) ])
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.5))
commands.extend([ '-cq', str(output_video_compression) ])
commands.extend([ '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625', '-y', temp_output_path ])
return run_ffmpeg(commands)
def restore_audio(target_path : str, output_path : str) -> None:
def restore_audio(target_path : str, output_path : str) -> bool:
fps = detect_fps(target_path)
trim_frame_start = facefusion.globals.trim_frame_start
trim_frame_end = facefusion.globals.trim_frame_end
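The quality sliders in compress_image() and merge_video() above run from 0 to 100 and are mapped onto the encoders' inverted scales: 0-31 for -q:v, 0-51 for the CRF of libx264/libx265 and the -cq of NVENC, and 0-63 for the CRF of libvpx. Worked through with an assumed slider value of 80:

```python
# Worked example of the quality-to-compression mappings (slider values are assumed).
output_image_quality = 80
output_video_quality = 80
print(round(31 - (output_image_quality * 0.31)))  # 6 -> -q:v for the output image
print(round(51 - (output_video_quality * 0.5)))   # 11 -> -crf / -cq for x264, x265, nvenc
print(round(63 - (output_video_quality * 0.5)))   # 23 -> -crf for libvpx
```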
@@ -94,9 +113,7 @@ def restore_audio(target_path : str, output_path : str) -> None:
commands.extend([ '-to', str(end_time) ])
commands.extend([ '-c:a', 'aac' ])
commands.extend([ '-map', '0:v:0', '-map', '1:a:0', '-y', output_path ])
done = run_ffmpeg(commands)
if not done:
move_temp(target_path, output_path)
return run_ffmpeg(commands)
def get_temp_frame_paths(target_path : str) -> List[str]:
@@ -114,12 +131,18 @@ def get_temp_output_path(target_path : str) -> str:
return os.path.join(temp_directory_path, TEMP_OUTPUT_NAME)
def normalize_output_path(source_path : str, target_path : str, output_path : str) -> Optional[str]:
if source_path and target_path and output_path:
def normalize_output_path(source_path : Optional[str], target_path : Optional[str], output_path : Optional[str]) -> Optional[str]:
if is_file(source_path) and is_file(target_path) and is_directory(output_path):
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
if is_file(target_path) and output_path:
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
output_name, output_extension = os.path.splitext(os.path.basename(output_path))
output_directory_path = os.path.dirname(output_path)
if is_directory(output_directory_path) and output_extension:
return os.path.join(output_directory_path, output_name + target_extension)
return None
return output_path
@@ -130,8 +153,8 @@ def create_temp(target_path : str) -> None:
def move_temp(target_path : str, output_path : str) -> None:
temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path):
if os.path.isfile(output_path):
if is_file(temp_output_path):
if is_file(output_path):
os.remove(output_path)
shutil.move(temp_output_path, output_path)
@@ -139,21 +162,29 @@ def move_temp(target_path : str, output_path : str) -> None:
def clear_temp(target_path : str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
if not facefusion.globals.keep_temp and os.path.isdir(temp_directory_path):
if not facefusion.globals.keep_temp and is_directory(temp_directory_path):
shutil.rmtree(temp_directory_path)
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path)
def is_file(file_path : str) -> bool:
return bool(file_path and os.path.isfile(file_path))
def is_directory(directory_path : str) -> bool:
return bool(directory_path and os.path.isdir(directory_path))
def is_image(image_path : str) -> bool:
if image_path and os.path.isfile(image_path):
if is_file(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith('image/'))
return False
def is_video(video_path : str) -> bool:
if video_path and os.path.isfile(video_path):
if is_file(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith('video/'))
return False
@@ -178,13 +209,23 @@ def resolve_relative_path(path : str) -> str:
def list_module_names(path : str) -> Optional[List[str]]:
if os.path.exists(path):
files = os.listdir(path)
return [Path(file).stem for file in files if not Path(file).stem.startswith('__')]
return [ Path(file).stem for file in files if not Path(file).stem.startswith('__') ]
return None
def encode_execution_providers(execution_providers : List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
return [ execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers ]
def decode_execution_providers(execution_providers : List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
available_execution_providers = onnxruntime.get_available_providers()
encoded_execution_providers = encode_execution_providers(available_execution_providers)
return [ execution_provider for execution_provider, encoded_execution_provider in zip(available_execution_providers, encoded_execution_providers) if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers) ]
def get_device(execution_providers : List[str]) -> str:
if 'CUDAExecutionProvider' in execution_providers:
return 'cuda'
if 'CoreMLExecutionProvider' in execution_providers:
return 'mps'
return 'cpu'

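get_device() collapses the configured execution providers into a plain device string; a quick usage sketch (provider lists are illustrative):

```python
# Usage sketch for get_device().
from facefusion.utilities import get_device

print(get_device([ 'CUDAExecutionProvider', 'CPUExecutionProvider' ]))  # cuda
print(get_device([ 'CoreMLExecutionProvider' ]))  # mps
print(get_device([ 'CPUExecutionProvider' ]))  # cpu
```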
facefusion/vision.py Normal file

@@ -0,0 +1,38 @@
from typing import Optional
import cv2
from facefusion.typing import Frame
def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[Frame]:
capture = cv2.VideoCapture(video_path)
if capture.isOpened():
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return frame
return None
def count_video_frame_total(video_path : str) -> int:
capture = cv2.VideoCapture(video_path)
if capture.isOpened():
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total
return 0
def normalize_frame_color(frame : Frame) -> Frame:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
def resize_frame_dimension(frame : Frame, max_height : int) -> Frame:
height, width = frame.shape[:2]
if height > max_height:
scale = max_height / height
max_width = int(width * scale)
frame = cv2.resize(frame, (max_width, max_height))
return frame

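resize_frame_dimension() only ever downscales and keeps the aspect ratio; a 1920x1080 frame capped at a max_height of 540 comes out at 960x540 (input dimensions here are assumed):

```python
# Worked example for resize_frame_dimension.
import numpy
from facefusion.vision import resize_frame_dimension

frame = numpy.zeros((1080, 1920, 3), dtype = numpy.uint8)
print(resize_frame_dimension(frame, 540).shape)  # (540, 960, 3)
```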

@@ -1,5 +1,6 @@
WORDING =\
{
'select_onnxruntime_install': 'Select the onnxruntime to be installed',
'python_not_supported': 'Python version is not supported, upgrade to {version} or higher',
'ffmpeg_not_installed': 'FFMpeg is not installed',
'source_help': 'select a source image',
@@ -21,21 +22,26 @@ WORDING =\
'trim_frame_end_help': 'specify the end frame for extraction',
'temp_frame_format_help': 'specify the image format used for frame extraction',
'temp_frame_quality_help': 'specify the image quality used for frame extraction',
'output_image_quality_help': 'specify the quality used for the output image',
'output_video_encoder_help': 'specify the encoder used for the output video',
'output_video_quality_help': 'specify the quality used for the output video',
'max_memory_help': 'specify the maximum amount of ram to be used (in gb)',
'execution_providers_help': 'choose from the available execution providers (choices: {choices}, ...)',
'execution_thread_count_help': 'specify the number of execution threads',
'execution_queue_count_help': 'specify the number of execution queries',
'headless_help': 'run the program in headless mode',
'creating_temp': 'Creating temporary resources',
'extracting_frames_fps': 'Extracting frames with {fps} FPS',
'processing': 'Processing',
'downloading': 'Downloading',
'temp_frames_not_found': 'Temporary frames not found',
'creating_video_fps': 'Creating video with {fps} FPS',
'creating_video_failed': 'Creating video failed',
'compressing_image': 'Compressing image',
'compressing_image_failed': 'Compressing image failed',
'merging_video_fps': 'Merging video with {fps} FPS',
'merging_video_failed': 'Merging video failed',
'skipping_audio': 'Skipping audio',
'restoring_audio': 'Restoring audio',
'restoring_audio_failed': 'Restoring audio failed',
'clearing_temp': 'Clearing temporary resources',
'processing_image_succeed': 'Processing to image succeed',
'processing_image_failed': 'Processing to image failed',
@@ -43,14 +49,17 @@ WORDING =\
'processing_video_failed': 'Processing to video failed',
'select_image_source': 'Select an image for source path',
'select_image_or_video_target': 'Select an image or video for target path',
'select_file_or_directory_output': 'Select a file or directory for output path',
'no_source_face_detected': 'No source face detected',
'frame_processor_not_loaded': 'Frame processor {frame_processor} could not be loaded',
'frame_processor_not_implemented': 'Frame processor {frame_processor} not implemented correctly',
'ui_layout_not_loaded': 'UI layout {ui_layout} could not be loaded',
'ui_layout_not_implemented': 'UI layout {ui_layout} not implemented correctly',
'start_button_label': 'START',
'stop_button_label': 'STOP',
'clear_button_label': 'CLEAR',
'benchmark_result_dataframe_label': 'BENCHMARK RESULT',
'benchmark_runs_checkbox_group_label': 'BENCHMARK RUNS',
'benchmark_results_dataframe_label': 'BENCHMARK RESULTS',
'benchmark_cycles_slider_label': 'BENCHMARK CYCLES',
'execution_providers_checkbox_group_label': 'EXECUTION PROVIDERS',
'execution_thread_count_slider_label': 'EXECUTION THREAD COUNT',
@@ -61,7 +70,10 @@ WORDING =\
'reference_face_gallery_label': 'REFERENCE FACE',
'face_recognition_dropdown_label': 'FACE RECOGNITION',
'reference_face_distance_slider_label': 'REFERENCE FACE DISTANCE',
'max_memory_slider_label': 'MAX MEMORY',
'output_image_or_video_label': 'OUTPUT',
'output_path_textbox_label': 'OUTPUT PATH',
'output_image_quality_slider_label': 'OUTPUT IMAGE QUALITY',
'output_video_encoder_dropdown_label': 'OUTPUT VIDEO ENCODER',
'output_video_quality_slider_label': 'OUTPUT VIDEO QUALITY',
'preview_image_label': 'PREVIEW',
@@ -76,6 +88,8 @@ WORDING =\
'trim_frame_end_slider_label': 'TRIM FRAME END',
'source_file_label': 'SOURCE',
'target_file_label': 'TARGET',
'webcam_image_label': 'WEBCAM',
'webcam_mode_radio_label': 'WEBCAM MODE',
'point': '.',
'comma': ',',
'colon': ':',

install.py Executable file

@@ -0,0 +1,6 @@
#!/usr/bin/env python3
from facefusion import installer
if __name__ == '__main__':
installer.run()


@@ -1,11 +1,11 @@
insightface==0.7.3
numpy==1.24.3
onnx==1.14.0
onnx==1.14.1
onnxruntime==1.15.1
opencv-python==4.8.0.74
opencv-python==4.8.0.76
opennsfw2==0.10.2
protobuf==4.23.4
protobuf==4.24.2
pytest==7.4.0
psutil==5.9.5
tensorflow==2.13.0
tqdm==4.65.0
tqdm==4.66.1


@@ -1,19 +1,16 @@
--extra-index-url https://download.pytorch.org/whl/cu118
gfpgan==1.3.8
gradio==3.40.1
gradio==3.42.0
insightface==0.7.3
numpy==1.24.3
onnx==1.14.0
onnxruntime==1.15.1; python_version != '3.9' and sys_platform == 'darwin' and platform_machine != 'arm64'
onnxruntime-coreml==1.13.1; python_version == '3.9' and sys_platform == 'darwin' and platform_machine != 'arm64'
onnxruntime-gpu==1.15.1; sys_platform != 'darwin'
onnxruntime-silicon==1.13.1; sys_platform == 'darwin' and platform_machine == 'arm64'
opencv-python==4.8.0.74
onnx==1.14.1
onnxruntime==1.15.1
opencv-python==4.8.0.76
opennsfw2==0.10.2
pillow==10.0.0
protobuf==4.23.4
protobuf==4.24.2
psutil==5.9.5
realesrgan==0.3.0
tensorflow==2.13.0
tqdm==4.65.0
tqdm==4.66.1


@@ -16,7 +16,7 @@ def before_all() -> None:
def test_image_to_image() -> None:
commands = [ 'python', 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.jpg', '-o', '.assets/examples' ]
commands = [ 'python', 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.jpg', '-o', '.assets/examples', '--headless' ]
run = subprocess.run(commands, stdout = subprocess.PIPE)
assert run.returncode == 0
@@ -24,7 +24,7 @@ def test_image_to_image() -> None:
def test_image_to_video() -> None:
commands = [ 'python', 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.mp4', '-o', '.assets/examples', '--trim-frame-end', '10' ]
commands = [ 'python', 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.mp4', '-o', '.assets/examples', '--trim-frame-end', '10', '--headless' ]
run = subprocess.run(commands, stdout = subprocess.PIPE)
assert run.returncode == 0


@@ -1,9 +1,10 @@
import glob
import platform
import subprocess
import pytest
import facefusion.globals
from facefusion.utilities import conditional_download, detect_fps, extract_frames, create_temp, get_temp_directory_path, clear_temp
from facefusion.utilities import conditional_download, detect_fps, extract_frames, create_temp, get_temp_directory_path, clear_temp, normalize_output_path, is_file, is_directory, is_image, is_video, encode_execution_providers, decode_execution_providers
@pytest.fixture(scope = 'module', autouse = True)
@@ -14,6 +15,7 @@ def before_all() -> None:
facefusion.globals.temp_frame_format = 'png'
conditional_download('.assets/examples',
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples/source.jpg',
'https://github.com/facefusion/facefusion-assets/releases/download/examples/target-240p.mp4'
])
subprocess.run([ 'ffmpeg', '-i', '.assets/examples/target-240p.mp4', '-vf', 'fps=25', '.assets/examples/target-240p-25fps.mp4' ])
@@ -105,3 +107,48 @@ def test_extract_frames_with_trim_end() -> None:
assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total
clear_temp(target_path)
def test_normalize_output_path() -> None:
if platform.system().lower() != 'windows':
assert normalize_output_path('.assets/examples/source.jpg', None, '.assets/examples/target-240p.mp4') == '.assets/examples/target-240p.mp4'
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', '.assets/examples/target-240p.mp4') == '.assets/examples/target-240p.mp4'
assert normalize_output_path('.assets/examples/source.jpg', '.assets/examples/target-240p.mp4', '.assets/examples') == '.assets/examples/source-target-240p.mp4'
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', '.assets/examples/output.mp4') == '.assets/examples/output.mp4'
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', '.assets/output.mov') == '.assets/output.mp4'
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', '.assets/examples/invalid') is None
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', '.assets/invalid/output.mp4') is None
assert normalize_output_path(None, '.assets/examples/target-240p.mp4', 'invalid') is None
assert normalize_output_path('.assets/examples/source.jpg', '.assets/examples/target-240p.mp4', None) is None
def test_is_file() -> None:
assert is_file('.assets/examples/source.jpg') is True
assert is_file('.assets/examples') is False
assert is_file('invalid') is False
def test_is_directory() -> None:
assert is_directory('.assets/examples') is True
assert is_directory('.assets/examples/source.jpg') is False
assert is_directory('invalid') is False
def test_is_image() -> None:
assert is_image('.assets/examples/source.jpg') is True
assert is_image('.assets/examples/target-240p.mp4') is False
assert is_image('invalid') is False
def test_is_video() -> None:
assert is_video('.assets/examples/target-240p.mp4') is True
assert is_video('.assets/examples/source.jpg') is False
assert is_video('invalid') is False
def test_encode_execution_providers() -> None:
assert encode_execution_providers([ 'CPUExecutionProvider' ]) == [ 'cpu' ]
def test_decode_execution_providers() -> None:
assert decode_execution_providers([ 'cpu' ]) == [ 'CPUExecutionProvider' ]