import os
import sys
import zipfile
import hashlib
import tempfile
import shutil
from functools import partial

def validate_environment():
    """验证Python版本和必要模块"""
    if sys.version_info < (3, 6):
        raise RuntimeError("需要Python 3.6或更高版本")
    if not hasattr(os, 'scandir'):
        raise RuntimeError("当前环境不支持os.scandir()方法")

def get_all_files(source_dir):
    """收集并标准化所有文件信息(兼容符号链接)"""
    source_dir = os.path.abspath(source_dir)
    if not os.path.isdir(source_dir):
        raise ValueError(f"源目录不存在:{source_dir}")

    files = []
    for root, _, filenames in os.walk(source_dir, followlinks=False):
        for filename in filenames:
            abs_path = os.path.join(root, filename)
            try:
                # 跳过无法访问的文件(如权限问题)
                if not os.access(abs_path, os.R_OK):
                    print(f"警告:跳过不可读文件 {abs_path}")
                    continue
                
                # 标准化相对路径为POSIX格式
                rel_path = os.path.relpath(abs_path, source_dir)
                rel_path = rel_path.replace(os.sep, '/')
                
                # 排除隐藏文件(可选)
                if rel_path.split('/')[-1].startswith('.'):
                    continue
                
                files.append({
                    'abs': abs_path,
                    'rel': rel_path,
                    'size': os.path.getsize(abs_path)
                })
            except OSError as e:
                print(f"跳过问题文件 {abs_path}:{str(e)}")

    # 确保稳定的文件排序
    files.sort(key=lambda x: x['rel'])
    return files

def split_into_packages(files, max_size):
    """智能分包算法(处理超大文件和空目录)"""
    if max_size <= 0:
        raise ValueError("分包大小必须大于0")

    packages = []
    current_pkg = []
    current_size = 0

    for file_info in files:
        file_size = file_info['size']
        if file_size == 0:
            print(f"警告:空文件 {file_info['rel']} 将被跳过")
            continue

        # 处理超大文件
        if file_size > max_size:
            if current_pkg:
                packages.append(current_pkg)
                current_pkg = []
                current_size = 0
            packages.append([file_info])
            continue

        # 常规分包
        if current_size + file_size > max_size:
            packages.append(current_pkg)
            current_pkg = [file_info]
            current_size = file_size
        else:
            current_pkg.append(file_info)
            current_size += file_size

    if current_pkg:
        packages.append(current_pkg)
    
    return packages

def calculate_md5(file_path, chunk_size=4096):
    """安全计算大文件哈希"""
    md5 = hashlib.md5()
    with open(file_path, 'rb') as f:
        for chunk in iter(partial(f.read, chunk_size), b''):
            md5.update(chunk)
    return md5.hexdigest()

def compress_package(package, source_root, output_dir):
    """创建带完整目录结构的压缩包"""
    temp_path = None
    try:
        # 创建临时文件
        with tempfile.NamedTemporaryFile(
            dir=output_dir,
            delete=False,
            suffix='.tmp.zip'
        ) as temp_file:
            temp_path = temp_file.name

        # 创建ZIP并维护目录结构
        with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            created_dirs = set()
            
            for file_info in package:
                abs_path = file_info['abs']
                arcname = file_info['rel']
                
                # 显式创建父目录
                parent_dir = os.path.dirname(arcname)
                if parent_dir:
                    parts = parent_dir.split('/')
                    current_dir = ''
                    for part in parts:
                        current_dir = f"{current_dir}/{part}" if current_dir else part
                        if current_dir not in created_dirs:
                            zipf.mkdir(current_dir)
                            created_dirs.add(current_dir)
                
                # 写入文件
                zipf.write(abs_path, arcname)

        # 计算哈希并重命名
        md5_hash = calculate_md5(temp_path)
        final_name = f"{md5_hash}.Unity3D"
        final_path = os.path.join(output_dir, final_name)
        
        # 安全移动文件
        shutil.move(temp_path, final_path)
        return final_name

    except Exception as e:
        # 清理临时文件
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except Exception as cleanup_err:
                print(f"清理临时文件失败:{cleanup_err}")
        raise RuntimeError(f"压缩失败:{str(e)}") from e

def main(source_dir, max_size, output_dir):
    """主处理流程"""
    validate_environment()
    
    # 参数校验
    if not os.path.isdir(source_dir):
        raise ValueError(f"无效源目录:{source_dir}")
    if max_size <= 0:
        raise ValueError(f"无效分包大小:{max_size} 字节")
    
    # 准备输出目录
    os.makedirs(output_dir, exist_ok=True)
    print(f"源目录:{source_dir}")
    print(f"输出目录:{output_dir}")
    print(f"最大分包大小:{max_size / 1024 / 1024:.2f} MB\n")

    # 文件收集
    files = get_all_files(source_dir)
    if not files:
        raise RuntimeError("源目录中没有可压缩的文件")
    print(f"找到 {len(files)} 个待处理文件")

    # 智能分包
    packages = split_into_packages(files, max_size)
    print(f"生成 {len(packages)} 个分包\n")

    # 执行压缩
    results = []
    for idx, pkg in enumerate(packages, 1):
        try:
            print(f"[{idx}/{len(packages)}] 正在处理分包,包含 {len(pkg)} 个文件…")
            result_file = compress_package(pkg, source_dir, output_dir)
            results.append(os.path.basename(result_file))
        except Exception as e:
            print(f"错误:分包 {idx} 处理失败 – {str(e)}")
            if len(packages) == 1:  # 如果是唯一包则直接退出
                raise

    # 输出结果
    print("\n压缩完成,生成以下文件:")
    for name in results:
        print("\""+name+"\""+",")

if __name__ == '__main__':
    try:
        # 配置参数(按需修改)
        source_directory = r"/Resources"
        max_package_size = 100* 1024 * 1024  # 100MB
        output_directory = r"/zipout"
        
        main(source_directory, max_package_size, output_directory)
    except Exception as e:
        print(f"\n错误:{str(e)}")
        sys.exit(1)
 

作者:CreasyChan

物联沃分享整理
物联沃-IOTWORD物联网 » Python 3.8 分包脚本详解

发表回复