* Allow passing the onnxruntime to install.py

* Fix CI

* Disallow none execution providers in the UI

* Use CV2 to detect fps

* Respect trim on videos with audio

* Respect trim on videos with audio (finally)

* Implement caching to speed up preview and webcam

* Define webcam UI and webcam performance

* Remove layout from components

* Add primary buttons

* Extract benchmark and webcam settings

* Introduce compact UI settings

* Caching for IO and **** prediction

* Caching for IO and **** prediction

* Introduce face analyser caching

* Fix some typing

* Improve setup for benchmark

* Clear image cache via post process

* Fix settings in UI, simplify restore_audio() using shortest

* Select resolution and fps via webcam ui

* Introduce read_static_image() to stop caching temp images

* Use DirectShow under Windows

* Multi-threading for webcam

* Fix typing

* Refactor frame processor

* Refactor webcam processing

* Avoid warnings due to capture.isOpened()

* Resume downloads (#110)

* Introduce resumable downloads

* Fix CURL commands

* Break execution_settings into pieces

* Cosmetic changes on cv2 webcam

* Update Gradio

* Move face cache to own file

* Uniform naming for threading

* Fix sorting of get_temp_frame_paths(), extend get_temp_frames_pattern()

* Minor changes from the review

* Looks stable to me

* Update the disclaimer

* Update the disclaimer

* Update the disclaimer
This commit is contained in:
Henry Ruhs 2023-09-19 11:21:18 +02:00 committed by GitHub
parent 7f69889c95
commit 66ea4928f8
45 changed files with 866 additions and 588 deletions

.github/preview.png (binary, vendored, not shown): 1.0 MiB before, 1.1 MiB after

[file path not shown]
@@ -24,7 +24,7 @@ Read the [installation](https://docs.facefusion.io/installation) now.
 Usage
 -----
-Run the program as needed.
+Run the command:
 ```
 python run.py [options]
@@ -64,11 +64,11 @@ python run.py [options]
 Disclaimer
 ----------
-This software is meant to be a productive contribution to the rapidly growing AI-generated media industry. It will help artists with tasks such as animating a custom character or using the character as a model for clothing etc.
+We acknowledge the unethical potential of FaceFusion and are resolutely dedicated to establishing safeguards against such misuse. This program has been engineered to abstain from processing inappropriate content such as nudity, graphic content and sensitive material.
-The developers of this software are aware of its possible unethical applications and are committed to take preventative measures against them. It has a built-in check which prevents the program from working on inappropriate media including but not limited to nudity, graphic content, sensitive material such as war footage etc. We will continue to develop this project in the positive direction while adhering to law and ethics. This project may be shut down or include watermarks on the output if requested by law.
+It is important to note that we maintain a strong stance against any type of pornographic nature and do not collaborate with any websites promoting the unauthorized use of our software.
-Users of this software are expected to use this software responsibly while abiding the local law. If face of a real person is being used, users are suggested to get consent from the concerned person and clearly mention that it is a deepfake when posting content online. Developers of this software will not be responsible for actions of end-users.
+Users who seek to engage in such activities will face consequences, including being banned from our community. We reserve the right to report developers on GitHub who distribute unlocked forks of our software at any time.
 Documentation

[file path not shown]
@@ -8,3 +8,4 @@ face_analyser_age : List[FaceAnalyserAge] = [ 'child', 'teen', 'adult', 'senior'
 face_analyser_gender : List[FaceAnalyserGender] = [ 'male', 'female' ]
 temp_frame_format : List[TempFrameFormat] = [ 'jpg', 'png' ]
 output_video_encoder : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc' ]

[file path not shown]
@@ -1,13 +1,14 @@
+import threading
 from typing import Any, Optional, List
-import threading
 import insightface
 import numpy
 import facefusion.globals
+from facefusion.face_cache import get_faces_cache, set_faces_cache
 from facefusion.typing import Frame, Face, FaceAnalyserDirection, FaceAnalyserAge, FaceAnalyserGender
 FACE_ANALYSER = None
-THREAD_LOCK = threading.Lock()
+THREAD_LOCK : threading.Lock = threading.Lock()
 def get_face_analyser() -> Any:
@@ -38,7 +39,12 @@ def get_one_face(frame : Frame, position : int = 0) -> Optional[Face]:
 def get_many_faces(frame : Frame) -> List[Face]:
     try:
-        faces = get_face_analyser().get(frame)
+        faces_cache = get_faces_cache(frame)
+        if faces_cache:
+            faces = faces_cache
+        else:
+            faces = get_face_analyser().get(frame)
+            set_faces_cache(frame, faces)
         if facefusion.globals.face_analyser_direction:
             faces = sort_by_direction(faces, facefusion.globals.face_analyser_direction)
         if facefusion.globals.face_analyser_age:
@@ -100,7 +106,3 @@ def filter_by_gender(faces : List[Face], gender : FaceAnalyserGender) -> List[Fa
         if face['gender'] == 0 and gender == 'female':
             filter_faces.append(face)
     return filter_faces
-def get_faces_total(frame : Frame) -> int:
-    return len(get_many_faces(frame))

facefusion/face_cache.py (new file)
@@ -0,0 +1,29 @@
+from typing import Optional, List, Dict
+import hashlib
+from facefusion.typing import Frame, Face
+
+FACES_CACHE : Dict[str, List[Face]] = {}
+
+def get_faces_cache(frame : Frame) -> Optional[List[Face]]:
+    frame_hash = create_frame_hash(frame)
+    if frame_hash in FACES_CACHE:
+        return FACES_CACHE[frame_hash]
+    return None
+
+def set_faces_cache(frame : Frame, faces : List[Face]) -> None:
+    frame_hash = create_frame_hash(frame)
+    if frame_hash:
+        FACES_CACHE[frame_hash] = faces
+
+def clear_faces_cache() -> None:
+    global FACES_CACHE
+    FACES_CACHE = {}
+
+def create_frame_hash(frame : Frame) -> Optional[str]:
+    return hashlib.sha256(frame.tobytes()).hexdigest() if frame is not None else None
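Worth spelling out how this cache works: create_frame_hash() keys each frame by the SHA-256 digest of its raw pixel buffer, so an identical frame (for example, the same preview image rendered again) returns the previously detected faces instead of re-running the InsightFace analyser. A minimal, self-contained sketch of the same lookup pattern, with a dummy detector standing in for get_face_analyser().get():

```python
import hashlib
from typing import Dict, List, Optional

import numpy

FACES_CACHE : Dict[str, List[str]] = {}

def create_frame_hash(frame : numpy.ndarray) -> Optional[str]:
    # identical pixel buffers produce identical keys
    return hashlib.sha256(frame.tobytes()).hexdigest() if frame is not None else None

def detect_faces(frame : numpy.ndarray) -> List[str]:
    # dummy stand-in for the real detector call
    return [ 'face' ]

def get_many_faces_cached(frame : numpy.ndarray) -> List[str]:
    frame_hash = create_frame_hash(frame)
    if frame_hash in FACES_CACHE:
        return FACES_CACHE[frame_hash] # cache hit, detector skipped
    faces = detect_faces(frame)
    FACES_CACHE[frame_hash] = faces
    return faces

frame = numpy.zeros((4, 4, 3), dtype = numpy.uint8)
assert get_many_faces_cached(frame) is get_many_faces_cached(frame) # second call is a hit
```

Hashing costs one pass over the pixels, which is cheap next to detection; the dict is unbounded, which is presumably why clear_faces_cache() gets called from the benchmark's post_process() further down.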

[file path not shown]
@@ -1,6 +1,6 @@
 from typing import List, Optional
-from facefusion.typing import FaceRecognition, FaceAnalyserDirection, FaceAnalyserAge, FaceAnalyserGender, TempFrameFormat
+from facefusion.typing import FaceRecognition, FaceAnalyserDirection, FaceAnalyserAge, FaceAnalyserGender, TempFrameFormat, OutputVideoEncoder
 source_path : Optional[str] = None
 target_path : Optional[str] = None
@@ -23,7 +23,7 @@ trim_frame_end : Optional[int] = None
 temp_frame_format : Optional[TempFrameFormat] = None
 temp_frame_quality : Optional[int] = None
 output_image_quality : Optional[int] = None
-output_video_encoder : Optional[str] = None
+output_video_encoder : Optional[OutputVideoEncoder] = None
 output_video_quality : Optional[int] = None
 max_memory : Optional[int] = None
 execution_providers : List[str] = []

[file path not shown]
@@ -1,4 +1,5 @@
 from typing import Dict, Tuple
+import argparse
 import os
 import sys
 import subprocess
@@ -8,7 +9,7 @@ subprocess.call([ 'pip', 'install' , 'inquirer', '-q' ])
 import inquirer
-from facefusion import wording
+from facefusion import metadata, wording
 ONNXRUNTIMES : Dict[str, Tuple[str, str]] =\
 {
@@ -22,27 +23,38 @@
 def run() -> None:
-    answers : Dict[str, str] = inquirer.prompt(
-    [
-        inquirer.List(
-            'onnxruntime_key',
-            message = wording.get('select_onnxruntime_install'),
-            choices = list(ONNXRUNTIMES.keys())
-        )
-    ])
+    program = argparse.ArgumentParser(formatter_class = lambda prog: argparse.HelpFormatter(prog, max_help_position = 120))
+    program.add_argument('--onnxruntime', help = wording.get('onnxruntime_help'), dest = 'onnxruntime', choices = ONNXRUNTIMES.keys())
+    program.add_argument('-v', '--version', version = metadata.get('name') + ' ' + metadata.get('version'), action = 'version')
+    args = program.parse_args()
+
+    if args.onnxruntime:
+        answers =\
+        {
+            'onnxruntime': args.onnxruntime
+        }
+    else:
+        answers = inquirer.prompt(
+        [
+            inquirer.List(
+                'onnxruntime',
+                message = wording.get('onnxruntime_help'),
+                choices = list(ONNXRUNTIMES.keys())
+            )
+        ])
 	if answers is not None:
-        onnxruntime_key = answers['onnxruntime_key']
-        onnxruntime_name, onnxruntime_version = ONNXRUNTIMES[onnxruntime_key]
+        onnxruntime = answers['onnxruntime']
+        onnxruntime_name, onnxruntime_version = ONNXRUNTIMES[onnxruntime]
         python_id = 'cp' + str(sys.version_info.major) + str(sys.version_info.minor)
         subprocess.call([ 'pip', 'uninstall', 'torch', '-y' ])
-        if onnxruntime_key == 'cuda':
+        if onnxruntime == 'cuda':
             subprocess.call([ 'pip', 'install', '-r', 'requirements.txt', '--extra-index-url', 'https://download.pytorch.org/whl/cu118' ])
         else:
             subprocess.call([ 'pip', 'install', '-r', 'requirements.txt' ])
-        if onnxruntime_key != 'cpu':
+        if onnxruntime != 'cpu':
             subprocess.call([ 'pip', 'uninstall', 'onnxruntime', onnxruntime_name, '-y' ])
-            if onnxruntime_key != 'coreml-silicon':
+            if onnxruntime != 'coreml-silicon':
                 subprocess.call([ 'pip', 'install', onnxruntime_name + '==' + onnxruntime_version ])
             elif python_id in [ 'cp39', 'cp310', 'cp311' ]:
                 wheel_name = '-'.join([ 'onnxruntime_silicon', onnxruntime_version, python_id, python_id, 'macosx_12_0_arm64.whl' ])
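With the new argparse front end the installer can run unattended, for example `python install.py --onnxruntime cuda` in CI (the accepted values are the ONNXRUNTIMES keys), while the inquirer prompt remains the interactive fallback whenever the flag is omitted.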

[file path not shown]
@@ -2,7 +2,7 @@ METADATA =\
 {
     'name': 'FaceFusion',
     'description': 'Next generation face swapper and enhancer',
-    'version': '1.1.0',
+    'version': '1.2.0',
     'license': 'MIT',
     'author': 'Henry Ruhs',
     'url': 'https://facefusion.io'

[file path not shown]
@@ -1,4 +1,5 @@
 import threading
+from functools import lru_cache
 import numpy
 import opennsfw2
 from PIL import Image
@@ -7,7 +8,7 @@ from keras import Model
 from facefusion.typing import Frame
 PREDICTOR = None
-THREAD_LOCK = threading.Lock()
+THREAD_LOCK : threading.Lock = threading.Lock()
 MAX_PROBABILITY = 0.75
@@ -34,10 +35,12 @@ def predict_frame(target_frame : Frame) -> bool:
     return probability > MAX_PROBABILITY
-def predict_image(target_path : str) -> bool:
-    return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
+@lru_cache(maxsize = None)
+def predict_image(image_path : str) -> bool:
+    return opennsfw2.predict_image(image_path) > MAX_PROBABILITY
-def predict_video(target_path : str) -> bool:
-    _, probabilities = opennsfw2.predict_video_frames(video_path = target_path, frame_interval = 100)
+@lru_cache(maxsize = None)
+def predict_video(video_path : str) -> bool:
+    _, probabilities = opennsfw2.predict_video_frames(video_path = video_path, frame_interval = 25)
     return any(probability > MAX_PROBABILITY for probability in probabilities)
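Because predict_image() and predict_video() take only a path string, functools.lru_cache can memoize the verdict per path, so repeated runs against the same target skip the NSFW model entirely. A small sketch of the pattern, with the expensive model call simulated by a print:

```python
from functools import lru_cache

@lru_cache(maxsize = None)
def predict_path(path : str) -> bool:
    # in the real module this runs model inference; the boolean
    # result is memoized per path argument
    print('scoring ' + path)
    return False

predict_path('target.mp4') # prints once
predict_path('target.mp4') # served from the cache
print(predict_path.cache_info()) # hits=1, misses=1
```

One caveat of keying on the path: if the file behind it changes, the cached verdict goes stale until the process restarts or cache_clear() is called.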

[file path not shown]
@@ -57,16 +57,19 @@ def clear_frame_processors_modules() -> None:
     FRAME_PROCESSORS_MODULES = []
-def multi_process_frame(source_path : str, temp_frame_paths : List[str], process_frames : Callable[[str, List[str], Any], None], update : Callable[[], None]) -> None:
-    with ThreadPoolExecutor(max_workers = facefusion.globals.execution_thread_count) as executor:
-        futures = []
-        queue = create_queue(temp_frame_paths)
-        queue_per_future = max(len(temp_frame_paths) // facefusion.globals.execution_thread_count * facefusion.globals.execution_queue_count, 1)
-        while not queue.empty():
-            future = executor.submit(process_frames, source_path, pick_queue(queue, queue_per_future), update)
-            futures.append(future)
-        for future in as_completed(futures):
-            future.result()
+def multi_process_frames(source_path : str, temp_frame_paths : List[str], process_frames : Callable[[str, List[str], Callable[[], None]], None]) -> None:
+    progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
+    with tqdm(total = len(temp_frame_paths), desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True, bar_format = progress_bar_format) as progress:
+        with ThreadPoolExecutor(max_workers = facefusion.globals.execution_thread_count) as executor:
+            futures = []
+            queue_temp_frame_paths : Queue[str] = create_queue(temp_frame_paths)
+            queue_per_future = max(len(temp_frame_paths) // facefusion.globals.execution_thread_count * facefusion.globals.execution_queue_count, 1)
+            while not queue_temp_frame_paths.empty():
+                payload_temp_frame_paths = pick_queue(queue_temp_frame_paths, queue_per_future)
+                future = executor.submit(process_frames, source_path, payload_temp_frame_paths, lambda: update_progress(progress))
+                futures.append(future)
+            for future_done in as_completed(futures):
+                future_done.result()
 def create_queue(temp_frame_paths : List[str]) -> Queue[str]:
@@ -84,13 +87,6 @@ def pick_queue(queue : Queue[str], queue_per_future : int) -> List[str]:
     return queues
-def process_video(source_path : str, frame_paths : List[str], process_frames : Callable[[str, List[str], Any], None]) -> None:
-    progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
-    total = len(frame_paths)
-    with tqdm(total = total, desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True, bar_format = progress_bar_format) as progress:
-        multi_process_frame(source_path, frame_paths, process_frames, lambda: update_progress(progress))
 def update_progress(progress : Any = None) -> None:
     process = psutil.Process(os.getpid())
     memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
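The renamed multi_process_frames() now owns both the progress bar and the dispatch: it drains a queue of frame paths into batches of roughly len(paths) // thread_count * queue_count, submits one future per batch, and hands every worker a callback that ticks the shared tqdm bar. A self-contained sketch of that dispatch shape, using a lock-guarded counter where the real code uses tqdm:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from typing import Callable, List

def create_queue(items : List[str]) -> 'Queue[str]':
    queue : 'Queue[str]' = Queue()
    for item in items:
        queue.put(item)
    return queue

def pick_queue(queue : 'Queue[str]', amount : int) -> List[str]:
    # pop up to `amount` items to form one batch
    picked = []
    while not queue.empty() and len(picked) < amount:
        picked.append(queue.get())
    return picked

def multi_process(items : List[str], process_batch : Callable[[List[str], Callable[[], None]], None], thread_count : int = 4, queue_count : int = 1) -> None:
    queue = create_queue(items)
    queue_per_future = max(len(items) // thread_count * queue_count, 1)
    lock = threading.Lock()
    done = [ 0 ]
    def update_progress() -> None:
        with lock: # tqdm.update() plays this role in the real code
            done[0] += 1
    with ThreadPoolExecutor(max_workers = thread_count) as executor:
        futures = []
        while not queue.empty():
            # pick_queue() runs on the main thread, so the queue has a single consumer
            futures.append(executor.submit(process_batch, pick_queue(queue, queue_per_future), update_progress))
        for future in as_completed(futures):
            future.result() # re-raise exceptions from worker threads
    print('processed ' + str(done[0]) + ' of ' + str(len(items)))

def process_batch(batch : List[str], update_progress : Callable[[], None]) -> None:
    for item in batch:
        update_progress() # a real processor would read, transform and write the frame here

multi_process([ 'frame_%03d.jpg' % index for index in range(10) ], process_batch)
```

Calling future.result() on every batch is what surfaces worker exceptions that ThreadPoolExecutor would otherwise swallow silently.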

[file path not shown]
@@ -1,5 +1,4 @@
 from typing import Any, List, Callable
-import cv2
 import threading
 from gfpgan.utils import GFPGANer
@@ -9,10 +8,11 @@ from facefusion.core import update_status
 from facefusion.face_analyser import get_many_faces
 from facefusion.typing import Frame, Face, ProcessMode
 from facefusion.utilities import conditional_download, resolve_relative_path, is_image, is_video
+from facefusion.vision import read_image, read_static_image, write_image
 FRAME_PROCESSOR = None
-THREAD_SEMAPHORE = threading.Semaphore()
-THREAD_LOCK = threading.Lock()
+THREAD_SEMAPHORE : threading.Semaphore = threading.Semaphore()
+THREAD_LOCK : threading.Lock = threading.Lock()
 NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_ENHANCER'
@@ -54,6 +54,7 @@ def pre_process(mode : ProcessMode) -> bool:
 def post_process() -> None:
     clear_frame_processor()
+    read_static_image.cache_clear()
 def enhance_face(target_face : Face, temp_frame : Frame) -> Frame:
@@ -83,20 +84,19 @@ def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame)
     return temp_frame
-def process_frames(source_path : str, temp_frame_paths : List[str], update : Callable[[], None]) -> None:
+def process_frames(source_path : str, temp_frame_paths : List[str], update_progress : Callable[[], None]) -> None:
     for temp_frame_path in temp_frame_paths:
-        temp_frame = cv2.imread(temp_frame_path)
+        temp_frame = read_image(temp_frame_path)
         result_frame = process_frame(None, None, temp_frame)
-        cv2.imwrite(temp_frame_path, result_frame)
-        if update:
-            update()
+        write_image(temp_frame_path, result_frame)
+        update_progress()
 def process_image(source_path : str, target_path : str, output_path : str) -> None:
-    target_frame = cv2.imread(target_path)
+    target_frame = read_static_image(target_path)
     result_frame = process_frame(None, None, target_frame)
-    cv2.imwrite(output_path, result_frame)
+    write_image(output_path, result_frame)
 def process_video(source_path : str, temp_frame_paths : List[str]) -> None:
-    facefusion.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
+    facefusion.processors.frame.core.multi_process_frames(None, temp_frame_paths, process_frames)
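read_image() and read_static_image() come from the facefusion.vision helpers this commit switches the processors to. Judging by the names, the call sites and the read_static_image.cache_clear() in post_process(), the split is presumably an uncached reader for temp frames that change between runs and an lru_cache-backed reader for source and target files that are fetched repeatedly; roughly:

```python
from functools import lru_cache

import cv2
import numpy

def read_image(image_path : str) -> numpy.ndarray:
    # plain disk read for temp frames, which are rewritten between passes
    return cv2.imread(image_path)

@lru_cache(maxsize = None)
def read_static_image(image_path : str) -> numpy.ndarray:
    # memoized per path for source/target files that are read many times;
    # post_process() calls read_static_image.cache_clear() to release them
    return read_image(image_path)
```

This split is what the "Introduce read_static_image() to stop caching temp images" commit refers to: only deliberately static inputs stay memoized.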

[file path not shown]
@@ -1,5 +1,4 @@
 from typing import Any, List, Callable
-import cv2
 import insightface
 import threading
@@ -11,9 +10,10 @@ from facefusion.face_analyser import get_one_face, get_many_faces, find_similar_
 from facefusion.face_reference import get_face_reference, set_face_reference
 from facefusion.typing import Face, Frame, ProcessMode
 from facefusion.utilities import conditional_download, resolve_relative_path, is_image, is_video
+from facefusion.vision import read_image, read_static_image, write_image
 FRAME_PROCESSOR = None
-THREAD_LOCK = threading.Lock()
+THREAD_LOCK : threading.Lock = threading.Lock()
 NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_SWAPPER'
@@ -43,7 +43,7 @@ def pre_process(mode : ProcessMode) -> bool:
     if not is_image(facefusion.globals.source_path):
         update_status(wording.get('select_image_source') + wording.get('exclamation_mark'), NAME)
         return False
-    elif not get_one_face(cv2.imread(facefusion.globals.source_path)):
+    elif not get_one_face(read_static_image(facefusion.globals.source_path)):
         update_status(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), NAME)
         return False
     if mode in [ 'output', 'preview' ] and not is_image(facefusion.globals.target_path) and not is_video(facefusion.globals.target_path):
@@ -56,6 +56,7 @@ def pre_process(mode : ProcessMode) -> bool:
 def post_process() -> None:
     clear_frame_processor()
+    read_static_image.cache_clear()
 def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
@@ -76,32 +77,31 @@ def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame)
     return temp_frame
-def process_frames(source_path : str, temp_frame_paths : List[str], update : Callable[[], None]) -> None:
-    source_face = get_one_face(cv2.imread(source_path))
+def process_frames(source_path : str, temp_frame_paths : List[str], update_progress : Callable[[], None]) -> None:
+    source_face = get_one_face(read_static_image(source_path))
     reference_face = get_face_reference() if 'reference' in facefusion.globals.face_recognition else None
     for temp_frame_path in temp_frame_paths:
-        temp_frame = cv2.imread(temp_frame_path)
+        temp_frame = read_image(temp_frame_path)
         result_frame = process_frame(source_face, reference_face, temp_frame)
-        cv2.imwrite(temp_frame_path, result_frame)
-        if update:
-            update()
+        write_image(temp_frame_path, result_frame)
+        update_progress()
 def process_image(source_path : str, target_path : str, output_path : str) -> None:
-    source_face = get_one_face(cv2.imread(source_path))
-    target_frame = cv2.imread(target_path)
+    source_face = get_one_face(read_static_image(source_path))
+    target_frame = read_static_image(target_path)
     reference_face = get_one_face(target_frame, facefusion.globals.reference_face_position) if 'reference' in facefusion.globals.face_recognition else None
     result_frame = process_frame(source_face, reference_face, target_frame)
-    cv2.imwrite(output_path, result_frame)
+    write_image(output_path, result_frame)
 def process_video(source_path : str, temp_frame_paths : List[str]) -> None:
     conditional_set_face_reference(temp_frame_paths)
-    frame_processors.process_video(source_path, temp_frame_paths, process_frames)
+    frame_processors.multi_process_frames(source_path, temp_frame_paths, process_frames)
 def conditional_set_face_reference(temp_frame_paths : List[str]) -> None:
     if 'reference' in facefusion.globals.face_recognition and not get_face_reference():
-        reference_frame = cv2.imread(temp_frame_paths[facefusion.globals.reference_frame_number])
+        reference_frame = read_static_image(temp_frame_paths[facefusion.globals.reference_frame_number])
         reference_face = get_one_face(reference_frame, facefusion.globals.reference_face_position)
         set_face_reference(reference_face)

[file path not shown]
@@ -1,5 +1,4 @@
 from typing import Any, List, Callable
-import cv2
 import threading
 from basicsr.archs.rrdbnet_arch import RRDBNet
 from realesrgan import RealESRGANer
@@ -10,10 +9,11 @@ from facefusion import wording, utilities
 from facefusion.core import update_status
 from facefusion.typing import Frame, Face, ProcessMode
 from facefusion.utilities import conditional_download, resolve_relative_path
+from facefusion.vision import read_image, read_static_image, write_image
 FRAME_PROCESSOR = None
-THREAD_SEMAPHORE = threading.Semaphore()
-THREAD_LOCK = threading.Lock()
+THREAD_SEMAPHORE : threading.Semaphore = threading.Semaphore()
+THREAD_LOCK : threading.Lock = threading.Lock()
 NAME = 'FACEFUSION.FRAME_PROCESSOR.FRAME_ENHANCER'
@@ -63,6 +63,7 @@ def pre_process(mode : ProcessMode) -> bool:
 def post_process() -> None:
     clear_frame_processor()
+    read_static_image.cache_clear()
 def enhance_frame(temp_frame : Frame) -> Frame:
@@ -75,20 +76,19 @@ def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame)
     return enhance_frame(temp_frame)
-def process_frames(source_path : str, temp_frame_paths : List[str], update : Callable[[], None]) -> None:
+def process_frames(source_path : str, temp_frame_paths : List[str], update_progress : Callable[[], None]) -> None:
     for temp_frame_path in temp_frame_paths:
-        temp_frame = cv2.imread(temp_frame_path)
+        temp_frame = read_image(temp_frame_path)
         result_frame = process_frame(None, None, temp_frame)
-        cv2.imwrite(temp_frame_path, result_frame)
-        if update:
-            update()
+        write_image(temp_frame_path, result_frame)
+        update_progress()
 def process_image(source_path : str, target_path : str, output_path : str) -> None:
-    target_frame = cv2.imread(target_path)
+    target_frame = read_static_image(target_path)
     result = process_frame(None, None, target_frame)
-    cv2.imwrite(output_path, result)
+    write_image(output_path, result)
 def process_video(source_path : str, temp_frame_paths : List[str]) -> None:
-    frame_processors.process_video(None, temp_frame_paths, process_frames)
+    frame_processors.multi_process_frames(None, temp_frame_paths, process_frames)

[new file, path not shown]
@@ -0,0 +1,7 @@
+from typing import List
+from facefusion.uis.typing import WebcamMode
+
+settings : List[str] = [ 'keep-fps', 'keep-temp', 'skip-audio' ]
+webcam_mode : List[WebcamMode] = [ 'inline', 'stream_udp', 'stream_v4l2' ]
+webcam_resolution : List[str] = [ '320x240', '640x480', '1280x720', '1920x1080', '2560x1440', '3840x2160' ]

[file path not shown]
@@ -9,5 +9,4 @@ ABOUT_HTML : Optional[gradio.HTML] = None
 def render() -> None:
     global ABOUT_HTML
-    with gradio.Box():
-        ABOUT_HTML = gradio.HTML('<center><a href="' + metadata.get('url') + '">' + metadata.get('name') + ' ' + metadata.get('version') + '</a></center>')
+    ABOUT_HTML = gradio.HTML('<center><a href="' + metadata.get('url') + '">' + metadata.get('name') + ' ' + metadata.get('version') + '</a></center>')

[file path not shown]
@@ -6,17 +6,19 @@ import gradio
 import facefusion.globals
 from facefusion import wording
+from facefusion.face_analyser import get_face_analyser
+from facefusion.face_cache import clear_faces_cache
+from facefusion.processors.frame.core import get_frame_processors_modules
 from facefusion.vision import count_video_frame_total
 from facefusion.core import limit_resources, conditional_process
 from facefusion.uis.typing import Update
+from facefusion.uis import core as ui
 from facefusion.utilities import normalize_output_path, clear_temp
 BENCHMARK_RESULTS_DATAFRAME : Optional[gradio.Dataframe] = None
-BENCHMARK_RUNS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None
-BENCHMARK_CYCLES_SLIDER : Optional[gradio.Button] = None
 BENCHMARK_START_BUTTON : Optional[gradio.Button] = None
 BENCHMARK_CLEAR_BUTTON : Optional[gradio.Button] = None
-BENCHMARKS : Dict[str, str] = \
+BENCHMARKS : Dict[str, str] =\
 {
     '240p': '.assets/examples/target-240p.mp4',
     '360p': '.assets/examples/target-360p.mp4',
@@ -30,77 +32,68 @@ BENCHMARKS : Dict[str, str] =\
 def render() -> None:
     global BENCHMARK_RESULTS_DATAFRAME
-    global BENCHMARK_RUNS_CHECKBOX_GROUP
-    global BENCHMARK_CYCLES_SLIDER
     global BENCHMARK_START_BUTTON
     global BENCHMARK_CLEAR_BUTTON
-    with gradio.Box():
-        BENCHMARK_RESULTS_DATAFRAME = gradio.Dataframe(
-            label = wording.get('benchmark_results_dataframe_label'),
-            headers =
-            [
-                'target_path',
-                'benchmark_cycles',
-                'average_run',
-                'fastest_run',
-                'slowest_run',
-                'relative_fps'
-            ],
-            row_count = len(BENCHMARKS),
-            datatype =
-            [
-                'str',
-                'number',
-                'number',
-                'number',
-                'number',
-                'number'
-            ]
-        )
-    with gradio.Box():
-        BENCHMARK_RUNS_CHECKBOX_GROUP = gradio.CheckboxGroup(
-            label = wording.get('benchmark_runs_checkbox_group_label'),
-            value = list(BENCHMARKS.keys()),
-            choices = list(BENCHMARKS.keys())
-        )
-        BENCHMARK_CYCLES_SLIDER = gradio.Slider(
-            label = wording.get('benchmark_cycles_slider_label'),
-            minimum = 1,
-            step = 1,
-            value = 3,
-            maximum = 10
-        )
-    with gradio.Row():
-        BENCHMARK_START_BUTTON = gradio.Button(wording.get('start_button_label'))
-        BENCHMARK_CLEAR_BUTTON = gradio.Button(wording.get('clear_button_label'))
+    BENCHMARK_RESULTS_DATAFRAME = gradio.Dataframe(
+        label = wording.get('benchmark_results_dataframe_label'),
+        headers =
+        [
+            'target_path',
+            'benchmark_cycles',
+            'average_run',
+            'fastest_run',
+            'slowest_run',
+            'relative_fps'
+        ],
+        datatype =
+        [
+            'str',
+            'number',
+            'number',
+            'number',
+            'number',
+            'number'
+        ]
+    )
+    BENCHMARK_START_BUTTON = gradio.Button(
+        value = wording.get('start_button_label'),
+        variant = 'primary'
+    )
+    BENCHMARK_CLEAR_BUTTON = gradio.Button(
+        value = wording.get('clear_button_label')
+    )
 def listen() -> None:
-    BENCHMARK_RUNS_CHECKBOX_GROUP.change(update_benchmark_runs, inputs = BENCHMARK_RUNS_CHECKBOX_GROUP, outputs = BENCHMARK_RUNS_CHECKBOX_GROUP)
-    BENCHMARK_START_BUTTON.click(start, inputs = [ BENCHMARK_RUNS_CHECKBOX_GROUP, BENCHMARK_CYCLES_SLIDER ], outputs = BENCHMARK_RESULTS_DATAFRAME)
+    benchmark_runs_checkbox_group = ui.get_component('benchmark_runs_checkbox_group')
+    benchmark_cycles_slider = ui.get_component('benchmark_cycles_slider')
+    if benchmark_runs_checkbox_group and benchmark_cycles_slider:
+        BENCHMARK_START_BUTTON.click(start, inputs = [ benchmark_runs_checkbox_group, benchmark_cycles_slider ], outputs = BENCHMARK_RESULTS_DATAFRAME)
     BENCHMARK_CLEAR_BUTTON.click(clear, outputs = BENCHMARK_RESULTS_DATAFRAME)
-def update_benchmark_runs(benchmark_runs : List[str]) -> Update:
-    return gradio.update(value = benchmark_runs)
 def start(benchmark_runs : List[str], benchmark_cycles : int) -> Generator[List[Any], None, None]:
     facefusion.globals.source_path = '.assets/examples/source.jpg'
     target_paths = [ BENCHMARKS[benchmark_run] for benchmark_run in benchmark_runs if benchmark_run in BENCHMARKS ]
     benchmark_results = []
     if target_paths:
-        warm_up(BENCHMARKS['240p'])
+        pre_process()
         for target_path in target_paths:
             benchmark_results.append(benchmark(target_path, benchmark_cycles))
             yield benchmark_results
+        post_process()
-def warm_up(target_path : str) -> None:
-    facefusion.globals.target_path = target_path
-    facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, tempfile.gettempdir())
-    conditional_process()
+def pre_process() -> None:
+    limit_resources()
+    get_face_analyser()
+    for frame_processor_module in get_frame_processors_modules(facefusion.globals.frame_processors):
+        frame_processor_module.get_frame_processor()
+def post_process() -> None:
+    clear_faces_cache()
 def benchmark(target_path : str, benchmark_cycles : int) -> List[Any]:
@@ -111,7 +104,6 @@ def benchmark(target_path : str, benchmark_cycles : int) -> List[Any]:
     facefusion.globals.output_path = normalize_output_path(facefusion.globals.source_path, facefusion.globals.target_path, tempfile.gettempdir())
     video_frame_total = count_video_frame_total(facefusion.globals.target_path)
     start_time = time.perf_counter()
-    limit_resources()
     conditional_process()
     end_time = time.perf_counter()
     process_time = end_time - start_time

[new file, path not shown]
@@ -0,0 +1,38 @@
+from typing import Optional, List
+import gradio
+from facefusion import wording
+from facefusion.uis.typing import Update
+from facefusion.uis import core as ui
+from facefusion.uis.components.benchmark import BENCHMARKS
+
+BENCHMARK_RUNS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None
+BENCHMARK_CYCLES_SLIDER : Optional[gradio.Button] = None
+
+def render() -> None:
+    global BENCHMARK_RUNS_CHECKBOX_GROUP
+    global BENCHMARK_CYCLES_SLIDER
+
+    BENCHMARK_RUNS_CHECKBOX_GROUP = gradio.CheckboxGroup(
+        label = wording.get('benchmark_runs_checkbox_group_label'),
+        value = list(BENCHMARKS.keys()),
+        choices = list(BENCHMARKS.keys())
+    )
+    BENCHMARK_CYCLES_SLIDER = gradio.Slider(
+        label = wording.get('benchmark_cycles_slider_label'),
+        minimum = 1,
+        step = 1,
+        value = 3,
+        maximum = 10
+    )
+    ui.register_component('benchmark_runs_checkbox_group', BENCHMARK_RUNS_CHECKBOX_GROUP)
+    ui.register_component('benchmark_cycles_slider', BENCHMARK_CYCLES_SLIDER)
+
+def listen() -> None:
+    BENCHMARK_RUNS_CHECKBOX_GROUP.change(update_benchmark_runs, inputs = BENCHMARK_RUNS_CHECKBOX_GROUP, outputs = BENCHMARK_RUNS_CHECKBOX_GROUP)
+
+def update_benchmark_runs(benchmark_runs : List[str]) -> Update:
+    return gradio.update(value = benchmark_runs)
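Splitting the settings out of benchmark.py works because the UI layer keeps a component registry: this module calls ui.register_component(name, component) at render time, and benchmark.py resolves the same names at listen() time via ui.get_component(), so neither module imports the other's globals. A minimal sketch of such a registry, assuming it is essentially a module-level dict (the names below are illustrative):

```python
from typing import Any, Dict, Optional

COMPONENTS : Dict[str, Any] = {}

def register_component(name : str, component : Any) -> None:
    # render time: each UI module publishes the components it owns
    COMPONENTS[name] = component

def get_component(name : str) -> Optional[Any]:
    # listen time: other modules resolve components by name, or get None
    return COMPONENTS.get(name)

register_component('benchmark_cycles_slider', object())
assert get_component('benchmark_cycles_slider') is not None
assert get_component('unknown_component') is None
```

The `if benchmark_runs_checkbox_group and benchmark_cycles_slider:` guard in benchmark.py then degrades gracefully when a layout omits the settings module.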

[file path not shown]
@@ -10,55 +10,26 @@ from facefusion.uis.typing import Update
 from facefusion.utilities import encode_execution_providers, decode_execution_providers
 EXECUTION_PROVIDERS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None
-EXECUTION_THREAD_COUNT_SLIDER : Optional[gradio.Slider] = None
-EXECUTION_QUEUE_COUNT_SLIDER : Optional[gradio.Slider] = None
 def render() -> None:
     global EXECUTION_PROVIDERS_CHECKBOX_GROUP
-    global EXECUTION_THREAD_COUNT_SLIDER
-    global EXECUTION_QUEUE_COUNT_SLIDER
-    with gradio.Box():
-        EXECUTION_PROVIDERS_CHECKBOX_GROUP = gradio.CheckboxGroup(
-            label = wording.get('execution_providers_checkbox_group_label'),
-            choices = encode_execution_providers(onnxruntime.get_available_providers()),
-            value = encode_execution_providers(facefusion.globals.execution_providers)
-        )
-        EXECUTION_THREAD_COUNT_SLIDER = gradio.Slider(
-            label = wording.get('execution_thread_count_slider_label'),
-            value = facefusion.globals.execution_thread_count,
-            step = 1,
-            minimum = 1,
-            maximum = 128
-        )
-        EXECUTION_QUEUE_COUNT_SLIDER = gradio.Slider(
-            label = wording.get('execution_queue_count_slider_label'),
-            value = facefusion.globals.execution_queue_count,
-            step = 1,
-            minimum = 1,
-            maximum = 16
-        )
+    EXECUTION_PROVIDERS_CHECKBOX_GROUP = gradio.CheckboxGroup(
+        label = wording.get('execution_providers_checkbox_group_label'),
+        choices = encode_execution_providers(onnxruntime.get_available_providers()),
+        value = encode_execution_providers(facefusion.globals.execution_providers)
+    )
 def listen() -> None:
     EXECUTION_PROVIDERS_CHECKBOX_GROUP.change(update_execution_providers, inputs = EXECUTION_PROVIDERS_CHECKBOX_GROUP, outputs = EXECUTION_PROVIDERS_CHECKBOX_GROUP)
-    EXECUTION_THREAD_COUNT_SLIDER.change(update_execution_thread_count, inputs = EXECUTION_THREAD_COUNT_SLIDER, outputs = EXECUTION_THREAD_COUNT_SLIDER)
-    EXECUTION_QUEUE_COUNT_SLIDER.change(update_execution_queue_count, inputs = EXECUTION_QUEUE_COUNT_SLIDER, outputs = EXECUTION_QUEUE_COUNT_SLIDER)
 def update_execution_providers(execution_providers : List[str]) -> Update:
     clear_face_analyser()
     clear_frame_processors_modules()
+    if not execution_providers:
+        execution_providers = encode_execution_providers(onnxruntime.get_available_providers())
     facefusion.globals.execution_providers = decode_execution_providers(execution_providers)
     return gradio.update(value = execution_providers)
-def update_execution_thread_count(execution_thread_count : int = 1) -> Update:
-    facefusion.globals.execution_thread_count = execution_thread_count
-    return gradio.update(value = execution_thread_count)
-def update_execution_queue_count(execution_queue_count : int = 1) -> Update:
-    facefusion.globals.execution_queue_count = execution_queue_count
-    return gradio.update(value = execution_queue_count)
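A small behavioral note hidden in update_execution_providers(): an empty checkbox selection no longer propagates, it falls back to every provider onnxruntime reports as available, which is the "Disallow none execution providers in the UI" commit from the list above.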

[new file, path not shown]
@@ -0,0 +1,29 @@
+from typing import Optional
+import gradio
+import facefusion.globals
+from facefusion import wording
+from facefusion.uis.typing import Update
+
+EXECUTION_QUEUE_COUNT_SLIDER : Optional[gradio.Slider] = None
+
+def render() -> None:
+    global EXECUTION_QUEUE_COUNT_SLIDER
+
+    EXECUTION_QUEUE_COUNT_SLIDER = gradio.Slider(
+        label = wording.get('execution_queue_count_slider_label'),
+        value = facefusion.globals.execution_queue_count,
+        step = 1,
+        minimum = 1,
+        maximum = 16
+    )
+
+def listen() -> None:
+    EXECUTION_QUEUE_COUNT_SLIDER.change(update_execution_queue_count, inputs = EXECUTION_QUEUE_COUNT_SLIDER, outputs = EXECUTION_QUEUE_COUNT_SLIDER)
+
+def update_execution_queue_count(execution_queue_count : int = 1) -> Update:
+    facefusion.globals.execution_queue_count = execution_queue_count
+    return gradio.update(value = execution_queue_count)

[new file, path not shown]
@@ -0,0 +1,29 @@
+from typing import Optional
+import gradio
+import facefusion.globals
+from facefusion import wording
+from facefusion.uis.typing import Update
+
+EXECUTION_THREAD_COUNT_SLIDER : Optional[gradio.Slider] = None
+
+def render() -> None:
+    global EXECUTION_THREAD_COUNT_SLIDER
+
+    EXECUTION_THREAD_COUNT_SLIDER = gradio.Slider(
+        label = wording.get('execution_thread_count_slider_label'),
+        value = facefusion.globals.execution_thread_count,
+        step = 1,
+        minimum = 1,
+        maximum = 128
+    )
+
+def listen() -> None:
+    EXECUTION_THREAD_COUNT_SLIDER.change(update_execution_thread_count, inputs = EXECUTION_THREAD_COUNT_SLIDER, outputs = EXECUTION_THREAD_COUNT_SLIDER)
+
+def update_execution_thread_count(execution_thread_count : int = 1) -> Update:
+    facefusion.globals.execution_thread_count = execution_thread_count
+    return gradio.update(value = execution_thread_count)

[file path not shown]
@@ -18,26 +18,24 @@ def render() -> None:
     global FACE_ANALYSER_AGE_DROPDOWN
     global FACE_ANALYSER_GENDER_DROPDOWN
-    with gradio.Box():
-        with gradio.Row():
-            FACE_ANALYSER_DIRECTION_DROPDOWN = gradio.Dropdown(
-                label = wording.get('face_analyser_direction_dropdown_label'),
-                choices = facefusion.choices.face_analyser_direction,
-                value = facefusion.globals.face_analyser_direction
-            )
-            FACE_ANALYSER_AGE_DROPDOWN = gradio.Dropdown(
-                label = wording.get('face_analyser_age_dropdown_label'),
-                choices = ['none'] + facefusion.choices.face_analyser_age,
-                value = facefusion.globals.face_analyser_age or 'none'
-            )
-            FACE_ANALYSER_GENDER_DROPDOWN = gradio.Dropdown(
-                label = wording.get('face_analyser_gender_dropdown_label'),
-                choices = ['none'] + facefusion.choices.face_analyser_gender,
-                value = facefusion.globals.face_analyser_gender or 'none'
-            )
-        ui.register_component('face_analyser_direction_dropdown', FACE_ANALYSER_DIRECTION_DROPDOWN)
-        ui.register_component('face_analyser_age_dropdown', FACE_ANALYSER_AGE_DROPDOWN)
-        ui.register_component('face_analyser_gender_dropdown', FACE_ANALYSER_GENDER_DROPDOWN)
+    FACE_ANALYSER_DIRECTION_DROPDOWN = gradio.Dropdown(
+        label = wording.get('face_analyser_direction_dropdown_label'),
+        choices = facefusion.choices.face_analyser_direction,
+        value = facefusion.globals.face_analyser_direction
+    )
+    FACE_ANALYSER_AGE_DROPDOWN = gradio.Dropdown(
+        label = wording.get('face_analyser_age_dropdown_label'),
+        choices = ['none'] + facefusion.choices.face_analyser_age,
+        value = facefusion.globals.face_analyser_age or 'none'
+    )
+    FACE_ANALYSER_GENDER_DROPDOWN = gradio.Dropdown(
+        label = wording.get('face_analyser_gender_dropdown_label'),
+        choices = ['none'] + facefusion.choices.face_analyser_gender,
+        value = facefusion.globals.face_analyser_gender or 'none'
+    )
+    ui.register_component('face_analyser_direction_dropdown', FACE_ANALYSER_DIRECTION_DROPDOWN)
+    ui.register_component('face_analyser_age_dropdown', FACE_ANALYSER_AGE_DROPDOWN)
+    ui.register_component('face_analyser_gender_dropdown', FACE_ANALYSER_GENDER_DROPDOWN)
 def listen() -> None:

[file path not shown]
@@ -1,12 +1,11 @@
 from typing import List, Optional, Tuple, Any, Dict
-import cv2
 import gradio
 import facefusion.choices
 import facefusion.globals
 from facefusion import wording
-from facefusion.vision import get_video_frame, normalize_frame_color
+from facefusion.vision import get_video_frame, normalize_frame_color, read_static_image
 from facefusion.face_analyser import get_many_faces
 from facefusion.face_reference import clear_face_reference
 from facefusion.typing import Frame, FaceRecognition
@@ -24,38 +23,37 @@ def render() -> None:
     global REFERENCE_FACE_POSITION_GALLERY
     global REFERENCE_FACE_DISTANCE_SLIDER
-    with gradio.Box():
-        reference_face_gallery_args: Dict[str, Any] =\
-        {
-            'label': wording.get('reference_face_gallery_label'),
-            'height': 120,
-            'object_fit': 'cover',
-            'columns': 10,
-            'allow_preview': False,
-            'visible': 'reference' in facefusion.globals.face_recognition
-        }
-        if is_image(facefusion.globals.target_path):
-            reference_frame = cv2.imread(facefusion.globals.target_path)
-            reference_face_gallery_args['value'] = extract_gallery_frames(reference_frame)
-        if is_video(facefusion.globals.target_path):
-            reference_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
-            reference_face_gallery_args['value'] = extract_gallery_frames(reference_frame)
-        FACE_RECOGNITION_DROPDOWN = gradio.Dropdown(
-            label = wording.get('face_recognition_dropdown_label'),
-            choices = facefusion.choices.face_recognition,
-            value = facefusion.globals.face_recognition
-        )
-        REFERENCE_FACE_POSITION_GALLERY = gradio.Gallery(**reference_face_gallery_args)
-        REFERENCE_FACE_DISTANCE_SLIDER = gradio.Slider(
-            label = wording.get('reference_face_distance_slider_label'),
-            value = facefusion.globals.reference_face_distance,
-            maximum = 3,
-            step = 0.05,
-            visible = 'reference' in facefusion.globals.face_recognition
-        )
-        ui.register_component('face_recognition_dropdown', FACE_RECOGNITION_DROPDOWN)
-        ui.register_component('reference_face_position_gallery', REFERENCE_FACE_POSITION_GALLERY)
-        ui.register_component('reference_face_distance_slider', REFERENCE_FACE_DISTANCE_SLIDER)
+    reference_face_gallery_args: Dict[str, Any] =\
+    {
+        'label': wording.get('reference_face_gallery_label'),
+        'height': 120,
+        'object_fit': 'cover',
+        'columns': 10,
+        'allow_preview': False,
+        'visible': 'reference' in facefusion.globals.face_recognition
+    }
+    if is_image(facefusion.globals.target_path):
+        reference_frame = read_static_image(facefusion.globals.target_path)
+        reference_face_gallery_args['value'] = extract_gallery_frames(reference_frame)
+    if is_video(facefusion.globals.target_path):
+        reference_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
+        reference_face_gallery_args['value'] = extract_gallery_frames(reference_frame)
+    FACE_RECOGNITION_DROPDOWN = gradio.Dropdown(
+        label = wording.get('face_recognition_dropdown_label'),
+        choices = facefusion.choices.face_recognition,
+        value = facefusion.globals.face_recognition
+    )
+    REFERENCE_FACE_POSITION_GALLERY = gradio.Gallery(**reference_face_gallery_args)
+    REFERENCE_FACE_DISTANCE_SLIDER = gradio.Slider(
+        label = wording.get('reference_face_distance_slider_label'),
+        value = facefusion.globals.reference_face_distance,
+        maximum = 3,
+        step = 0.05,
+        visible = 'reference' in facefusion.globals.face_recognition
+    )
+    ui.register_component('face_recognition_dropdown', FACE_RECOGNITION_DROPDOWN)
+    ui.register_component('reference_face_position_gallery', REFERENCE_FACE_POSITION_GALLERY)
+    ui.register_component('reference_face_distance_slider', REFERENCE_FACE_DISTANCE_SLIDER)
 def listen() -> None:
@@ -106,7 +104,7 @@ def update_face_reference_position(reference_face_position : int = 0) -> Update:
     gallery_frames = []
     facefusion.globals.reference_face_position = reference_face_position
     if is_image(facefusion.globals.target_path):
-        reference_frame = cv2.imread(facefusion.globals.target_path)
+        reference_frame = read_static_image(facefusion.globals.target_path)
         gallery_frames = extract_gallery_frames(reference_frame)
     if is_video(facefusion.globals.target_path):
        reference_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)

[file path not shown]
@@ -11,13 +11,12 @@ MAX_MEMORY_SLIDER : Optional[gradio.Slider] = None
 def render() -> None:
     global MAX_MEMORY_SLIDER
-    with gradio.Box():
-        MAX_MEMORY_SLIDER = gradio.Slider(
-            label = wording.get('max_memory_slider_label'),
-            minimum = 0,
-            maximum = 128,
-            step = 1
-        )
+    MAX_MEMORY_SLIDER = gradio.Slider(
+        label = wording.get('max_memory_slider_label'),
+        minimum = 0,
+        maximum = 128,
+        step = 1
+    )
 def listen() -> None:

[file path not shown]
@@ -22,23 +22,25 @@ def render() -> None:
     global OUTPUT_START_BUTTON
     global OUTPUT_CLEAR_BUTTON
-    with gradio.Row():
-        with gradio.Box():
-            OUTPUT_IMAGE = gradio.Image(
-                label = wording.get('output_image_or_video_label'),
-                visible = False
-            )
-            OUTPUT_VIDEO = gradio.Video(
-                label = wording.get('output_image_or_video_label')
-            )
-            OUTPUT_PATH_TEXTBOX = gradio.Textbox(
-                label = wording.get('output_path_textbox_label'),
-                value = facefusion.globals.output_path or tempfile.gettempdir(),
-                max_lines = 1
-            )
-        with gradio.Row():
-            OUTPUT_START_BUTTON = gradio.Button(wording.get('start_button_label'))
-            OUTPUT_CLEAR_BUTTON = gradio.Button(wording.get('clear_button_label'))
+    OUTPUT_IMAGE = gradio.Image(
+        label = wording.get('output_image_or_video_label'),
+        visible = False
+    )
+    OUTPUT_VIDEO = gradio.Video(
+        label = wording.get('output_image_or_video_label')
+    )
+    OUTPUT_PATH_TEXTBOX = gradio.Textbox(
+        label = wording.get('output_path_textbox_label'),
+        value = facefusion.globals.output_path or tempfile.gettempdir(),
+        max_lines = 1
+    )
+    OUTPUT_START_BUTTON = gradio.Button(
+        value = wording.get('start_button_label'),
+        variant = 'primary'
+    )
+    OUTPUT_CLEAR_BUTTON = gradio.Button(
+        value = wording.get('clear_button_label'),
+    )
 def listen() -> None:

[file path not shown]
@@ -19,25 +19,24 @@ def render() -> None:
     global OUTPUT_VIDEO_ENCODER_DROPDOWN
     global OUTPUT_VIDEO_QUALITY_SLIDER
-    with gradio.Box():
-        OUTPUT_IMAGE_QUALITY_SLIDER = gradio.Slider(
-            label = wording.get('output_image_quality_slider_label'),
-            value = facefusion.globals.output_image_quality,
-            step = 1,
-            visible = is_image(facefusion.globals.target_path)
-        )
-        OUTPUT_VIDEO_ENCODER_DROPDOWN = gradio.Dropdown(
-            label = wording.get('output_video_encoder_dropdown_label'),
-            choices = facefusion.choices.output_video_encoder,
-            value = facefusion.globals.output_video_encoder,
-            visible = is_video(facefusion.globals.target_path)
-        )
-        OUTPUT_VIDEO_QUALITY_SLIDER = gradio.Slider(
-            label = wording.get('output_video_quality_slider_label'),
-            value = facefusion.globals.output_video_quality,
-            step = 1,
-            visible = is_video(facefusion.globals.target_path)
-        )
+    OUTPUT_IMAGE_QUALITY_SLIDER = gradio.Slider(
+        label = wording.get('output_image_quality_slider_label'),
+        value = facefusion.globals.output_image_quality,
+        step = 1,
+        visible = is_image(facefusion.globals.target_path)
+    )
+    OUTPUT_VIDEO_ENCODER_DROPDOWN = gradio.Dropdown(
+        label = wording.get('output_video_encoder_dropdown_label'),
+        choices = facefusion.choices.output_video_encoder,
+        value = facefusion.globals.output_video_encoder,
+        visible = is_video(facefusion.globals.target_path)
+    )
+    OUTPUT_VIDEO_QUALITY_SLIDER = gradio.Slider(
+        label = wording.get('output_video_quality_slider_label'),
+        value = facefusion.globals.output_video_quality,
+        step = 1,
+        visible = is_video(facefusion.globals.target_path)
+    )
 def listen() -> None:

View File

@ -4,12 +4,12 @@ import gradio
import facefusion.globals import facefusion.globals
from facefusion import wording from facefusion import wording
from facefusion.vision import get_video_frame, count_video_frame_total, normalize_frame_color, resize_frame_dimension from facefusion.vision import get_video_frame, count_video_frame_total, normalize_frame_color, resize_frame_dimension, read_static_image
from facefusion.face_analyser import get_one_face from facefusion.face_analyser import get_one_face
from facefusion.face_reference import get_face_reference, set_face_reference from facefusion.face_reference import get_face_reference, set_face_reference
from facefusion.predictor import predict_frame from facefusion.predictor import predict_frame
from facefusion.processors.frame.core import load_frame_processor_module from facefusion.processors.frame.core import load_frame_processor_module
from facefusion.typing import Frame from facefusion.typing import Frame, Face
from facefusion.uis import core as ui from facefusion.uis import core as ui
from facefusion.uis.typing import ComponentName, Update from facefusion.uis.typing import ComponentName, Update
from facefusion.utilities import is_video, is_image from facefusion.utilities import is_video, is_image
@ -22,32 +22,34 @@ def render() -> None:
global PREVIEW_IMAGE global PREVIEW_IMAGE
global PREVIEW_FRAME_SLIDER global PREVIEW_FRAME_SLIDER
with gradio.Box(): preview_image_args: Dict[str, Any] =\
preview_image_args: Dict[str, Any] =\ {
{ 'label': wording.get('preview_image_label')
'label': wording.get('preview_image_label') }
} preview_frame_slider_args: Dict[str, Any] =\
preview_frame_slider_args: Dict[str, Any] =\ {
{ 'label': wording.get('preview_frame_slider_label'),
'label': wording.get('preview_frame_slider_label'), 'step': 1,
'step': 1, 'visible': False
'visible': False }
} conditional_set_face_reference()
if is_image(facefusion.globals.target_path): source_face = get_one_face(read_static_image(facefusion.globals.source_path))
target_frame = cv2.imread(facefusion.globals.target_path) reference_face = get_face_reference() if 'reference' in facefusion.globals.face_recognition else None
preview_frame = process_preview_frame(target_frame) if is_image(facefusion.globals.target_path):
preview_image_args['value'] = normalize_frame_color(preview_frame) target_frame = read_static_image(facefusion.globals.target_path)
if is_video(facefusion.globals.target_path): preview_frame = process_preview_frame(source_face, reference_face, target_frame)
temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number) preview_image_args['value'] = normalize_frame_color(preview_frame)
preview_frame = process_preview_frame(temp_frame) if is_video(facefusion.globals.target_path):
preview_image_args['value'] = normalize_frame_color(preview_frame) temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
preview_image_args['visible'] = True preview_frame = process_preview_frame(source_face, reference_face, temp_frame)
preview_frame_slider_args['value'] = facefusion.globals.reference_frame_number preview_image_args['value'] = normalize_frame_color(preview_frame)
preview_frame_slider_args['maximum'] = count_video_frame_total(facefusion.globals.target_path) preview_image_args['visible'] = True
preview_frame_slider_args['visible'] = True preview_frame_slider_args['value'] = facefusion.globals.reference_frame_number
PREVIEW_IMAGE = gradio.Image(**preview_image_args) preview_frame_slider_args['maximum'] = count_video_frame_total(facefusion.globals.target_path)
PREVIEW_FRAME_SLIDER = gradio.Slider(**preview_frame_slider_args) preview_frame_slider_args['visible'] = True
ui.register_component('preview_frame_slider', PREVIEW_FRAME_SLIDER) PREVIEW_IMAGE = gradio.Image(**preview_image_args)
PREVIEW_FRAME_SLIDER = gradio.Slider(**preview_frame_slider_args)
ui.register_component('preview_frame_slider', PREVIEW_FRAME_SLIDER)
def listen() -> None:

@@ -90,17 +92,18 @@ def listen() -> None:

def update_preview_image(frame_number : int = 0) -> Update:
    conditional_set_face_reference()
    source_face = get_one_face(read_static_image(facefusion.globals.source_path))
    reference_face = get_face_reference() if 'reference' in facefusion.globals.face_recognition else None
    if is_image(facefusion.globals.target_path):
        target_frame = read_static_image(facefusion.globals.target_path)
        preview_frame = process_preview_frame(source_face, reference_face, target_frame)
        preview_frame = normalize_frame_color(preview_frame)
        return gradio.update(value = preview_frame)
    if is_video(facefusion.globals.target_path):
        facefusion.globals.reference_frame_number = frame_number
        temp_frame = get_video_frame(facefusion.globals.target_path, facefusion.globals.reference_frame_number)
        preview_frame = process_preview_frame(source_face, reference_face, temp_frame)
        preview_frame = normalize_frame_color(preview_frame)
        return gradio.update(value = preview_frame)
    return gradio.update(value = None)
@@ -116,11 +119,9 @@ def update_preview_frame_slider(frame_number : int = 0) -> Update:
    return gradio.update(value = None, maximum = None, visible = False)

def process_preview_frame(source_face : Face, reference_face : Face, temp_frame : Frame) -> Frame:
    if predict_frame(temp_frame):
        return cv2.GaussianBlur(temp_frame, (99, 99), 0)
    temp_frame = resize_frame_dimension(temp_frame, 480)
    for frame_processor in facefusion.globals.frame_processors:
        frame_processor_module = load_frame_processor_module(frame_processor)


@@ -14,13 +14,12 @@ FRAME_PROCESSORS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None

def render() -> None:
    global FRAME_PROCESSORS_CHECKBOX_GROUP

    FRAME_PROCESSORS_CHECKBOX_GROUP = gradio.CheckboxGroup(
        label = wording.get('frame_processors_checkbox_group_label'),
        choices = sort_frame_processors(facefusion.globals.frame_processors),
        value = facefusion.globals.frame_processors
    )
    ui.register_component('frame_processors_checkbox_group', FRAME_PROCESSORS_CHECKBOX_GROUP)

def listen() -> None:


@@ -1,41 +1,37 @@
from typing import Optional, List
import gradio

import facefusion.globals
from facefusion import wording
from facefusion.uis import choices
from facefusion.uis.typing import Update

SETTINGS_CHECKBOX_GROUP : Optional[gradio.Checkboxgroup] = None

def render() -> None:
    global SETTINGS_CHECKBOX_GROUP

    value = []
    if facefusion.globals.keep_fps:
        value.append('keep-fps')
    if facefusion.globals.keep_temp:
        value.append('keep-temp')
    if facefusion.globals.skip_audio:
        value.append('skip-audio')
    SETTINGS_CHECKBOX_GROUP = gradio.Checkboxgroup(
        label = wording.get('settings_checkbox_group_label'),
        choices = choices.settings,
        value = value
    )

def listen() -> None:
    SETTINGS_CHECKBOX_GROUP.change(update, inputs = SETTINGS_CHECKBOX_GROUP, outputs = SETTINGS_CHECKBOX_GROUP)

def update(settings : List[str]) -> Update:
    facefusion.globals.keep_fps = 'keep-fps' in settings
    facefusion.globals.keep_temp = 'keep-temp' in settings
    facefusion.globals.skip_audio = 'skip-audio' in settings
    return gradio.update(value = settings)
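
The three former checkboxes collapse into a single group whose kebab-case values are mapped back onto booleans in `facefusion.globals`. A quick illustration of that mapping, using a hypothetical call as Gradio would issue it when two boxes are ticked:

```
update([ 'keep-fps', 'skip-audio' ])
assert facefusion.globals.keep_fps and facefusion.globals.skip_audio
assert not facefusion.globals.keep_temp
```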


@@ -15,25 +15,24 @@ def render() -> None:
    global SOURCE_FILE
    global SOURCE_IMAGE

    is_source_image = is_image(facefusion.globals.source_path)
    SOURCE_FILE = gradio.File(
        file_count = 'single',
        file_types =
        [
            '.png',
            '.jpg',
            '.webp'
        ],
        label = wording.get('source_file_label'),
        value = facefusion.globals.source_path if is_source_image else None
    )
    SOURCE_IMAGE = gradio.Image(
        value = SOURCE_FILE.value['name'] if is_source_image else None,
        visible = is_source_image,
        show_label = False
    )
    ui.register_component('source_image', SOURCE_IMAGE)

def listen() -> None:


@@ -18,33 +18,32 @@ def render() -> None:
    global TARGET_IMAGE
    global TARGET_VIDEO

    is_target_image = is_image(facefusion.globals.target_path)
    is_target_video = is_video(facefusion.globals.target_path)
    TARGET_FILE = gradio.File(
        label = wording.get('target_file_label'),
        file_count = 'single',
        file_types =
        [
            '.png',
            '.jpg',
            '.webp',
            '.mp4'
        ],
        value = facefusion.globals.target_path if is_target_image or is_target_video else None
    )
    TARGET_IMAGE = gradio.Image(
        value = TARGET_FILE.value['name'] if is_target_image else None,
        visible = is_target_image,
        show_label = False
    )
    TARGET_VIDEO = gradio.Video(
        value = TARGET_FILE.value['name'] if is_target_video else None,
        visible = is_target_video,
        show_label = False
    )
    ui.register_component('target_image', TARGET_IMAGE)
    ui.register_component('target_video', TARGET_VIDEO)

def listen() -> None:


@@ -17,19 +17,18 @@ def render() -> None:
    global TEMP_FRAME_FORMAT_DROPDOWN
    global TEMP_FRAME_QUALITY_SLIDER

    TEMP_FRAME_FORMAT_DROPDOWN = gradio.Dropdown(
        label = wording.get('temp_frame_format_dropdown_label'),
        choices = facefusion.choices.temp_frame_format,
        value = facefusion.globals.temp_frame_format,
        visible = is_video(facefusion.globals.target_path)
    )
    TEMP_FRAME_QUALITY_SLIDER = gradio.Slider(
        label = wording.get('temp_frame_quality_slider_label'),
        value = facefusion.globals.temp_frame_quality,
        step = 1,
        visible = is_video(facefusion.globals.target_path)
    )

def listen() -> None:


@@ -16,30 +16,28 @@ def render() -> None:
    global TRIM_FRAME_START_SLIDER
    global TRIM_FRAME_END_SLIDER

    trim_frame_start_slider_args : Dict[str, Any] =\
    {
        'label': wording.get('trim_frame_start_slider_label'),
        'step': 1,
        'visible': False
    }
    trim_frame_end_slider_args : Dict[str, Any] =\
    {
        'label': wording.get('trim_frame_end_slider_label'),
        'step': 1,
        'visible': False
    }
    if is_video(facefusion.globals.target_path):
        video_frame_total = count_video_frame_total(facefusion.globals.target_path)
        trim_frame_start_slider_args['value'] = facefusion.globals.trim_frame_start or 0
        trim_frame_start_slider_args['maximum'] = video_frame_total
        trim_frame_start_slider_args['visible'] = True
        trim_frame_end_slider_args['value'] = facefusion.globals.trim_frame_end or video_frame_total
        trim_frame_end_slider_args['maximum'] = video_frame_total
        trim_frame_end_slider_args['visible'] = True
    TRIM_FRAME_START_SLIDER = gradio.Slider(**trim_frame_start_slider_args)
    TRIM_FRAME_END_SLIDER = gradio.Slider(**trim_frame_end_slider_args)

def listen() -> None:


@@ -1,87 +1,114 @@
from typing import Optional, Generator, Deque
from concurrent.futures import ThreadPoolExecutor
from collections import deque
import os
import platform
import subprocess
import cv2
import gradio
from tqdm import tqdm

import facefusion.globals
from facefusion import wording
from facefusion.typing import Frame, Face
from facefusion.face_analyser import get_one_face
from facefusion.processors.frame.core import load_frame_processor_module
from facefusion.uis import core as ui
from facefusion.uis.typing import StreamMode, WebcamMode, Update
from facefusion.utilities import open_ffmpeg
from facefusion.vision import normalize_frame_color, read_static_image

WEBCAM_IMAGE : Optional[gradio.Image] = None
WEBCAM_START_BUTTON : Optional[gradio.Button] = None
WEBCAM_STOP_BUTTON : Optional[gradio.Button] = None

def render() -> None:
    global WEBCAM_IMAGE
    global WEBCAM_START_BUTTON
    global WEBCAM_STOP_BUTTON

    WEBCAM_IMAGE = gradio.Image(
        label = wording.get('webcam_image_label')
    )
    WEBCAM_START_BUTTON = gradio.Button(
        value = wording.get('start_button_label'),
        variant = 'primary'
    )
    WEBCAM_STOP_BUTTON = gradio.Button(
        value = wording.get('stop_button_label')
    )

def listen() -> None:
    start_event = None
    webcam_mode_radio = ui.get_component('webcam_mode_radio')
    webcam_resolution_dropdown = ui.get_component('webcam_resolution_dropdown')
    webcam_fps_slider = ui.get_component('webcam_fps_slider')
    if webcam_mode_radio and webcam_resolution_dropdown and webcam_fps_slider:
        start_event = WEBCAM_START_BUTTON.click(start, inputs = [ webcam_mode_radio, webcam_resolution_dropdown, webcam_fps_slider ], outputs = WEBCAM_IMAGE)
        webcam_mode_radio.change(stop, outputs = WEBCAM_IMAGE, cancels = start_event)
        webcam_resolution_dropdown.change(stop, outputs = WEBCAM_IMAGE, cancels = start_event)
        webcam_fps_slider.change(stop, outputs = WEBCAM_IMAGE, cancels = start_event)
    WEBCAM_STOP_BUTTON.click(stop, cancels = start_event)
    source_image = ui.get_component('source_image')
    if source_image:
        for method in [ 'upload', 'change', 'clear' ]:
            getattr(source_image, method)(stop, cancels = start_event)
def start(mode : WebcamMode, resolution : str, fps : float) -> Generator[Frame, None, None]:
    facefusion.globals.face_recognition = 'many'
    source_face = get_one_face(read_static_image(facefusion.globals.source_path))
    stream = None
    if mode == 'stream_udp':
        stream = open_stream('udp', resolution, fps)
    if mode == 'stream_v4l2':
        stream = open_stream('v4l2', resolution, fps)
    capture = capture_webcam(resolution, fps)
    if capture.isOpened():
        for capture_frame in multi_process_capture(source_face, capture):
            if stream is not None:
                stream.stdin.write(capture_frame.tobytes())
            yield normalize_frame_color(capture_frame)

def multi_process_capture(source_face : Face, capture : cv2.VideoCapture) -> Generator[Frame, None, None]:
    progress = tqdm(desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True)
    with ThreadPoolExecutor(max_workers = facefusion.globals.execution_thread_count) as executor:
        futures = []
        deque_capture_frames : Deque[Frame] = deque()
        while True:
            _, capture_frame = capture.read()
            future = executor.submit(process_stream_frame, source_face, capture_frame)
            futures.append(future)
            for future_done in [ future for future in futures if future.done() ]:
                capture_frame = future_done.result()
                deque_capture_frames.append(capture_frame)
                futures.remove(future_done)
            while deque_capture_frames:
                yield deque_capture_frames.popleft()
                progress.update()
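
`multi_process_capture()` keeps the webcam pipeline busy by submitting every captured frame to a thread pool and yielding results as their futures complete. A minimal, standalone sketch of the same pattern, with an identity function standing in for the real frame processors:

```
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Deque, Generator, Iterable

def pooled_map(items : Iterable[Any], workers : int = 4) -> Generator[Any, None, None]:
    def process(item : Any) -> Any:
        return item # stand-in for per-frame work
    with ThreadPoolExecutor(max_workers = workers) as executor:
        futures = []
        done_items : Deque[Any] = deque()
        for item in items:
            futures.append(executor.submit(process, item))
            # drain whatever has finished so far, in completion order
            for future in [ future for future in futures if future.done() ]:
                done_items.append(future.result())
                futures.remove(future)
            while done_items:
                yield done_items.popleft()
        # flush the remaining futures once the input is exhausted
        for future in futures:
            yield future.result()

print(list(pooled_map(range(5))))
```

Frames can leave the pool slightly out of order; the code trades strict ordering for throughput, which is acceptable for a live preview.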
def stop() -> Update:
    return gradio.update(value = None)

def capture_webcam(resolution : str, fps : float) -> cv2.VideoCapture:
    width, height = resolution.split('x')
    if platform.system().lower() == 'windows':
        capture = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    else:
        capture = cv2.VideoCapture(0)
    capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) # type: ignore[attr-defined]
    capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(width))
    capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(height))
    capture.set(cv2.CAP_PROP_FPS, fps)
    return capture
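
Drivers are free to ignore requested capture properties, so it can be worth reading them back. A small check, assuming a webcam at index 0:

```
capture = capture_webcam('1280x720', 30)
print(capture.get(cv2.CAP_PROP_FRAME_WIDTH), capture.get(cv2.CAP_PROP_FRAME_HEIGHT), capture.get(cv2.CAP_PROP_FPS))
capture.release()
```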
def process_stream_frame(source_face : Face, temp_frame : Frame) -> Frame:
    for frame_processor in facefusion.globals.frame_processors:
        frame_processor_module = load_frame_processor_module(frame_processor)
        if frame_processor_module.pre_process('stream'):

@@ -93,8 +120,8 @@ def process_stream_frame(temp_frame : Frame) -> Frame:
    return temp_frame

def open_stream(mode : StreamMode, resolution : str, fps : float) -> subprocess.Popen[bytes]:
    commands = [ '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-s', resolution, '-r', str(fps), '-i', '-' ]
    if mode == 'udp':
        commands.extend([ '-b:v', '2000k', '-f', 'mpegts', 'udp://localhost:27000?pkt_size=1316' ])
    if mode == 'v4l2':
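
For `stream_udp` at 640x480 and 25 fps, the options assembled by `open_stream()` amount to a pipe along these lines (the exact prefix depends on `open_ffmpeg`, so treat this as an approximation):

```
ffmpeg -f rawvideo -pix_fmt bgr24 -s 640x480 -r 25 -i - -b:v 2000k -f mpegts udp://localhost:27000?pkt_size=1316
```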


@@ -0,0 +1,42 @@
from typing import Optional
import gradio

from facefusion import wording
from facefusion.uis import choices
from facefusion.uis import core as ui
from facefusion.uis.typing import Update

WEBCAM_MODE_RADIO : Optional[gradio.Radio] = None
WEBCAM_RESOLUTION_DROPDOWN : Optional[gradio.Dropdown] = None
WEBCAM_FPS_SLIDER : Optional[gradio.Slider] = None

def render() -> None:
    global WEBCAM_MODE_RADIO
    global WEBCAM_RESOLUTION_DROPDOWN
    global WEBCAM_FPS_SLIDER

    WEBCAM_MODE_RADIO = gradio.Radio(
        label = wording.get('webcam_mode_radio_label'),
        choices = choices.webcam_mode,
        value = 'inline'
    )
    WEBCAM_RESOLUTION_DROPDOWN = gradio.Dropdown(
        label = wording.get('webcam_resolution_dropdown'),
        choices = choices.webcam_resolution,
        value = choices.webcam_resolution[0]
    )
    WEBCAM_FPS_SLIDER = gradio.Slider(
        label = wording.get('webcam_fps_slider'),
        minimum = 1,
        maximum = 60,
        step = 1,
        value = 25
    )
    ui.register_component('webcam_mode_radio', WEBCAM_MODE_RADIO)
    ui.register_component('webcam_resolution_dropdown', WEBCAM_RESOLUTION_DROPDOWN)
    ui.register_component('webcam_fps_slider', WEBCAM_FPS_SLIDER)

def update() -> Update:
    return gradio.update(value = None)
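
The webcam component never imports these settings directly; both sides go through `register_component()` and `get_component()` in the UI core. A minimal sketch of that registry pattern, assuming (as the calls suggest) a plain module-level dict:

```
from typing import Any, Dict, Optional

COMPONENTS : Dict[str, Any] = {}

def register_component(name : str, component : Any) -> None:
    COMPONENTS[name] = component

def get_component(name : str) -> Optional[Any]:
    return COMPONENTS.get(name)
```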


@@ -1,6 +1,6 @@
import gradio

from facefusion.uis.components import about, processors, execution, execution_thread_count, execution_queue_count, limit_resources, benchmark_settings, benchmark
from facefusion.utilities import conditional_download

@@ -27,19 +27,31 @@ def render() -> gradio.Blocks:
    with gradio.Blocks() as layout:
        with gradio.Row():
            with gradio.Column(scale = 2):
                with gradio.Box():
                    about.render()
                with gradio.Blocks():
                    processors.render()
                with gradio.Blocks():
                    execution.render()
                    execution_thread_count.render()
                    execution_queue_count.render()
                with gradio.Blocks():
                    limit_resources.render()
                with gradio.Blocks():
                    benchmark_settings.render()
            with gradio.Column(scale = 5):
                with gradio.Blocks():
                    benchmark.render()
    return layout

def listen() -> None:
    processors.listen()
    execution.listen()
    execution_thread_count.listen()
    execution_queue_count.listen()
    limit_resources.listen()
    benchmark_settings.listen()
    benchmark.listen()


@@ -1,6 +1,6 @@
import gradio

from facefusion.uis.components import about, processors, execution, execution_thread_count, execution_queue_count, limit_resources, temp_frame, output_settings, settings, source, target, preview, trim_frame, face_analyser, face_selector, output

def pre_check() -> bool:

@@ -15,28 +15,46 @@ def render() -> gradio.Blocks:
    with gradio.Blocks() as layout:
        with gradio.Row():
            with gradio.Column(scale = 2):
                with gradio.Box():
                    about.render()
                with gradio.Blocks():
                    processors.render()
                with gradio.Blocks():
                    execution.render()
                    execution_thread_count.render()
                    execution_queue_count.render()
                with gradio.Blocks():
                    limit_resources.render()
                with gradio.Blocks():
                    temp_frame.render()
                with gradio.Blocks():
                    output_settings.render()
                with gradio.Blocks():
                    settings.render()
            with gradio.Column(scale = 2):
                with gradio.Blocks():
                    source.render()
                with gradio.Blocks():
                    target.render()
                with gradio.Blocks():
                    output.render()
            with gradio.Column(scale = 3):
                with gradio.Blocks():
                    preview.render()
                with gradio.Row():
                    trim_frame.render()
                with gradio.Blocks():
                    face_selector.render()
                with gradio.Row():
                    face_analyser.render()
    return layout

def listen() -> None:
    processors.listen()
    execution.listen()
    execution_thread_count.listen()
    execution_queue_count.listen()
    limit_resources.listen()
    temp_frame.listen()
    output_settings.listen()


@@ -1,6 +1,6 @@
import gradio

from facefusion.uis.components import about, processors, execution, execution_thread_count, webcam_settings, source, webcam

def pre_check() -> bool:

@@ -15,20 +15,27 @@ def render() -> gradio.Blocks:
    with gradio.Blocks() as layout:
        with gradio.Row():
            with gradio.Column(scale = 2):
                with gradio.Box():
                    about.render()
                with gradio.Blocks():
                    processors.render()
                with gradio.Blocks():
                    execution.render()
                    execution_thread_count.render()
                with gradio.Blocks():
                    webcam_settings.render()
                with gradio.Blocks():
                    source.render()
            with gradio.Column(scale = 5):
                with gradio.Blocks():
                    webcam.render()
    return layout

def listen() -> None:
    processors.listen()
    execution.listen()
    execution_thread_count.listen()
    source.listen()
    webcam.listen()


@@ -14,8 +14,13 @@ ComponentName = Literal\
    'face_analyser_direction_dropdown',
    'face_analyser_age_dropdown',
    'face_analyser_gender_dropdown',
    'frame_processors_checkbox_group',
    'benchmark_runs_checkbox_group',
    'benchmark_cycles_slider',
    'webcam_mode_radio',
    'webcam_resolution_dropdown',
    'webcam_fps_slider'
]

WebcamMode = Literal[ 'inline', 'stream_udp', 'stream_v4l2' ]
StreamMode = Literal[ 'udp', 'v4l2' ]
Update = Dict[Any, Any]


@@ -1,4 +1,3 @@
from typing import List, Optional
from pathlib import Path
from tqdm import tqdm
@@ -15,9 +14,10 @@ import onnxruntime
import facefusion.globals
from facefusion import wording
from facefusion.vision import detect_fps

TEMP_DIRECTORY_PATH = os.path.join(tempfile.gettempdir(), 'facefusion')
TEMP_OUTPUT_VIDEO_NAME = 'temp.mp4'

# monkey patch ssl
if platform.system().lower() == 'darwin':
@@ -40,24 +40,11 @@ def open_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
    return subprocess.Popen(commands, stdin = subprocess.PIPE)

def extract_frames(target_path : str, fps : float) -> bool:
    temp_frame_compression = round(31 - (facefusion.globals.temp_frame_quality * 0.31))
    trim_frame_start = facefusion.globals.trim_frame_start
    trim_frame_end = facefusion.globals.trim_frame_end
    temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
    commands = [ '-hwaccel', 'auto', '-i', target_path, '-q:v', str(temp_frame_compression), '-pix_fmt', 'rgb24' ]
    if trim_frame_start is not None and trim_frame_end is not None:
        commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(fps) ])

@@ -67,7 +54,7 @@ def extract_frames(target_path : str, fps : float) -> bool:
        commands.extend([ '-vf', 'trim=end_frame=' + str(trim_frame_end) + ',fps=' + str(fps) ])
    else:
        commands.extend([ '-vf', 'fps=' + str(fps) ])
    commands.extend([ '-vsync', '0', temp_frames_pattern ])
    return run_ffmpeg(commands)
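
With a temp frame quality of 90, jpg temp frames and a trim from frame 0 to 100 at 25 fps, the assembled extraction is roughly equivalent to (paths illustrative):

```
ffmpeg -hwaccel auto -i target.mp4 -q:v 3 -pix_fmt rgb24 -vf trim=start_frame=0:end_frame=100,fps=25 -vsync 0 /tmp/facefusion/target/%04d.jpg
```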
@@ -78,19 +65,19 @@ def compress_image(output_path : str) -> bool:

def merge_video(target_path : str, fps : float) -> bool:
    temp_output_video_path = get_temp_output_video_path(target_path)
    temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
    commands = [ '-hwaccel', 'auto', '-r', str(fps), '-i', temp_frames_pattern, '-c:v', facefusion.globals.output_video_encoder ]
    if facefusion.globals.output_video_encoder in [ 'libx264', 'libx265' ]:
        output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.5))
        commands.extend([ '-crf', str(output_video_compression) ])
    if facefusion.globals.output_video_encoder in [ 'libvpx-vp9' ]:
        output_video_compression = round(63 - (facefusion.globals.output_video_quality * 0.5))
        commands.extend([ '-crf', str(output_video_compression) ])
    if facefusion.globals.output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
        output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.5))
        commands.extend([ '-cq', str(output_video_compression) ])
    commands.extend([ '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_output_video_path ])
    return run_ffmpeg(commands)
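
The quality sliders are normalized onto each encoder's native scale, so a higher quality yields a lower CRF/CQ value, e.g. for libx264:

```
output_video_quality = 80
output_video_compression = round(51 - (output_video_quality * 0.5)) # CRF 11
```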
@@ -98,27 +85,26 @@ def restore_audio(target_path : str, output_path : str) -> bool:
    fps = detect_fps(target_path)
    trim_frame_start = facefusion.globals.trim_frame_start
    trim_frame_end = facefusion.globals.trim_frame_end
    temp_output_video_path = get_temp_output_video_path(target_path)
    commands = [ '-hwaccel', 'auto', '-i', temp_output_video_path ]
    if trim_frame_start is not None:
        start_time = trim_frame_start / fps
        commands.extend([ '-ss', str(start_time) ])
    if trim_frame_end is not None:
        end_time = trim_frame_end / fps
        commands.extend([ '-to', str(end_time) ])
    commands.extend([ '-i', target_path, '-c', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-shortest', '-y', output_path ])
    return run_ffmpeg(commands)
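
The simplified `restore_audio()` leans on stream copy and `-shortest` instead of re-encoding audio. For a clip trimmed from frame 50 to 150 at 25 fps, the resulting command is roughly (paths illustrative):

```
ffmpeg -hwaccel auto -i /tmp/facefusion/target/temp.mp4 -ss 2.0 -to 6.0 -i target.mp4 -c copy -map 0:v:0 -map 1:a:0 -shortest -y output.mp4
```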
def get_temp_frame_paths(target_path : str) -> List[str]:
    temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
    return sorted(glob.glob(temp_frames_pattern))

def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
    temp_directory_path = get_temp_directory_path(target_path)
    return os.path.join(temp_directory_path, temp_frame_prefix + '.' + facefusion.globals.temp_frame_format)

def get_temp_directory_path(target_path : str) -> str:
@@ -126,9 +112,9 @@ def get_temp_directory_path(target_path : str) -> str:
    return os.path.join(TEMP_DIRECTORY_PATH, target_name)

def get_temp_output_video_path(target_path : str) -> str:
    temp_directory_path = get_temp_directory_path(target_path)
    return os.path.join(temp_directory_path, TEMP_OUTPUT_VIDEO_NAME)

def normalize_output_path(source_path : Optional[str], target_path : Optional[str], output_path : Optional[str]) -> Optional[str]:
@@ -152,11 +138,11 @@ def create_temp(target_path : str) -> None:

def move_temp(target_path : str, output_path : str) -> None:
    temp_output_video_path = get_temp_output_video_path(target_path)
    if is_file(temp_output_video_path):
        if is_file(output_path):
            os.remove(output_path)
        shutil.move(temp_output_video_path, output_path)

def clear_temp(target_path : str) -> None:
@@ -191,15 +177,29 @@ def is_video(video_path : str) -> bool:

def conditional_download(download_directory_path : str, urls : List[str]) -> None:
    for url in urls:
        download_file_path = os.path.join(download_directory_path, os.path.basename(url))
        total = get_download_size(url)
        if is_file(download_file_path):
            initial = os.path.getsize(download_file_path)
        else:
            initial = 0
        if initial < total:
            with tqdm(total = total, initial = initial, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024) as progress:
                subprocess.Popen([ 'curl', '--create-dirs', '--silent', '--location', '--continue-at', '-', '--output', download_file_path, url ])
                current = initial
                while current < total:
                    if is_file(download_file_path):
                        current = os.path.getsize(download_file_path)
                        progress.update(current - progress.n)

def get_download_size(url : str) -> int:
    response = urllib.request.urlopen(url) # type: ignore[attr-defined]
    content_length = response.getheader('Content-Length')
    if content_length:
        return int(content_length)
    return 0
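
The resume itself is delegated to curl's `--continue-at -`, which sizes the partial file and requests only the remaining bytes, while the Python side merely polls the file size to drive the progress bar. A rough standard-library equivalent of the resume step, assuming the server honors HTTP Range requests:

```
import os
import urllib.request

def resume_download(url : str, file_path : str) -> None:
    # request only the bytes that are not on disk yet
    initial = os.path.getsize(file_path) if os.path.isfile(file_path) else 0
    request = urllib.request.Request(url, headers = { 'Range': 'bytes=' + str(initial) + '-' })
    with urllib.request.urlopen(request) as response, open(file_path, 'ab') as file:
        while True:
            chunk = response.read(65536)
            if not chunk:
                break
            file.write(chunk)
```

As in `conditional_download()`, the caller should compare local and remote sizes first; requesting a range past the end of the file makes most servers answer with 416.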
def resolve_relative_path(path : str) -> str:


@@ -1,27 +1,38 @@
from typing import Optional
from functools import lru_cache
import cv2

from facefusion.typing import Frame

def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[Frame]:
    if video_path:
        capture = cv2.VideoCapture(video_path)
        if capture.isOpened():
            frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
            capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
            has_frame, frame = capture.read()
            capture.release()
            if has_frame:
                return frame
    return None

def detect_fps(video_path : str) -> Optional[float]:
    if video_path:
        capture = cv2.VideoCapture(video_path)
        if capture.isOpened():
            return capture.get(cv2.CAP_PROP_FPS)
    return None

def count_video_frame_total(video_path : str) -> int:
    if video_path:
        capture = cv2.VideoCapture(video_path)
        if capture.isOpened():
            video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
            capture.release()
            return video_frame_total
    return 0

@@ -36,3 +47,20 @@ def resize_frame_dimension(frame : Frame, max_height : int) -> Frame:
        max_width = int(width * scale)
        frame = cv2.resize(frame, (max_width, max_height))
    return frame

@lru_cache(maxsize = 128)
def read_static_image(image_path : str) -> Optional[Frame]:
    return read_image(image_path)

def read_image(image_path : str) -> Optional[Frame]:
    if image_path:
        return cv2.imread(image_path)
    return None

def write_image(image_path : str, frame : Frame) -> bool:
    if image_path:
        return cv2.imwrite(image_path, frame)
    return False
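
Since `read_static_image()` is wrapped in `functools.lru_cache`, repeated previews of the same source or target image are served from memory instead of disk, while `read_image()` stays uncached for temp frames that change between runs. The cache can be dropped explicitly once processing is done:

```
frame = read_static_image('source.jpg') # first call reads from disk
frame = read_static_image('source.jpg') # second call hits the cache
read_static_image.cache_clear()         # release cached frames after processing
```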


@@ -1,8 +1,8 @@
WORDING =\
{
    'python_not_supported': 'Python version is not supported, upgrade to {version} or higher',
    'ffmpeg_not_installed': 'FFMpeg is not installed',
    'onnxruntime_help': 'select the onnxruntime to be installed',
    'source_help': 'select a source image',
    'target_help': 'select a target image or video',
    'output_help': 'specify the output file or directory',

@@ -79,9 +79,7 @@ WORDING =\
    'preview_image_label': 'PREVIEW',
    'preview_frame_slider_label': 'PREVIEW FRAME',
    'frame_processors_checkbox_group_label': 'FRAME PROCESSORS',
    'settings_checkbox_group_label': 'SETTINGS',
    'temp_frame_format_dropdown_label': 'TEMP FRAME FORMAT',
    'temp_frame_quality_slider_label': 'TEMP FRAME QUALITY',
    'trim_frame_start_slider_label': 'TRIM FRAME START',

@@ -90,6 +88,8 @@ WORDING =\
    'target_file_label': 'TARGET',
    'webcam_image_label': 'WEBCAM',
    'webcam_mode_radio_label': 'WEBCAM MODE',
    'webcam_resolution_dropdown': 'WEBCAM RESOLUTION',
    'webcam_fps_slider': 'WEBCAM FPS',
    'point': '.',
    'comma': ',',
    'colon': ':',


@@ -1,5 +1,5 @@
gfpgan==1.3.8
gradio==3.44.3
insightface==0.7.3
numpy==1.24.3
onnx==1.14.1


@@ -1,4 +1,5 @@
import subprocess
import sys
import pytest

from facefusion import wording

@@ -16,7 +17,7 @@ def before_all() -> None:

def test_image_to_image() -> None:
    commands = [ sys.executable, 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.jpg', '-o', '.assets/examples', '--headless' ]
    run = subprocess.run(commands, stdout = subprocess.PIPE)
    assert run.returncode == 0

@@ -24,7 +25,7 @@ def test_image_to_image() -> None:

def test_image_to_video() -> None:
    commands = [ sys.executable, 'run.py', '-s', '.assets/examples/source.jpg', '-t', '.assets/examples/target-1080p.mp4', '-o', '.assets/examples', '--trim-frame-end', '10', '--headless' ]
    run = subprocess.run(commands, stdout = subprocess.PIPE)
    assert run.returncode == 0


@@ -4,7 +4,7 @@ import subprocess
import pytest

import facefusion.globals
from facefusion.utilities import conditional_download, extract_frames, create_temp, get_temp_directory_path, clear_temp, normalize_output_path, is_file, is_directory, is_image, is_video, encode_execution_providers, decode_execution_providers

@pytest.fixture(scope = 'module', autouse = True)

@@ -31,12 +31,6 @@ def before_each() -> None:
    facefusion.globals.temp_frame_format = 'jpg'

def test_extract_frames() -> None:
    target_paths =\
    [

tests/test_vision.py (new file)

@@ -0,0 +1,49 @@
import subprocess
import pytest

import facefusion.globals
from facefusion.utilities import conditional_download
from facefusion.vision import get_video_frame, detect_fps, count_video_frame_total

@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
    facefusion.globals.temp_frame_quality = 100
    facefusion.globals.trim_frame_start = None
    facefusion.globals.trim_frame_end = None
    facefusion.globals.temp_frame_format = 'png'
    conditional_download('.assets/examples',
    [
        'https://github.com/facefusion/facefusion-assets/releases/download/examples/source.jpg',
        'https://github.com/facefusion/facefusion-assets/releases/download/examples/target-240p.mp4'
    ])
    subprocess.run([ 'ffmpeg', '-i', '.assets/examples/target-240p.mp4', '-vf', 'fps=25', '.assets/examples/target-240p-25fps.mp4' ])
    subprocess.run([ 'ffmpeg', '-i', '.assets/examples/target-240p.mp4', '-vf', 'fps=30', '.assets/examples/target-240p-30fps.mp4' ])
    subprocess.run([ 'ffmpeg', '-i', '.assets/examples/target-240p.mp4', '-vf', 'fps=60', '.assets/examples/target-240p-60fps.mp4' ])

@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
    facefusion.globals.trim_frame_start = None
    facefusion.globals.trim_frame_end = None
    facefusion.globals.temp_frame_quality = 90
    facefusion.globals.temp_frame_format = 'jpg'

def test_get_video_frame() -> None:
    assert get_video_frame('.assets/examples/target-240p-25fps.mp4') is not None
    assert get_video_frame('invalid') is None

def test_detect_fps() -> None:
    assert detect_fps('.assets/examples/target-240p-25fps.mp4') == 25.0
    assert detect_fps('.assets/examples/target-240p-30fps.mp4') == 30.0
    assert detect_fps('.assets/examples/target-240p-60fps.mp4') == 60.0
    assert detect_fps('invalid') is None

def test_count_video_frame_total() -> None:
    assert count_video_frame_total('.assets/examples/target-240p-25fps.mp4') == 270
    assert count_video_frame_total('.assets/examples/target-240p-30fps.mp4') == 324
    assert count_video_frame_total('.assets/examples/target-240p-60fps.mp4') == 648
    assert count_video_frame_total('invalid') == 0