Python 3.8 分包脚本详解
import os
import sys
import zipfile
import hashlib
import tempfile
import shutil
from functools import partial
def validate_environment():
"""验证Python版本和必要模块"""
if sys.version_info < (3, 6):
raise RuntimeError("需要Python 3.6或更高版本")
if not hasattr(os, 'scandir'):
raise RuntimeError("当前环境不支持os.scandir()方法")
def get_all_files(source_dir):
"""收集并标准化所有文件信息(兼容符号链接)"""
source_dir = os.path.abspath(source_dir)
if not os.path.isdir(source_dir):
raise ValueError(f"源目录不存在:{source_dir}")
files = []
for root, _, filenames in os.walk(source_dir, followlinks=False):
for filename in filenames:
abs_path = os.path.join(root, filename)
try:
# 跳过无法访问的文件(如权限问题)
if not os.access(abs_path, os.R_OK):
print(f"警告:跳过不可读文件 {abs_path}")
continue
# 标准化相对路径为POSIX格式
rel_path = os.path.relpath(abs_path, source_dir)
rel_path = rel_path.replace(os.sep, '/')
# 排除隐藏文件(可选)
if rel_path.split('/')[-1].startswith('.'):
continue
files.append({
'abs': abs_path,
'rel': rel_path,
'size': os.path.getsize(abs_path)
})
except OSError as e:
print(f"跳过问题文件 {abs_path}:{str(e)}")
# 确保稳定的文件排序
files.sort(key=lambda x: x['rel'])
return files
def split_into_packages(files, max_size):
"""智能分包算法(处理超大文件和空目录)"""
if max_size <= 0:
raise ValueError("分包大小必须大于0")
packages = []
current_pkg = []
current_size = 0
for file_info in files:
file_size = file_info['size']
if file_size == 0:
print(f"警告:空文件 {file_info['rel']} 将被跳过")
continue
# 处理超大文件
if file_size > max_size:
if current_pkg:
packages.append(current_pkg)
current_pkg = []
current_size = 0
packages.append([file_info])
continue
# 常规分包
if current_size + file_size > max_size:
packages.append(current_pkg)
current_pkg = [file_info]
current_size = file_size
else:
current_pkg.append(file_info)
current_size += file_size
if current_pkg:
packages.append(current_pkg)
return packages
def calculate_md5(file_path, chunk_size=4096):
"""安全计算大文件哈希"""
md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(partial(f.read, chunk_size), b''):
md5.update(chunk)
return md5.hexdigest()
def compress_package(package, source_root, output_dir):
"""创建带完整目录结构的压缩包"""
temp_path = None
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(
dir=output_dir,
delete=False,
suffix='.tmp.zip'
) as temp_file:
temp_path = temp_file.name
# 创建ZIP并维护目录结构
with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
created_dirs = set()
for file_info in package:
abs_path = file_info['abs']
arcname = file_info['rel']
# 显式创建父目录
parent_dir = os.path.dirname(arcname)
if parent_dir:
parts = parent_dir.split('/')
current_dir = ''
for part in parts:
current_dir = f"{current_dir}/{part}" if current_dir else part
if current_dir not in created_dirs:
zipf.mkdir(current_dir)
created_dirs.add(current_dir)
# 写入文件
zipf.write(abs_path, arcname)
# 计算哈希并重命名
md5_hash = calculate_md5(temp_path)
final_name = f"{md5_hash}.Unity3D"
final_path = os.path.join(output_dir, final_name)
# 安全移动文件
shutil.move(temp_path, final_path)
return final_name
except Exception as e:
# 清理临时文件
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
except Exception as cleanup_err:
print(f"清理临时文件失败:{cleanup_err}")
raise RuntimeError(f"压缩失败:{str(e)}") from e
def main(source_dir, max_size, output_dir):
"""主处理流程"""
validate_environment()
# 参数校验
if not os.path.isdir(source_dir):
raise ValueError(f"无效源目录:{source_dir}")
if max_size <= 0:
raise ValueError(f"无效分包大小:{max_size} 字节")
# 准备输出目录
os.makedirs(output_dir, exist_ok=True)
print(f"源目录:{source_dir}")
print(f"输出目录:{output_dir}")
print(f"最大分包大小:{max_size / 1024 / 1024:.2f} MB\n")
# 文件收集
files = get_all_files(source_dir)
if not files:
raise RuntimeError("源目录中没有可压缩的文件")
print(f"找到 {len(files)} 个待处理文件")
# 智能分包
packages = split_into_packages(files, max_size)
print(f"生成 {len(packages)} 个分包\n")
# 执行压缩
results = []
for idx, pkg in enumerate(packages, 1):
try:
print(f"[{idx}/{len(packages)}] 正在处理分包,包含 {len(pkg)} 个文件…")
result_file = compress_package(pkg, source_dir, output_dir)
results.append(os.path.basename(result_file))
except Exception as e:
print(f"错误:分包 {idx} 处理失败 – {str(e)}")
if len(packages) == 1: # 如果是唯一包则直接退出
raise
# 输出结果
print("\n压缩完成,生成以下文件:")
for name in results:
print("\""+name+"\""+",")
if __name__ == '__main__':
try:
# 配置参数(按需修改)
source_directory = r"/Resources"
max_package_size = 100* 1024 * 1024 # 100MB
output_directory = r"/zipout"
main(source_directory, max_package_size, output_directory)
except Exception as e:
print(f"\n错误:{str(e)}")
sys.exit(1)
作者:CreasyChan