56 lines
2.2 KiB
Python
56 lines
2.2 KiB
Python
from PIL import Image
|
||
import os
|
||
import concurrent.futures
|
||
from PIL import Image
|
||
|
||
# 设置允许的最大像素数,例如设置为1亿像素
|
||
Image.MAX_IMAGE_PIXELS = 1000000000
|
||
def validate_folder_path(folder_path):
|
||
"""
|
||
验证文件夹路径的安全性,避免路径遍历攻击。
|
||
"""
|
||
clean_path = os.path.abspath(os.path.expanduser(folder_path))
|
||
if not clean_path.startswith(os.path.abspath(os.getcwd())):
|
||
raise ValueError("不安全的路径访问尝试。")
|
||
return clean_path
|
||
|
||
def convert_image_to_webp(input_path, output_path):
|
||
"""
|
||
单个图像转换函数,为了异常处理和代码清晰。
|
||
"""
|
||
try:
|
||
with Image.open(input_path) as img:
|
||
img.save(output_path, "WEBP")
|
||
print(f"Converted {os.path.basename(input_path)} to {os.path.basename(output_path)}")
|
||
except Exception as e: # 捕获更广泛的异常
|
||
print(f"Cannot convert {os.path.basename(input_path)}: {e}")
|
||
|
||
def convert_jpg_to_webp(input_folder, output_folder, max_workers=None):
|
||
input_folder = validate_folder_path(input_folder)
|
||
output_folder = validate_folder_path(output_folder)
|
||
|
||
if not os.path.exists(output_folder):
|
||
try:
|
||
os.makedirs(output_folder)
|
||
except Exception as e:
|
||
print(f"无法创建输出文件夹 {output_folder}: {e}")
|
||
return
|
||
|
||
# 使用集合来简化文件格式检查
|
||
valid_extensions = {"jpg", "jpeg", "png" , "JPG"}
|
||
|
||
# 使用并发执行来提高性能
|
||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||
for filename in os.listdir(input_folder):
|
||
if filename.endswith(tuple(valid_extensions)):
|
||
input_path = os.path.join(input_folder, filename)
|
||
webp_filename = os.path.splitext(filename)[0] + '.webp'
|
||
output_path = os.path.join(output_folder, webp_filename)
|
||
executor.submit(convert_image_to_webp, input_path, output_path)
|
||
|
||
# 指定输入和输出文件夹
|
||
input_folder = 'input'
|
||
output_folder = 'output'
|
||
|
||
# 调用函数进行转换,使用6个线程并发转换(根据实际情况调整)
|
||
convert_jpg_to_webp(input_folder, output_folder, max_workers=6) |