
* Simplify bbox access
* Code cleanup
* Simplify bbox access
* Move code to face helper
* Swap and paste back without insightface
* Swap and paste back without insightface
* Remove semaphore where possible
* Improve paste back performance
* Cosmetic changes
* Move the predictor to ONNX to avoid tensorflow, Use video ranges for prediction
* Make CI happy
* Move template and size to the options
* Fix different color on box
* Uniform model handling for predictor
* Uniform frame handling for predictor
* Pass kps direct to warp_face
* Fix urllib
* Analyse based on matches
* Analyse based on rate
* Fix CI
* ROCM and OpenVINO mapping for torch backends
* Fix the paste back speed
* Fix import
* Replace retinaface with yunet (#168)
* Remove insightface dependency
* Fix urllib
* Some fixes
* Analyse based on matches
* Analyse based on rate
* Fix CI
* Migrate to Yunet
* Something is off here
* We indeed need semaphore for yunet
* Normalize the normed_embedding
* Fix download of models
* Fix download of models
* Fix download of models
* Add score and improve affine_matrix
* Temp fix for bbox out of frame
* Temp fix for bbox out of frame
* ROCM and OpenVINO mapping for torch backends
* Normalize bbox
* Implement gender age
* Cosmetics on cli args
* Prevent face jumping
* Fix the paste back speed
* FIx import
* Introduce detection size
* Cosmetics on face analyser ARGS and globals
* Temp fix for shaking face
* Accurate event handling
* Accurate event handling
* Accurate event handling
* Set the reference_frame_number in face_selector component
* Simswap model (#171)
* Add simswap models
* Add ghost models
* Introduce normed template
* Conditional prepare and normalize for ghost
* Conditional prepare and normalize for ghost
* Get simswap working
* Get simswap working
* Fix refresh of swapper model
* Refine face selection and detection (#174)
* Refine face selection and detection
* Update README.md
* Fix some face analyser UI
* Fix some face analyser UI
* Introduce range handling for CLI arguments
* Introduce range handling for CLI arguments
* Fix some spacings
* Disable onnxruntime warnings
* Use cv2.blur over cv2.GaussianBlur for better performance
* Revert "Use cv2.blur over cv2.GaussianBlur for better performance"
This reverts commit bab666d6f9
.
* Prepare universal face detection
* Prepare universal face detection part2
* Reimplement retinaface
* Introduce cached anchors creation
* Restore filtering to enhance performance
* Minor changes
* Minor changes
* More code but easier to understand
* Minor changes
* Rename predictor to content analyser
* Change detection/recognition to detector/recognizer
* Fix crop frame borders
* Fix spacing
* Allow normalize output without a source
* Improve conditional set face reference
* Update dependencies
* Add timeout for get_download_size
* Fix performance due disorder
* Move models to assets repository, Adjust namings
* Refactor face analyser
* Rename models once again
* Fix spacing
* Highres simswap (#192)
* Introduce highres simswap
* Fix simswap 256 color issue (#191)
* Fix simswap 256 color issue
* Update face_swapper.py
* Normalize models and host in our repo
* Normalize models and host in our repo
---------
Co-authored-by: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
* Rename face analyser direction to face analyser order
* Improve the UI for face selector
* Add best-worst, worst-best detector ordering
* Clear as needed and fix zero score bug
* Fix linter
* Improve startup time by multi thread remote download size
* Just some cosmetics
* Normalize swagger source input, Add blendface_256 (unfinished)
* New paste back (#195)
* add new paste_back (#194)
* add new paste_back
* Update face_helper.py
* Update face_helper.py
* add commandline arguments and gui
* fix conflict
* Update face_mask.py
* type fix
* Clean some wording and typing
---------
Co-authored-by: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
* Clean more names, use blur range approach
* Add blur padding range
* Change the padding order
* Fix yunet filename
* Introduce face debugger
* Use percent for mask padding
* Ignore this
* Ignore this
* Simplify debugger output
* implement blendface (#198)
* Clean up after the genius
* Add gpen_bfr_256
* Cosmetics
* Ignore face_mask_padding on face enhancer
* Update face_debugger.py (#202)
* Shrink debug_face() to a minimum
* Mark as 2.0.0 release
* remove unused (#204)
* Apply NMS (#205)
* Apply NMS
* Apply NMS part2
* Fix restoreformer url
* Add debugger cli and gui components (#206)
* Add debugger cli and gui components
* update
* Polishing the types
* Fix usage in README.md
* Update onnxruntime
* Support for webp
* Rename paste-back to face-mask
* Add license to README
* Add license to README
* Extend face selector mode by one
* Update utilities.py (#212)
* Stop inline camera on stream
* Minor webcam updates
* Gracefully start and stop webcam
* Rename capture to video_capture
* Make get webcam capture pure
* Check webcam to not be None
* Remove some is not None
* Use index 0 for webcam
* Remove memory lookup within progress bar
* Less progress bar updates
* Uniform progress bar
* Use classic progress bar
* Fix image and video validation
* Use different hash for cache
* Use best-worse order for webcam
* Normalize padding like CSS
* Update preview
* Fix max memory
* Move disclaimer and license to the docs
* Update wording in README
* Add LICENSE.md
* Fix argument in README
---------
Co-authored-by: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
Co-authored-by: alex00ds <31631959+alex00ds@users.noreply.github.com>
269 lines
11 KiB
Python
269 lines
11 KiB
Python
from typing import Any, List, Optional
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
import glob
|
|
import filetype
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import ssl
|
|
import subprocess
|
|
import tempfile
|
|
import urllib.request
|
|
import onnxruntime
|
|
|
|
import facefusion.globals
|
|
from facefusion import wording
|
|
from facefusion.typing import Padding
|
|
from facefusion.vision import detect_fps
|
|
|
|
TEMP_DIRECTORY_PATH = os.path.join(tempfile.gettempdir(), 'facefusion')
|
|
TEMP_OUTPUT_VIDEO_NAME = 'temp.mp4'
|
|
|
|
# monkey patch ssl
|
|
if platform.system().lower() == 'darwin':
|
|
ssl._create_default_https_context = ssl._create_unverified_context
|
|
|
|
|
|
def run_ffmpeg(args : List[str]) -> bool:
|
|
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
|
|
commands.extend(args)
|
|
try:
|
|
subprocess.run(commands, stderr = subprocess.PIPE, check = True)
|
|
return True
|
|
except subprocess.CalledProcessError:
|
|
return False
|
|
|
|
|
|
def open_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
|
|
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
|
|
commands.extend(args)
|
|
return subprocess.Popen(commands, stdin = subprocess.PIPE)
|
|
|
|
|
|
def extract_frames(target_path : str, fps : float) -> bool:
|
|
temp_frame_compression = round(31 - (facefusion.globals.temp_frame_quality * 0.31))
|
|
trim_frame_start = facefusion.globals.trim_frame_start
|
|
trim_frame_end = facefusion.globals.trim_frame_end
|
|
temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
|
|
commands = [ '-hwaccel', 'auto', '-i', target_path, '-q:v', str(temp_frame_compression), '-pix_fmt', 'rgb24' ]
|
|
if trim_frame_start is not None and trim_frame_end is not None:
|
|
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(fps) ])
|
|
elif trim_frame_start is not None:
|
|
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ',fps=' + str(fps) ])
|
|
elif trim_frame_end is not None:
|
|
commands.extend([ '-vf', 'trim=end_frame=' + str(trim_frame_end) + ',fps=' + str(fps) ])
|
|
else:
|
|
commands.extend([ '-vf', 'fps=' + str(fps) ])
|
|
commands.extend([ '-vsync', '0', temp_frames_pattern ])
|
|
return run_ffmpeg(commands)
|
|
|
|
|
|
def compress_image(output_path : str) -> bool:
|
|
output_image_compression = round(31 - (facefusion.globals.output_image_quality * 0.31))
|
|
commands = [ '-hwaccel', 'auto', '-i', output_path, '-q:v', str(output_image_compression), '-y', output_path ]
|
|
return run_ffmpeg(commands)
|
|
|
|
|
|
def merge_video(target_path : str, fps : float) -> bool:
|
|
temp_output_video_path = get_temp_output_video_path(target_path)
|
|
temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
|
|
commands = [ '-hwaccel', 'auto', '-r', str(fps), '-i', temp_frames_pattern, '-c:v', facefusion.globals.output_video_encoder ]
|
|
if facefusion.globals.output_video_encoder in [ 'libx264', 'libx265' ]:
|
|
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.51))
|
|
commands.extend([ '-crf', str(output_video_compression) ])
|
|
if facefusion.globals.output_video_encoder in [ 'libvpx-vp9' ]:
|
|
output_video_compression = round(63 - (facefusion.globals.output_video_quality * 0.63))
|
|
commands.extend([ '-crf', str(output_video_compression) ])
|
|
if facefusion.globals.output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
|
|
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.51))
|
|
commands.extend([ '-cq', str(output_video_compression) ])
|
|
commands.extend([ '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_output_video_path ])
|
|
return run_ffmpeg(commands)
|
|
|
|
|
|
def restore_audio(target_path : str, output_path : str) -> bool:
|
|
fps = detect_fps(target_path)
|
|
trim_frame_start = facefusion.globals.trim_frame_start
|
|
trim_frame_end = facefusion.globals.trim_frame_end
|
|
temp_output_video_path = get_temp_output_video_path(target_path)
|
|
commands = [ '-hwaccel', 'auto', '-i', temp_output_video_path ]
|
|
if trim_frame_start is not None:
|
|
start_time = trim_frame_start / fps
|
|
commands.extend([ '-ss', str(start_time) ])
|
|
if trim_frame_end is not None:
|
|
end_time = trim_frame_end / fps
|
|
commands.extend([ '-to', str(end_time) ])
|
|
commands.extend([ '-i', target_path, '-c', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-shortest', '-y', output_path ])
|
|
return run_ffmpeg(commands)
|
|
|
|
|
|
def get_temp_frame_paths(target_path : str) -> List[str]:
|
|
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
|
|
return sorted(glob.glob(temp_frames_pattern))
|
|
|
|
|
|
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
|
|
temp_directory_path = get_temp_directory_path(target_path)
|
|
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + facefusion.globals.temp_frame_format)
|
|
|
|
|
|
def get_temp_directory_path(target_path : str) -> str:
|
|
target_name, _ = os.path.splitext(os.path.basename(target_path))
|
|
return os.path.join(TEMP_DIRECTORY_PATH, target_name)
|
|
|
|
|
|
def get_temp_output_video_path(target_path : str) -> str:
|
|
temp_directory_path = get_temp_directory_path(target_path)
|
|
return os.path.join(temp_directory_path, TEMP_OUTPUT_VIDEO_NAME)
|
|
|
|
|
|
def create_temp(target_path : str) -> None:
|
|
temp_directory_path = get_temp_directory_path(target_path)
|
|
Path(temp_directory_path).mkdir(parents = True, exist_ok = True)
|
|
|
|
|
|
def move_temp(target_path : str, output_path : str) -> None:
|
|
temp_output_video_path = get_temp_output_video_path(target_path)
|
|
if is_file(temp_output_video_path):
|
|
if is_file(output_path):
|
|
os.remove(output_path)
|
|
shutil.move(temp_output_video_path, output_path)
|
|
|
|
|
|
def clear_temp(target_path : str) -> None:
|
|
temp_directory_path = get_temp_directory_path(target_path)
|
|
parent_directory_path = os.path.dirname(temp_directory_path)
|
|
if not facefusion.globals.keep_temp and is_directory(temp_directory_path):
|
|
shutil.rmtree(temp_directory_path)
|
|
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
|
|
os.rmdir(parent_directory_path)
|
|
|
|
|
|
def normalize_output_path(source_path : Optional[str], target_path : Optional[str], output_path : Optional[str]) -> Optional[str]:
|
|
if is_file(target_path) and is_directory(output_path):
|
|
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
|
|
if is_file(source_path):
|
|
source_name, _ = os.path.splitext(os.path.basename(source_path))
|
|
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
|
|
return os.path.join(output_path, target_name + target_extension)
|
|
if is_file(target_path) and output_path:
|
|
_, target_extension = os.path.splitext(os.path.basename(target_path))
|
|
output_name, output_extension = os.path.splitext(os.path.basename(output_path))
|
|
output_directory_path = os.path.dirname(output_path)
|
|
if is_directory(output_directory_path) and output_extension:
|
|
return os.path.join(output_directory_path, output_name + target_extension)
|
|
return None
|
|
return output_path
|
|
|
|
|
|
def normalize_padding(padding : Optional[List[int]]) -> Optional[Padding]:
|
|
if padding and len(padding) == 1:
|
|
return tuple([ padding[0], padding[0], padding[0], padding[0] ]) # type: ignore[return-value]
|
|
if padding and len(padding) == 2:
|
|
return tuple([ padding[0], padding[1], padding[0], padding[1] ]) # type: ignore[return-value]
|
|
if padding and len(padding) == 3:
|
|
return tuple([ padding[0], padding[1], padding[2], padding[1] ]) # type: ignore[return-value]
|
|
if padding and len(padding) == 4:
|
|
return tuple(padding) # type: ignore[return-value]
|
|
return None
|
|
|
|
|
|
def is_file(file_path : str) -> bool:
|
|
return bool(file_path and os.path.isfile(file_path))
|
|
|
|
|
|
def is_directory(directory_path : str) -> bool:
|
|
return bool(directory_path and os.path.isdir(directory_path))
|
|
|
|
|
|
def is_image(image_path : str) -> bool:
|
|
if is_file(image_path):
|
|
mimetype = filetype.guess(image_path).mime
|
|
return bool(mimetype and mimetype.startswith('image/'))
|
|
return False
|
|
|
|
|
|
def is_video(video_path : str) -> bool:
|
|
if is_file(video_path):
|
|
mimetype = filetype.guess(video_path).mime
|
|
return bool(mimetype and mimetype.startswith('video/'))
|
|
return False
|
|
|
|
|
|
def conditional_download(download_directory_path : str, urls : List[str]) -> None:
|
|
with ThreadPoolExecutor() as executor:
|
|
for url in urls:
|
|
executor.submit(get_download_size, url)
|
|
for url in urls:
|
|
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
|
|
total = get_download_size(url)
|
|
if is_file(download_file_path):
|
|
initial = os.path.getsize(download_file_path)
|
|
else:
|
|
initial = 0
|
|
if initial < total:
|
|
with tqdm(total = total, initial = initial, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024, ascii = ' =') as progress:
|
|
subprocess.Popen([ 'curl', '--create-dirs', '--silent', '--insecure', '--location', '--continue-at', '-', '--output', download_file_path, url ])
|
|
current = initial
|
|
while current < total:
|
|
if is_file(download_file_path):
|
|
current = os.path.getsize(download_file_path)
|
|
progress.update(current - progress.n)
|
|
|
|
|
|
@lru_cache(maxsize = None)
|
|
def get_download_size(url : str) -> int:
|
|
try:
|
|
response = urllib.request.urlopen(url, timeout = 10)
|
|
return int(response.getheader('Content-Length'))
|
|
except (OSError, ValueError):
|
|
return 0
|
|
|
|
|
|
def is_download_done(url : str, file_path : str) -> bool:
|
|
if is_file(file_path):
|
|
return get_download_size(url) == os.path.getsize(file_path)
|
|
return False
|
|
|
|
|
|
def resolve_relative_path(path : str) -> str:
|
|
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
|
|
|
|
|
|
def list_module_names(path : str) -> Optional[List[str]]:
|
|
if os.path.exists(path):
|
|
files = os.listdir(path)
|
|
return [ Path(file).stem for file in files if not Path(file).stem.startswith(('.', '__')) ]
|
|
return None
|
|
|
|
|
|
def encode_execution_providers(execution_providers : List[str]) -> List[str]:
|
|
return [ execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers ]
|
|
|
|
|
|
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
|
|
available_execution_providers = onnxruntime.get_available_providers()
|
|
encoded_execution_providers = encode_execution_providers(available_execution_providers)
|
|
return [ execution_provider for execution_provider, encoded_execution_provider in zip(available_execution_providers, encoded_execution_providers) if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers) ]
|
|
|
|
|
|
def map_device(execution_providers : List[str]) -> str:
|
|
if 'CoreMLExecutionProvider' in execution_providers:
|
|
return 'mps'
|
|
if 'CUDAExecutionProvider' in execution_providers or 'ROCMExecutionProvider' in execution_providers :
|
|
return 'cuda'
|
|
if 'OpenVINOExecutionProvider' in execution_providers:
|
|
return 'mkl'
|
|
return 'cpu'
|
|
|
|
|
|
def create_metavar(ranges : List[Any]) -> str:
|
|
return '[' + str(ranges[0]) + '-' + str(ranges[-1]) + ']'
|
|
|
|
|
|
def update_status(message : str, scope : str = 'FACEFUSION.CORE') -> None:
|
|
print('[' + scope + '] ' + message)
|