跳转至

下载与上传

文件(夹)下载

Downloader

文件下载管理类

功能: - 从云端下载单个文件到本地 - 从云端下载整个文件夹到本地

Source code in src/cpan123/Downloader.py
class Downloader:
    """文件下载管理类

    功能:
        - 从云端下载单个文件到本地
        - 从云端下载整个文件夹到本地

    """

    def __init__(self, auth: Auth, userinfo: UserInfoModel | None = None) -> None:
        self.auth = auth
        self.userinfo = userinfo
        self.file = File(auth, userinfo)
        self.file2 = File2(auth, userinfo)

    @validate_call
    def download_file(
        self,
        remote_path: str,
        local_path: Optional[str] = None,
        overwrite: bool = False,
        show_progress: bool = True,
    ) -> Optional[dict]:
        """从云端下载单个文件到本地

        Args:
            remote_path: 云端文件路径(绝对路径,如 "/folder/file.txt")
            local_path: 本地保存路径。如果为 None,保存到当前目录并使用云端文件名
            overwrite: 是否覆盖已存在的本地文件
            show_progress: 是否显示下载进度

        Returns:
            下载信息字典,包含 url、remote_path、local_path、filename、md5

        Example:
            ```python
            # 下载到当前目录
            downloader.download_file("/folder/file.txt")

            # 下载到指定位置
            downloader.download_file("/folder/file.txt", "downloads/myfile.txt")
            ```
        """
        # 转换为 PurePosixPath 处理云端路径
        cloud_path = PurePosixPath(remote_path)

        # 验证路径
        if not cloud_path.is_absolute():
            cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

        if not cloud_path.suffix:
            raise ValueError(f"路径似乎不是文件(没有文件扩展名): {cloud_path}")

        # 确定本地保存路径
        if local_path is None:
            save_path = Path(cloud_path.name)
        else:
            save_path = Path(local_path)

        # 检查本地文件是否存在
        if save_path.exists() and not overwrite:
            print(f"⚠️ 文件 {save_path} 已存在,跳过下载(使用 overwrite=True 强制覆盖)")
            return None

        # 查找云端文件
        fileId, fileItem = self._find_file_by_path(cloud_path)
        if not fileId or not fileItem:
            print(f"❌ 云端找不到文件: {cloud_path}")
            return None

        # 获取下载链接
        respjson = self.file.download_info(fileId)
        download_url = respjson.get("data", {}).get("downloadUrl", "")
        if not download_url:
            print(f"❌ 无法获取下载链接: {cloud_path}")
            return None

        # 构建返回信息
        download_info = {
            "url": download_url,
            "remote_path": str(cloud_path),
            "local_path": str(save_path),
            "filename": fileItem["filename"],
            "md5": fileItem["etag"],
        }

        # 确保父目录存在
        save_path.parent.mkdir(parents=True, exist_ok=True)

        # 下载文件
        try:
            if show_progress:
                print(f"📥 下载: {cloud_path} -> {save_path}")

            download_file(
                url=download_url,
                output_path=str(save_path),
                md5=fileItem["etag"],
                overwrite=overwrite,
                max_tries=5,
                retry_seconds=2,
            )

            if show_progress:
                print(f"✅ 下载完成: {save_path}")

            return download_info
        except Exception as e:
            print(f"❌ 下载失败: {e}")
            return None

    @validate_call
    def download_folder(
        self,
        remote_path: str,
        local_path: Optional[str] = None,
        overwrite: bool = False,
        show_progress: bool = True,
    ) -> dict:
        """从云端下载整个文件夹到本地

        Args:
            remote_path: 云端文件夹路径(绝对路径,如 "/folder")
            local_path: 本地保存目录。如果为 None,使用云端文件夹名作为目录名
            overwrite: 是否覆盖已存在的本地文件
            show_progress: 是否显示下载进度

        Returns:
            下载统计信息字典,包含 total、succeeded、failed

        Example:
            ```python
            # 下载到当前目录(会创建文件夹名的目录)
            downloader.download_folder("/my_folder")
            # 结果:./my_folder/...

            # 下载到指定目录
            downloader.download_folder("/my_folder", "downloads")
            # 结果:./downloads/...
            ```
        """
        # 转换为 PurePosixPath 处理云端路径
        cloud_path = PurePosixPath(remote_path)

        # 验证路径
        if not cloud_path.is_absolute():
            cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

        if cloud_path.suffix:
            raise ValueError(f"路径似乎是文件而不是文件夹: {cloud_path}")

        if cloud_path == PurePosixPath("/"):
            raise ValueError("不支持下载根目录,请指定具体文件夹")

        # 确定本地保存目录
        if local_path is None:
            save_dir = Path(cloud_path.name)
        else:
            save_dir = Path(local_path)

        save_dir.mkdir(parents=True, exist_ok=True)

        # 查找云端文件夹
        fileId, _ = self._find_file_by_path(cloud_path, is_dir=True)
        if not fileId:
            print(f"❌ 云端找不到文件夹: {cloud_path}")
            return {"total": 0, "succeeded": 0, "failed": 0, "files": []}

        # 获取文件夹中的所有文件
        file_list = self._get_file_list(fileId, current_path=str(cloud_path), base_path=str(cloud_path))

        # 过滤掉目录,只保留文件
        files_to_download = [f for f in file_list if f["type"] == 0 and f["trashed"] == 0]

        if not files_to_download:
            print(f"⚠️ 文件夹为空: {cloud_path}")
            return {"total": 0, "succeeded": 0, "failed": 0, "files": []}

        # 下载统计
        total = len(files_to_download)
        succeeded = 0
        failed = 0
        results = []

        if show_progress:
            print(f"📦 开始下载文件夹: {cloud_path} ({total} 个文件)")

        # 逐个下载文件
        pbar = tqdm(total=total, desc="下载进度", unit="file", disable=not show_progress)

        for file_info in files_to_download:
            try:
                # 构建本地路径(保持目录结构)
                rel_path = file_info["relative_path"]
                local_file_path = save_dir / rel_path

                # 确保父目录存在
                local_file_path.parent.mkdir(parents=True, exist_ok=True)

                # 检查是否需要下载
                if local_file_path.exists() and not overwrite:
                    succeeded += 1
                    results.append({"file": rel_path, "status": "skipped"})
                    pbar.update(1)
                    continue

                # 获取下载链接
                download_url = self.file.download_info(file_info["fileId"]).get("data", {}).get("downloadUrl", "")
                if not download_url:
                    failed += 1
                    results.append({"file": rel_path, "status": "failed", "error": "无法获取下载链接"})
                    pbar.update(1)
                    continue

                # 下载文件
                download_file(
                    url=download_url,
                    output_path=str(local_file_path),
                    md5=file_info["etag"],
                    verbose=False,
                    overwrite=overwrite,
                    max_tries=3,
                    retry_seconds=1,
                )

                succeeded += 1
                results.append({"file": rel_path, "status": "success"})
                pbar.update(1)

            except Exception as e:
                failed += 1
                results.append({"file": file_info.get("relative_path", "unknown"), "status": "failed", "error": str(e)})
                pbar.update(1)

        pbar.close()

        if show_progress:
            print(f"✅ 下载完成: 总计 {total} 个文件,成功 {succeeded} 个,失败 {failed} 个")

        return {
            "total": total,
            "succeeded": succeeded,
            "failed": failed,
            "files": results,
            "local_path": str(save_dir),
        }

    # ==================== 内部辅助方法 ====================

    def _find_file_by_path(self, cloud_path: PurePosixPath, is_dir: bool = False) -> tuple[Optional[int], Optional[dict]]:
        """根据云端路径查找文件或文件夹的 ID"""
        if not cloud_path.is_absolute():
            return None, None

        parts = cloud_path.parts[1:]  # 去掉根 "/"
        current_id = 0  # 从根目录开始
        current_item = None

        for index, name in enumerate(parts):
            found = False
            last_file_id = None

            while True:
                resjson = self.file.list_v2(parentFileId=current_id, lastFileId=last_file_id, limit=100)
                file_list = resjson.get("data", {}).get("fileList", [])

                if not file_list:
                    break

                for item in file_list:
                    if item["filename"] == name and item["trashed"] == 0:
                        # 中间路径必须是目录
                        if index < len(parts) - 1 and item["type"] != 1:
                            continue

                        # 最后一部分根据 is_dir 判断
                        if index == len(parts) - 1:
                            expected_type = 1 if is_dir else 0
                            if item["type"] != expected_type:
                                continue

                        current_id = item["fileId"]
                        current_item = item
                        found = True
                        break

                if found:
                    break

                last_file_id = resjson.get("data", {}).get("lastFileId", -1)
                if last_file_id == -1:
                    break

            if not found:
                return None, None

        return current_id, current_item

    def _get_file_list(self, parent_id: int, current_path: str = "", base_path: str = "") -> List[dict]:
        """递归获取文件夹下的所有文件"""
        file_list = []
        last_file_id = None

        while True:
            resjson = self.file.list_v2(parentFileId=parent_id, lastFileId=last_file_id, limit=100)

            if not resjson.get("data") or not resjson["data"].get("fileList"):
                break

            for item in resjson["data"]["fileList"]:
                # 构建完整路径(保持 / 开头)
                if current_path:
                    item_path = f"{current_path}/{item['filename']}"
                else:
                    item_path = f"/{item['filename']}"

                item["full_path"] = item_path

                # 计算相对路径
                if base_path:
                    try:
                        # 使用 PurePosixPath 计算相对路径
                        relative_path = str(PurePosixPath(item_path).relative_to(base_path))
                    except ValueError:
                        # 如果失败,使用文件名
                        relative_path = item["filename"]
                else:
                    relative_path = item["filename"]

                item["relative_path"] = relative_path

                # 如果是目录,递归获取子文件
                if item["type"] == 1:
                    file_list.extend(self._get_file_list(item["fileId"], item_path, base_path))
                else:
                    file_list.append(item)

            last_file_id = resjson.get("data", {}).get("lastFileId", -1)
            if last_file_id == -1:
                break

        return file_list

    @validate_call
    def download(
        self,
        remote_path: str,
        local_path: Optional[str] = None,
        overwrite: bool = False,
        show_progress: bool = True,
    ) -> Optional[dict]:
        """自动判断远端路径是文件还是文件夹并下载。

        如果 remote_path 指向文件夹,调用 download_folder;如果指向文件,调用 download_file。

        Returns:
            download_file 返回的 dict(单文件)或 download_folder 返回的统计 dict(文件夹)。
        """
        cloud_path = PurePosixPath(remote_path)

        # 规范化云端路径
        if not cloud_path.is_absolute():
            cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

        # 先尝试按文件夹查找
        folder_id, _ = self._find_file_by_path(cloud_path, is_dir=True)
        if folder_id:
            # 如果是文件夹,调用 download_folder
            return self.download_folder(remote_path, local_path=local_path, overwrite=overwrite, show_progress=show_progress)

        # 再尝试按文件查找
        file_id, _ = self._find_file_by_path(cloud_path, is_dir=False)
        if file_id:
            return self.download_file(remote_path, local_path=local_path, overwrite=overwrite, show_progress=show_progress)

        # 如果两者都找不到,尝试列出父目录看是否存在类似名称(容错)
        # 例如:用户传入的路径可能带/或不带后缀
        print(f"❌ 云端找不到路径: {cloud_path}")
        return None

download_file

download_file(
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> Optional[dict]

从云端下载单个文件到本地

Parameters:

Name Type Description Default
remote_path str

云端文件路径(绝对路径,如 "/folder/file.txt")

required
local_path Optional[str]

本地保存路径。如果为 None,保存到当前目录并使用云端文件名

None
overwrite bool

是否覆盖已存在的本地文件

False
show_progress bool

是否显示下载进度

True

Returns:

Type Description
Optional[dict]

下载信息字典,包含 url、remote_path、local_path、filename、md5

Example
1
2
3
4
5
# 下载到当前目录
downloader.download_file("/folder/file.txt")

# 下载到指定位置
downloader.download_file("/folder/file.txt", "downloads/myfile.txt")
Source code in src/cpan123/Downloader.py
@validate_call
def download_file(
    self,
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> Optional[dict]:
    """从云端下载单个文件到本地

    Args:
        remote_path: 云端文件路径(绝对路径,如 "/folder/file.txt")
        local_path: 本地保存路径。如果为 None,保存到当前目录并使用云端文件名
        overwrite: 是否覆盖已存在的本地文件
        show_progress: 是否显示下载进度

    Returns:
        下载信息字典,包含 url、remote_path、local_path、filename、md5

    Example:
        ```python
        # 下载到当前目录
        downloader.download_file("/folder/file.txt")

        # 下载到指定位置
        downloader.download_file("/folder/file.txt", "downloads/myfile.txt")
        ```
    """
    # 转换为 PurePosixPath 处理云端路径
    cloud_path = PurePosixPath(remote_path)

    # 验证路径
    if not cloud_path.is_absolute():
        cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

    if not cloud_path.suffix:
        raise ValueError(f"路径似乎不是文件(没有文件扩展名): {cloud_path}")

    # 确定本地保存路径
    if local_path is None:
        save_path = Path(cloud_path.name)
    else:
        save_path = Path(local_path)

    # 检查本地文件是否存在
    if save_path.exists() and not overwrite:
        print(f"⚠️ 文件 {save_path} 已存在,跳过下载(使用 overwrite=True 强制覆盖)")
        return None

    # 查找云端文件
    fileId, fileItem = self._find_file_by_path(cloud_path)
    if not fileId or not fileItem:
        print(f"❌ 云端找不到文件: {cloud_path}")
        return None

    # 获取下载链接
    respjson = self.file.download_info(fileId)
    download_url = respjson.get("data", {}).get("downloadUrl", "")
    if not download_url:
        print(f"❌ 无法获取下载链接: {cloud_path}")
        return None

    # 构建返回信息
    download_info = {
        "url": download_url,
        "remote_path": str(cloud_path),
        "local_path": str(save_path),
        "filename": fileItem["filename"],
        "md5": fileItem["etag"],
    }

    # 确保父目录存在
    save_path.parent.mkdir(parents=True, exist_ok=True)

    # 下载文件
    try:
        if show_progress:
            print(f"📥 下载: {cloud_path} -> {save_path}")

        download_file(
            url=download_url,
            output_path=str(save_path),
            md5=fileItem["etag"],
            overwrite=overwrite,
            max_tries=5,
            retry_seconds=2,
        )

        if show_progress:
            print(f"✅ 下载完成: {save_path}")

        return download_info
    except Exception as e:
        print(f"❌ 下载失败: {e}")
        return None

download_folder

download_folder(
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> dict

从云端下载整个文件夹到本地

Parameters:

Name Type Description Default
remote_path str

云端文件夹路径(绝对路径,如 "/folder")

required
local_path Optional[str]

本地保存目录。如果为 None,使用云端文件夹名作为目录名

None
overwrite bool

是否覆盖已存在的本地文件

False
show_progress bool

是否显示下载进度

True

Returns:

Type Description
dict

下载统计信息字典,包含 total、succeeded、failed

Example
1
2
3
4
5
6
7
# 下载到当前目录(会创建文件夹名的目录)
downloader.download_folder("/my_folder")
# 结果:./my_folder/...

# 下载到指定目录
downloader.download_folder("/my_folder", "downloads")
# 结果:./downloads/...
Source code in src/cpan123/Downloader.py
@validate_call
def download_folder(
    self,
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> dict:
    """从云端下载整个文件夹到本地

    Args:
        remote_path: 云端文件夹路径(绝对路径,如 "/folder")
        local_path: 本地保存目录。如果为 None,使用云端文件夹名作为目录名
        overwrite: 是否覆盖已存在的本地文件
        show_progress: 是否显示下载进度

    Returns:
        下载统计信息字典,包含 total、succeeded、failed

    Example:
        ```python
        # 下载到当前目录(会创建文件夹名的目录)
        downloader.download_folder("/my_folder")
        # 结果:./my_folder/...

        # 下载到指定目录
        downloader.download_folder("/my_folder", "downloads")
        # 结果:./downloads/...
        ```
    """
    # 转换为 PurePosixPath 处理云端路径
    cloud_path = PurePosixPath(remote_path)

    # 验证路径
    if not cloud_path.is_absolute():
        cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

    if cloud_path.suffix:
        raise ValueError(f"路径似乎是文件而不是文件夹: {cloud_path}")

    if cloud_path == PurePosixPath("/"):
        raise ValueError("不支持下载根目录,请指定具体文件夹")

    # 确定本地保存目录
    if local_path is None:
        save_dir = Path(cloud_path.name)
    else:
        save_dir = Path(local_path)

    save_dir.mkdir(parents=True, exist_ok=True)

    # 查找云端文件夹
    fileId, _ = self._find_file_by_path(cloud_path, is_dir=True)
    if not fileId:
        print(f"❌ 云端找不到文件夹: {cloud_path}")
        return {"total": 0, "succeeded": 0, "failed": 0, "files": []}

    # 获取文件夹中的所有文件
    file_list = self._get_file_list(fileId, current_path=str(cloud_path), base_path=str(cloud_path))

    # 过滤掉目录,只保留文件
    files_to_download = [f for f in file_list if f["type"] == 0 and f["trashed"] == 0]

    if not files_to_download:
        print(f"⚠️ 文件夹为空: {cloud_path}")
        return {"total": 0, "succeeded": 0, "failed": 0, "files": []}

    # 下载统计
    total = len(files_to_download)
    succeeded = 0
    failed = 0
    results = []

    if show_progress:
        print(f"📦 开始下载文件夹: {cloud_path} ({total} 个文件)")

    # 逐个下载文件
    pbar = tqdm(total=total, desc="下载进度", unit="file", disable=not show_progress)

    for file_info in files_to_download:
        try:
            # 构建本地路径(保持目录结构)
            rel_path = file_info["relative_path"]
            local_file_path = save_dir / rel_path

            # 确保父目录存在
            local_file_path.parent.mkdir(parents=True, exist_ok=True)

            # 检查是否需要下载
            if local_file_path.exists() and not overwrite:
                succeeded += 1
                results.append({"file": rel_path, "status": "skipped"})
                pbar.update(1)
                continue

            # 获取下载链接
            download_url = self.file.download_info(file_info["fileId"]).get("data", {}).get("downloadUrl", "")
            if not download_url:
                failed += 1
                results.append({"file": rel_path, "status": "failed", "error": "无法获取下载链接"})
                pbar.update(1)
                continue

            # 下载文件
            download_file(
                url=download_url,
                output_path=str(local_file_path),
                md5=file_info["etag"],
                verbose=False,
                overwrite=overwrite,
                max_tries=3,
                retry_seconds=1,
            )

            succeeded += 1
            results.append({"file": rel_path, "status": "success"})
            pbar.update(1)

        except Exception as e:
            failed += 1
            results.append({"file": file_info.get("relative_path", "unknown"), "status": "failed", "error": str(e)})
            pbar.update(1)

    pbar.close()

    if show_progress:
        print(f"✅ 下载完成: 总计 {total} 个文件,成功 {succeeded} 个,失败 {failed} 个")

    return {
        "total": total,
        "succeeded": succeeded,
        "failed": failed,
        "files": results,
        "local_path": str(save_dir),
    }

download

download(
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> Optional[dict]

自动判断远端路径是文件还是文件夹并下载。

如果 remote_path 指向文件夹,调用 download_folder;如果指向文件,调用 download_file。

Returns:

Type Description
Optional[dict]

download_file 返回的 dict(单文件)或 download_folder 返回的统计 dict(文件夹)。

Source code in src/cpan123/Downloader.py
@validate_call
def download(
    self,
    remote_path: str,
    local_path: Optional[str] = None,
    overwrite: bool = False,
    show_progress: bool = True,
) -> Optional[dict]:
    """自动判断远端路径是文件还是文件夹并下载。

    如果 remote_path 指向文件夹,调用 download_folder;如果指向文件,调用 download_file。

    Returns:
        download_file 返回的 dict(单文件)或 download_folder 返回的统计 dict(文件夹)。
    """
    cloud_path = PurePosixPath(remote_path)

    # 规范化云端路径
    if not cloud_path.is_absolute():
        cloud_path = PurePosixPath("/") / str(cloud_path).lstrip("./")

    # 先尝试按文件夹查找
    folder_id, _ = self._find_file_by_path(cloud_path, is_dir=True)
    if folder_id:
        # 如果是文件夹,调用 download_folder
        return self.download_folder(remote_path, local_path=local_path, overwrite=overwrite, show_progress=show_progress)

    # 再尝试按文件查找
    file_id, _ = self._find_file_by_path(cloud_path, is_dir=False)
    if file_id:
        return self.download_file(remote_path, local_path=local_path, overwrite=overwrite, show_progress=show_progress)

    # 如果两者都找不到,尝试列出父目录看是否存在类似名称(容错)
    # 例如:用户传入的路径可能带/或不带后缀
    print(f"❌ 云端找不到路径: {cloud_path}")
    return None

文件(夹)上传

Uploader

高层次上传器,封装 File2 接口,支持: - 单个文件分片上传 - 单个文件单步上传 - 目录上传(可多线程,策略可选)

约定: - 单步上传服务域名通过 domain 接口获取 - 分片上传 slice 使用 create 返回的 servers 任意其一

Source code in src/cpan123/Uploader.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
class Uploader:
    """高层次上传器,封装 File2 接口,支持:
    - 单个文件分片上传
    - 单个文件单步上传
    - 目录上传(可多线程,策略可选)

    约定:
    - 单步上传服务域名通过 domain 接口获取
    - 分片上传 slice 使用 create 返回的 servers 任意其一
    """

    def __init__(self, auth: Auth, userinfo: UserInfoModel | None = None) -> None:
        """初始化

        Args:
            auth (Auth): 已授权的 Auth 实例
            userinfo (UserInfoModel | None): 用户信息模型,默认为 None
        """
        self.auth = auth
        self.userinfo = userinfo
        self.file2 = File2(auth, userinfo=userinfo)
        self.chunked_threshold_bytes = 10 * 1024 * 1024

    # ---------------------- 单文件:分片上传 ----------------------
    def upload_file_chunked(
        self,
        file_path: str | Path,
        parentFileID: int = 0,
        duplicate: int = 1,
        containDir: bool = False,
        server: Optional[str] = None,
        slice_size: Optional[int] = None,
        poll_timeout_sec: int = 300,
        remote_path: Optional[str] = None,
        show_progress: bool = True,
        status_callback: Optional[Callable[[dict], None]] = None,
    ) -> dict:
        """按分片上传单个文件。

        Args:
            file_path: 本地文件路径
            parentFileID: 父目录 id
            duplicate: 重名策略(1 保留两者,2 覆盖原文件)
            containDir: 是否携带路径(为 True 时 filename 传相对路径)
            server: 指定上传域名(可选,不传使用 create 返回 servers[0])
            slice_size: 指定分片大小(可选,不传使用服务端下发 sliceSize)
            poll_timeout_sec: upload_complete 轮询超时时间
            show_progress: 是否显示上传进度(默认 True)
            status_callback: 上传状态回调,用于上层汇报阶段变化

        Returns:
            服务端最终响应 data 字段
        """
        path = Path(file_path)
        assert path.exists() and path.is_file(), f"文件不存在: {path}"

        size = path.stat().st_size
        size_text = self._format_size(size)
        etag = calculate_md5(path)
        # 生成 filename,是否带目录由 containDir 决定
        if not containDir:
            filename = path.name
        else:
            # 允许外部传入远程相对路径,避免把本机绝对路径上传
            if remote_path:
                filename = remote_path
            else:
                # 兜底:使用文件名(不带目录)
                filename = path.name

        # 1) 创建上传任务
        resp = self.file2.create(
            parentFileID=parentFileID,
            filename=filename,
            etag=etag,
            size=size,
            duplicate=duplicate,
            containDir=containDir,
        )

        # 检查响应状态
        if resp.get("code") != 0:
            error_msg = resp.get("message", "未知错误")
            raise RuntimeError(f"创建上传任务失败: {error_msg}")

        data = resp.get("data")
        if not data:
            raise RuntimeError("创建上传任务失败:响应数据为空")

        # 秒传
        if data.get("reuse") is True and data.get("fileID", 0) != 0:
            return data

        preuploadID = data.get("preuploadID")
        if not preuploadID:
            raise RuntimeError("创建分片任务失败:缺少 preuploadID")

        real_slice = int(slice_size or data.get("sliceSize") or 16 * 1024 * 1024)
        upload_server = server or (data.get("servers") or [None])[0]
        if not upload_server:
            raise RuntimeError("创建分片任务失败:缺少可用上传域名 servers")

        # 2) 逐片上传
        total_parts = math.ceil(size / real_slice) if real_slice else 0

        if status_callback:
            status_callback(
                {
                    "stage": "uploading",
                    "file": str(path),
                    "name": path.name,
                    "size": size,
                    "size_text": size_text,
                    "uploaded_parts": 0,
                    "total_parts": total_parts,
                }
            )

        if show_progress:
            with tqdm(
                total=size,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                desc=f"上传 {filename}",
            ) as pbar:
                with path.open("rb") as f:
                    for part_no in range(1, total_parts + 1):
                        chunk = f.read(real_slice)
                        if not chunk:
                            break

                        # 使用 File2.slice 方法上传分片,传入上传服务器地址
                        self.file2.slice(
                            preuploadID=preuploadID,
                            sliceNo=part_no,
                            sliceMD5=Uploader.__md5_bytes(chunk),
                            slice=chunk,
                            upload_server=upload_server,
                        )

                        if status_callback:
                            status_callback(
                                {
                                    "stage": "uploading",
                                    "file": str(path),
                                    "name": path.name,
                                    "size": size,
                                    "size_text": size_text,
                                    "uploaded_parts": part_no,
                                    "total_parts": total_parts,
                                }
                            )

                        # 更新进度条
                        pbar.update(len(chunk))
        else:
            with path.open("rb") as f:
                for part_no in range(1, total_parts + 1):
                    chunk = f.read(real_slice)
                    if not chunk:
                        break

                    # 使用 File2.slice 方法上传分片,传入上传服务器地址
                    self.file2.slice(
                        preuploadID=preuploadID,
                        sliceNo=part_no,
                        sliceMD5=Uploader.__md5_bytes(chunk),
                        slice=chunk,
                        upload_server=upload_server,
                    )

                    if status_callback:
                        status_callback(
                            {
                                "stage": "uploading",
                                "file": str(path),
                                "name": path.name,
                                "size": size,
                                "size_text": size_text,
                                "uploaded_parts": part_no,
                                "total_parts": total_parts,
                            }
                        )

        # 3) 完成上传(必要时轮询)
        start = time.time()
        if status_callback:
            status_callback(
                {
                    "stage": "confirming",
                    "file": str(path),
                    "name": path.name,
                    "size": size,
                    "size_text": size_text,
                    "uploaded_parts": total_parts,
                    "total_parts": total_parts,
                }
            )
        while True:
            time.sleep(2)
            done = self.file2.upload_complete(preuploadID=preuploadID)

            # 检查响应状态
            # code=20103 表示文件正在校验中,需要继续轮询
            code = done.get("code", 0)
            if code != 0 and code != 20103:
                error_msg = done.get("message", "未知错误")
                raise RuntimeError(f"完成上传失败: {error_msg}")

            d = done.get("data")
            if d and d.get("completed") and d.get("fileID", 0) != 0:
                if status_callback:
                    status_callback(
                        {
                            "stage": "done",
                            "file": str(path),
                            "name": path.name,
                            "size": size,
                            "size_text": size_text,
                            "uploaded_parts": total_parts,
                            "total_parts": total_parts,
                        }
                    )
                return d

            if time.time() - start > poll_timeout_sec:
                if status_callback:
                    status_callback(
                        {
                            "stage": "timeout",
                            "file": str(path),
                            "name": path.name,
                            "size": size,
                            "size_text": size_text,
                            "uploaded_parts": total_parts,
                            "total_parts": total_parts,
                        }
                    )
                raise TimeoutError("上传完成确认超时")

    # ---------------------- 单文件:单步上传 ----------------------
    def upload_file_single(
        self,
        file_path: str | Path,
        parentFileID: int = 0,
        duplicate: int = 1,
        containDir: bool = False,
        domain: Optional[str] = None,
        single_limit_bytes: int = 1 * 1024 * 1024 * 1024,  # 1GB(接口限制)
        remote_path: Optional[str] = None,
    ) -> dict:
        """单步上传单个文件(小文件)。

        Args:
            file_path: 本地文件路径
            parentFileID: 父目录 id
            duplicate: 重名策略(1 保留两者,2 覆盖原文件)
            containDir: 是否携带路径
            domain: 指定上传域名(不传则自动获取)
            single_limit_bytes: 单步上传大小限制
        """
        path = Path(file_path)
        assert path.exists() and path.is_file(), f"文件不存在: {path}"
        size = path.stat().st_size
        if size > single_limit_bytes:
            raise ValueError("文件过大,请使用分片上传")

        etag = calculate_md5(path)
        if not containDir:
            filename = path.name
        else:
            if remote_path:
                filename = remote_path
            else:
                filename = path.name

        # 选择上传域名
        upload_domain = domain
        if not upload_domain:
            domains = self.file2.domain()

            # 检查响应状态
            if domains.get("code") != 0:
                error_msg = domains.get("message", "未知错误")
                raise RuntimeError(f"获取上传域名失败: {error_msg}")

            arr = domains.get("data", [])
            upload_domain = (arr or [None])[0]
        if not upload_domain:
            raise RuntimeError("获取上传域名失败")

        # 使用 File2.single_create 方法上传,传入上传服务器地址
        with path.open("rb") as fp:
            file_content = fp.read()

        res = self.file2.single_create(
            parentFileID=parentFileID,
            filename=filename,
            etag=etag,
            size=size,
            file=file_content,
            duplicate=duplicate,
            containDir=containDir,
            upload_server=upload_domain,
        )

        # 检查响应状态
        if res.get("code") != 0:
            error_msg = res.get("message", "未知错误")
            raise RuntimeError(f"单步上传失败: {error_msg}")

        data = res.get("data")
        if not data:
            raise RuntimeError("单步上传失败:响应数据为空")

        if data.get("completed") and data.get("fileID", 0) != 0:
            return data
        raise RuntimeError(f"单步上传失败: {res}")

    # ---------------------- 目录上传 ----------------------
    def upload_folder(
        self,
        folder_path: str | Path,
        parentFileID: int = 0,
        method: str = "chunked",  # 'single' | 'chunked' | 'auto'
        max_workers: int = 4,
        single_limit_bytes: int = 1 * 1024 * 1024 * 1024,
        duplicate: int = 1,
        contain_dir: bool = True,
        show_progress: bool = True,
    ) -> dict:
        """上传整个文件夹。

        - method='single':全部使用单步上传(大于限制的文件会自动切换分片)
        - method='chunked':全部使用分片上传
        - method='auto':小于分片阈值走单步,其他分片
        - 当 contain_dir=True 时,远程路径会包含根目录本身,例如:/文件夹名/子目录/文件
        - show_progress: 是否显示上传进度(默认 True)

        返回 (本地文件, data) 列表
        """
        root = Path(folder_path)
        assert root.exists() and root.is_dir(), f"目录不存在: {root}"

        files: list[tuple[Path, str]] = []  # (真实路径, 远程相对路径)
        for p in root.rglob("*"):
            if p.is_file():
                # 统一相对路径(用于 containDir)
                rel = p.relative_to(root)
                if contain_dir:
                    # 包含根目录本身,但不加开头的 /
                    remote = f"{root.name}/{rel.as_posix()}"
                else:
                    remote = p.name
                files.append((p, remote))

        results: list[tuple[Path, dict]] = []
        status_map: dict[str, dict] = {}
        status_lock = Lock()

        def _set_status(payload: dict) -> None:
            file_key = str(payload.get("file", ""))
            if not file_key:
                return
            with status_lock:
                status_map[file_key] = payload

        def _clear_status(real_path: Path) -> None:
            with status_lock:
                status_map.pop(str(real_path), None)

        def _progress_text() -> str:
            with status_lock:
                if not status_map:
                    return "空闲"
                snapshots = list(status_map.values())

            priority = {"confirming": 0, "uploading": 1, "failed": 2, "timeout": 3, "done": 4}
            snapshots.sort(
                key=lambda item: (
                    priority.get(str(item.get("stage")), 99),
                    str(item.get("name", "")),
                )
            )
            current = snapshots[0]
            stage = str(current.get("stage", "uploading"))
            name = str(current.get("name", ""))
            size_text = str(current.get("size_text", ""))
            uploaded_parts = int(current.get("uploaded_parts", 0) or 0)
            total_parts = int(current.get("total_parts", 0) or 0)

            if stage == "confirming":
                stage_text = "等待服务端确认"
            elif stage == "uploading":
                if total_parts > 0:
                    stage_text = f"上传分片 {uploaded_parts}/{total_parts}"
                else:
                    stage_text = "上传中"
            elif stage == "failed":
                stage_text = "上传失败"
            elif stage == "timeout":
                stage_text = "确认超时"
            else:
                stage_text = "处理中"

            active_count = len(snapshots)
            return f"活跃: {active_count} | {stage_text} | {name} ({size_text})"

        def _result_text(real_path: Path, item: dict) -> str:
            size_text = self._format_size(real_path.stat().st_size)
            if isinstance(item, dict) and "error" in item:
                return f"失败: {real_path.name} ({size_text}) -> {item.get('error')}"

            inner = item.get("data") if isinstance(item, dict) and isinstance(item.get("data"), dict) else item
            if isinstance(inner, dict):
                file_id = int(inner.get("fileID", 0) or 0)
                completed = bool(inner.get("completed"))
                reuse = bool(inner.get("reuse"))
                if file_id != 0 and (completed or reuse):
                    if reuse:
                        return f"已秒传: {real_path.name} ({size_text})"
                    return f"已上传: {real_path.name} ({size_text})"

            return f"失败: {real_path.name} ({size_text}) -> unexpected response"

        def _task(item: tuple[Path, str]) -> tuple[Path, dict]:
            real_path, remote = item
            size = real_path.stat().st_size
            try:
                use_chunked = method == "chunked" or (method == "auto" and size >= self.chunked_threshold_bytes)
                if use_chunked:
                    data = self.upload_file_chunked(
                        real_path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                        remote_path=remote,
                        show_progress=False,  # 多线程环境下禁用单文件进度条
                        status_callback=_set_status,
                    )
                else:
                    _set_status(
                        {
                            "stage": "uploading",
                            "file": str(real_path),
                            "name": real_path.name,
                            "size": size,
                            "size_text": self._format_size(size),
                            "uploaded_parts": 0,
                            "total_parts": 0,
                        }
                    )
                    data = self.upload_file_single(
                        real_path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                        single_limit_bytes=single_limit_bytes,
                        remote_path=remote,
                    )
                    _set_status(
                        {
                            "stage": "done",
                            "file": str(real_path),
                            "name": real_path.name,
                            "size": size,
                            "size_text": self._format_size(size),
                            "uploaded_parts": 0,
                            "total_parts": 0,
                        }
                    )
                return (real_path, data)
            except Exception as e:  # 汇总错误,便于一次性查看
                _set_status(
                    {
                        "stage": "failed",
                        "file": str(real_path),
                        "name": real_path.name,
                        "size": size,
                        "size_text": self._format_size(size),
                        "uploaded_parts": 0,
                        "total_parts": 0,
                    }
                )
                return (real_path, {"error": str(e)})

        # 多线程提交任务
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            fut_map = {ex.submit(_task, item): item for item in files}

            if show_progress:
                # 显示文件级别的进度条
                with tqdm(total=len(files), desc="上传文件", unit="file") as pbar:
                    pbar.set_postfix_str(_progress_text())
                    for fut in as_completed(fut_map):
                        result = fut.result()
                        results.append(result)
                        tqdm.write(_result_text(result[0], result[1]))
                        _clear_status(result[0])
                        pbar.set_postfix_str(_progress_text())
                        pbar.update(1)
            else:
                for fut in as_completed(fut_map):
                    result = fut.result()
                    results.append(result)
                    _clear_status(result[0])

        # 规范化每个文件的返回,统一为 {code, message, success, data}
        normalized: dict[str, dict] = {}
        for real_path, item in results:
            key = str(real_path)
            # 异常场景
            if isinstance(item, dict) and "error" in item:
                normalized[key] = {
                    "code": 1,
                    "message": str(item.get("error")),
                    "success": False,
                    "data": None,
                }
                continue

            # 解包外层 {code,message,data} 与直接 data 两种形态
            inner = item.get("data") if isinstance(item, dict) and isinstance(item.get("data"), dict) else item
            if not isinstance(inner, dict):
                normalized[key] = {
                    "code": 1,
                    "message": "unexpected response",
                    "success": False,
                    "data": None,
                }
                continue

            file_id = int(inner.get("fileID", 0) or 0)
            completed = bool(inner.get("completed"))
            reuse = bool(inner.get("reuse"))
            success = file_id != 0 and (completed or reuse)

            normalized[key] = {
                "code": 0 if success else 1,
                "message": "ok" if success else "failed",
                "success": success,
                "data": inner,
            }

        # 统计
        total = len(normalized)
        succeeded = sum(1 for v in normalized.values() if v.get("success"))
        failed = total - succeeded

        return {
            "code": 0 if failed == 0 else 1,
            "message": "ok" if failed == 0 else "partial failure",
            "total": total,
            "succeeded": succeeded,
            "failed": failed,
            "data": normalized,
            "x-traceID": "my-trace-id",
        }

    @staticmethod
    def __md5_bytes(b: bytes) -> str:
        h = hashlib.md5()
        h.update(b)
        return h.hexdigest()

    @staticmethod
    def _format_size(size: int) -> str:
        """将字节数转换为人类可读格式"""
        size_float = float(size)
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if size_float < 1024.0:
                return f"{size_float:.1f}{unit}"
            size_float /= 1024.0
        return f"{size_float:.1f}PB"

    # 如果选择的是文件,则调用 upload_file_chunked
    # 如果是目录,则调用 upload_folder
    def upload(
        self,
        file_path: str | Path,
        parentFileID: int = 0,
        duplicate: int = 1,
        contain_dir: bool = True,
        show_progress: bool = True,
        method: str = "chunked",
    ) -> dict:
        """根据路径类型选择上传方式。

        Args:
            file_path: 本地文件或目录路径
            parentFileID: 父目录 id
            duplicate: 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件)
            show_progress: 是否显示上传进度(默认 True)
            method: 上传方式,默认统一走分片上传

        Returns:
            统一汇总后的上传结果

        """
        path = Path(file_path)

        if path.is_file():
            try:
                if method == "chunked":
                    data = self.upload_file_chunked(
                        file_path=path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                        show_progress=show_progress,
                    )
                elif method == "single":
                    data = self.upload_file_single(
                        file_path=path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                    )
                else:
                    size = path.stat().st_size
                    if size >= self.chunked_threshold_bytes:
                        data = self.upload_file_chunked(
                            file_path=path,
                            parentFileID=parentFileID,
                            duplicate=duplicate,
                            containDir=contain_dir,
                            show_progress=show_progress,
                        )
                    else:
                        data = self.upload_file_single(
                            file_path=path,
                            parentFileID=parentFileID,
                            duplicate=duplicate,
                            containDir=contain_dir,
                        )
            except Exception as e:
                return {
                    "code": 1,
                    "message": "failed",
                    "total": 1,
                    "succeeded": 0,
                    "failed": 1,
                    "data": {
                        str(path): {
                            "code": 1,
                            "message": str(e),
                            "success": False,
                            "data": None,
                        }
                    },
                    "x-traceID": "my-trace-id",
                }

            success = False
            if isinstance(data, dict):
                file_id = int(data.get("fileID", 0) or 0)
                completed = bool(data.get("completed"))
                reuse = bool(data.get("reuse"))
                success = file_id != 0 and (completed or reuse)

            resp = {
                "code": 0 if success else 1,
                "message": "ok" if success else "failed",
                "total": 1,
                "succeeded": 1 if success else 0,
                "failed": 0 if success else 1,
                "data": {},
                "x-traceID": "my-trace-id",
            }
            if not success:
                resp["data"] = {
                    str(path): {
                        "code": 1,
                        "message": "failed",
                        "success": False,
                        "data": data if isinstance(data, dict) else None,
                    }
                }
        elif path.is_dir():
            resp = self.upload_folder(
                folder_path=path,
                parentFileID=parentFileID,
                method=method,
                duplicate=1 if duplicate == 1 else 2,
                contain_dir=contain_dir,
                show_progress=show_progress,
            )
        else:
            raise ValueError(f"路径既不是文件也不是目录: {path}")
        # 单文件成功时不返回明细,避免输出过大
        if path.is_file() and resp.get("succeeded") == resp.get("total"):
            resp["data"] = {}
        return resp

__init__

__init__(
    auth: Auth, userinfo: UserInfoModel | None = None
) -> None

初始化

Parameters:

Name Type Description Default
auth Auth

已授权的 Auth 实例

required
userinfo UserInfoModel | None

用户信息模型,默认为 None

None
Source code in src/cpan123/Uploader.py
def __init__(self, auth: Auth, userinfo: UserInfoModel | None = None) -> None:
    """初始化

    Args:
        auth (Auth): 已授权的 Auth 实例
        userinfo (UserInfoModel | None): 用户信息模型,默认为 None
    """
    self.auth = auth
    self.userinfo = userinfo
    self.file2 = File2(auth, userinfo=userinfo)
    self.chunked_threshold_bytes = 10 * 1024 * 1024

upload_file_chunked

upload_file_chunked(
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    containDir: bool = False,
    server: Optional[str] = None,
    slice_size: Optional[int] = None,
    poll_timeout_sec: int = 300,
    remote_path: Optional[str] = None,
    show_progress: bool = True,
    status_callback: Optional[
        Callable[[dict], None]
    ] = None,
) -> dict

按分片上传单个文件。

Parameters:

Name Type Description Default
file_path str | Path

本地文件路径

required
parentFileID int

父目录 id

0
duplicate int

重名策略(1 保留两者,2 覆盖原文件)

1
containDir bool

是否携带路径(为 True 时 filename 传相对路径)

False
server Optional[str]

指定上传域名(可选,不传使用 create 返回 servers[0])

None
slice_size Optional[int]

指定分片大小(可选,不传使用服务端下发 sliceSize)

None
poll_timeout_sec int

upload_complete 轮询超时时间

300
show_progress bool

是否显示上传进度(默认 True)

True
status_callback Optional[Callable[[dict], None]]

上传状态回调,用于上层汇报阶段变化

None

Returns:

Type Description
dict

服务端最终响应 data 字段

Source code in src/cpan123/Uploader.py
def upload_file_chunked(
    self,
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    containDir: bool = False,
    server: Optional[str] = None,
    slice_size: Optional[int] = None,
    poll_timeout_sec: int = 300,
    remote_path: Optional[str] = None,
    show_progress: bool = True,
    status_callback: Optional[Callable[[dict], None]] = None,
) -> dict:
    """按分片上传单个文件。

    Args:
        file_path: 本地文件路径
        parentFileID: 父目录 id
        duplicate: 重名策略(1 保留两者,2 覆盖原文件)
        containDir: 是否携带路径(为 True 时 filename 传相对路径)
        server: 指定上传域名(可选,不传使用 create 返回 servers[0])
        slice_size: 指定分片大小(可选,不传使用服务端下发 sliceSize)
        poll_timeout_sec: upload_complete 轮询超时时间
        show_progress: 是否显示上传进度(默认 True)
        status_callback: 上传状态回调,用于上层汇报阶段变化

    Returns:
        服务端最终响应 data 字段
    """
    path = Path(file_path)
    assert path.exists() and path.is_file(), f"文件不存在: {path}"

    size = path.stat().st_size
    size_text = self._format_size(size)
    etag = calculate_md5(path)
    # 生成 filename,是否带目录由 containDir 决定
    if not containDir:
        filename = path.name
    else:
        # 允许外部传入远程相对路径,避免把本机绝对路径上传
        if remote_path:
            filename = remote_path
        else:
            # 兜底:使用文件名(不带目录)
            filename = path.name

    # 1) 创建上传任务
    resp = self.file2.create(
        parentFileID=parentFileID,
        filename=filename,
        etag=etag,
        size=size,
        duplicate=duplicate,
        containDir=containDir,
    )

    # 检查响应状态
    if resp.get("code") != 0:
        error_msg = resp.get("message", "未知错误")
        raise RuntimeError(f"创建上传任务失败: {error_msg}")

    data = resp.get("data")
    if not data:
        raise RuntimeError("创建上传任务失败:响应数据为空")

    # 秒传
    if data.get("reuse") is True and data.get("fileID", 0) != 0:
        return data

    preuploadID = data.get("preuploadID")
    if not preuploadID:
        raise RuntimeError("创建分片任务失败:缺少 preuploadID")

    real_slice = int(slice_size or data.get("sliceSize") or 16 * 1024 * 1024)
    upload_server = server or (data.get("servers") or [None])[0]
    if not upload_server:
        raise RuntimeError("创建分片任务失败:缺少可用上传域名 servers")

    # 2) 逐片上传
    total_parts = math.ceil(size / real_slice) if real_slice else 0

    if status_callback:
        status_callback(
            {
                "stage": "uploading",
                "file": str(path),
                "name": path.name,
                "size": size,
                "size_text": size_text,
                "uploaded_parts": 0,
                "total_parts": total_parts,
            }
        )

    if show_progress:
        with tqdm(
            total=size,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            desc=f"上传 {filename}",
        ) as pbar:
            with path.open("rb") as f:
                for part_no in range(1, total_parts + 1):
                    chunk = f.read(real_slice)
                    if not chunk:
                        break

                    # 使用 File2.slice 方法上传分片,传入上传服务器地址
                    self.file2.slice(
                        preuploadID=preuploadID,
                        sliceNo=part_no,
                        sliceMD5=Uploader.__md5_bytes(chunk),
                        slice=chunk,
                        upload_server=upload_server,
                    )

                    if status_callback:
                        status_callback(
                            {
                                "stage": "uploading",
                                "file": str(path),
                                "name": path.name,
                                "size": size,
                                "size_text": size_text,
                                "uploaded_parts": part_no,
                                "total_parts": total_parts,
                            }
                        )

                    # 更新进度条
                    pbar.update(len(chunk))
    else:
        with path.open("rb") as f:
            for part_no in range(1, total_parts + 1):
                chunk = f.read(real_slice)
                if not chunk:
                    break

                # 使用 File2.slice 方法上传分片,传入上传服务器地址
                self.file2.slice(
                    preuploadID=preuploadID,
                    sliceNo=part_no,
                    sliceMD5=Uploader.__md5_bytes(chunk),
                    slice=chunk,
                    upload_server=upload_server,
                )

                if status_callback:
                    status_callback(
                        {
                            "stage": "uploading",
                            "file": str(path),
                            "name": path.name,
                            "size": size,
                            "size_text": size_text,
                            "uploaded_parts": part_no,
                            "total_parts": total_parts,
                        }
                    )

    # 3) 完成上传(必要时轮询)
    start = time.time()
    if status_callback:
        status_callback(
            {
                "stage": "confirming",
                "file": str(path),
                "name": path.name,
                "size": size,
                "size_text": size_text,
                "uploaded_parts": total_parts,
                "total_parts": total_parts,
            }
        )
    while True:
        time.sleep(2)
        done = self.file2.upload_complete(preuploadID=preuploadID)

        # 检查响应状态
        # code=20103 表示文件正在校验中,需要继续轮询
        code = done.get("code", 0)
        if code != 0 and code != 20103:
            error_msg = done.get("message", "未知错误")
            raise RuntimeError(f"完成上传失败: {error_msg}")

        d = done.get("data")
        if d and d.get("completed") and d.get("fileID", 0) != 0:
            if status_callback:
                status_callback(
                    {
                        "stage": "done",
                        "file": str(path),
                        "name": path.name,
                        "size": size,
                        "size_text": size_text,
                        "uploaded_parts": total_parts,
                        "total_parts": total_parts,
                    }
                )
            return d

        if time.time() - start > poll_timeout_sec:
            if status_callback:
                status_callback(
                    {
                        "stage": "timeout",
                        "file": str(path),
                        "name": path.name,
                        "size": size,
                        "size_text": size_text,
                        "uploaded_parts": total_parts,
                        "total_parts": total_parts,
                    }
                )
            raise TimeoutError("上传完成确认超时")

upload_file_single

upload_file_single(
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    containDir: bool = False,
    domain: Optional[str] = None,
    single_limit_bytes: int = 1 * 1024 * 1024 * 1024,
    remote_path: Optional[str] = None,
) -> dict

单步上传单个文件(小文件)。

Parameters:

Name Type Description Default
file_path str | Path

本地文件路径

required
parentFileID int

父目录 id

0
duplicate int

重名策略(1 保留两者,2 覆盖原文件)

1
containDir bool

是否携带路径

False
domain Optional[str]

指定上传域名(不传则自动获取)

None
single_limit_bytes int

单步上传大小限制

1 * 1024 * 1024 * 1024
Source code in src/cpan123/Uploader.py
def upload_file_single(
    self,
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    containDir: bool = False,
    domain: Optional[str] = None,
    single_limit_bytes: int = 1 * 1024 * 1024 * 1024,  # 1GB(接口限制)
    remote_path: Optional[str] = None,
) -> dict:
    """单步上传单个文件(小文件)。

    Args:
        file_path: 本地文件路径
        parentFileID: 父目录 id
        duplicate: 重名策略(1 保留两者,2 覆盖原文件)
        containDir: 是否携带路径
        domain: 指定上传域名(不传则自动获取)
        single_limit_bytes: 单步上传大小限制
    """
    path = Path(file_path)
    assert path.exists() and path.is_file(), f"文件不存在: {path}"
    size = path.stat().st_size
    if size > single_limit_bytes:
        raise ValueError("文件过大,请使用分片上传")

    etag = calculate_md5(path)
    if not containDir:
        filename = path.name
    else:
        if remote_path:
            filename = remote_path
        else:
            filename = path.name

    # 选择上传域名
    upload_domain = domain
    if not upload_domain:
        domains = self.file2.domain()

        # 检查响应状态
        if domains.get("code") != 0:
            error_msg = domains.get("message", "未知错误")
            raise RuntimeError(f"获取上传域名失败: {error_msg}")

        arr = domains.get("data", [])
        upload_domain = (arr or [None])[0]
    if not upload_domain:
        raise RuntimeError("获取上传域名失败")

    # 使用 File2.single_create 方法上传,传入上传服务器地址
    with path.open("rb") as fp:
        file_content = fp.read()

    res = self.file2.single_create(
        parentFileID=parentFileID,
        filename=filename,
        etag=etag,
        size=size,
        file=file_content,
        duplicate=duplicate,
        containDir=containDir,
        upload_server=upload_domain,
    )

    # 检查响应状态
    if res.get("code") != 0:
        error_msg = res.get("message", "未知错误")
        raise RuntimeError(f"单步上传失败: {error_msg}")

    data = res.get("data")
    if not data:
        raise RuntimeError("单步上传失败:响应数据为空")

    if data.get("completed") and data.get("fileID", 0) != 0:
        return data
    raise RuntimeError(f"单步上传失败: {res}")

upload_folder

upload_folder(
    folder_path: str | Path,
    parentFileID: int = 0,
    method: str = "chunked",
    max_workers: int = 4,
    single_limit_bytes: int = 1 * 1024 * 1024 * 1024,
    duplicate: int = 1,
    contain_dir: bool = True,
    show_progress: bool = True,
) -> dict

上传整个文件夹。

  • method='single':全部使用单步上传(大于限制的文件会自动切换分片)
  • method='chunked':全部使用分片上传
  • method='auto':小于分片阈值走单步,其他分片
  • 当 contain_dir=True 时,远程路径会包含根目录本身,例如:/文件夹名/子目录/文件
  • show_progress: 是否显示上传进度(默认 True)

返回 (本地文件, data) 列表

Source code in src/cpan123/Uploader.py
def upload_folder(
    self,
    folder_path: str | Path,
    parentFileID: int = 0,
    method: str = "chunked",  # 'single' | 'chunked' | 'auto'
    max_workers: int = 4,
    single_limit_bytes: int = 1 * 1024 * 1024 * 1024,
    duplicate: int = 1,
    contain_dir: bool = True,
    show_progress: bool = True,
) -> dict:
    """上传整个文件夹。

    - method='single':全部使用单步上传(大于限制的文件会自动切换分片)
    - method='chunked':全部使用分片上传
    - method='auto':小于分片阈值走单步,其他分片
    - 当 contain_dir=True 时,远程路径会包含根目录本身,例如:/文件夹名/子目录/文件
    - show_progress: 是否显示上传进度(默认 True)

    返回 (本地文件, data) 列表
    """
    root = Path(folder_path)
    assert root.exists() and root.is_dir(), f"目录不存在: {root}"

    files: list[tuple[Path, str]] = []  # (真实路径, 远程相对路径)
    for p in root.rglob("*"):
        if p.is_file():
            # 统一相对路径(用于 containDir)
            rel = p.relative_to(root)
            if contain_dir:
                # 包含根目录本身,但不加开头的 /
                remote = f"{root.name}/{rel.as_posix()}"
            else:
                remote = p.name
            files.append((p, remote))

    results: list[tuple[Path, dict]] = []
    status_map: dict[str, dict] = {}
    status_lock = Lock()

    def _set_status(payload: dict) -> None:
        file_key = str(payload.get("file", ""))
        if not file_key:
            return
        with status_lock:
            status_map[file_key] = payload

    def _clear_status(real_path: Path) -> None:
        with status_lock:
            status_map.pop(str(real_path), None)

    def _progress_text() -> str:
        with status_lock:
            if not status_map:
                return "空闲"
            snapshots = list(status_map.values())

        priority = {"confirming": 0, "uploading": 1, "failed": 2, "timeout": 3, "done": 4}
        snapshots.sort(
            key=lambda item: (
                priority.get(str(item.get("stage")), 99),
                str(item.get("name", "")),
            )
        )
        current = snapshots[0]
        stage = str(current.get("stage", "uploading"))
        name = str(current.get("name", ""))
        size_text = str(current.get("size_text", ""))
        uploaded_parts = int(current.get("uploaded_parts", 0) or 0)
        total_parts = int(current.get("total_parts", 0) or 0)

        if stage == "confirming":
            stage_text = "等待服务端确认"
        elif stage == "uploading":
            if total_parts > 0:
                stage_text = f"上传分片 {uploaded_parts}/{total_parts}"
            else:
                stage_text = "上传中"
        elif stage == "failed":
            stage_text = "上传失败"
        elif stage == "timeout":
            stage_text = "确认超时"
        else:
            stage_text = "处理中"

        active_count = len(snapshots)
        return f"活跃: {active_count} | {stage_text} | {name} ({size_text})"

    def _result_text(real_path: Path, item: dict) -> str:
        size_text = self._format_size(real_path.stat().st_size)
        if isinstance(item, dict) and "error" in item:
            return f"失败: {real_path.name} ({size_text}) -> {item.get('error')}"

        inner = item.get("data") if isinstance(item, dict) and isinstance(item.get("data"), dict) else item
        if isinstance(inner, dict):
            file_id = int(inner.get("fileID", 0) or 0)
            completed = bool(inner.get("completed"))
            reuse = bool(inner.get("reuse"))
            if file_id != 0 and (completed or reuse):
                if reuse:
                    return f"已秒传: {real_path.name} ({size_text})"
                return f"已上传: {real_path.name} ({size_text})"

        return f"失败: {real_path.name} ({size_text}) -> unexpected response"

    def _task(item: tuple[Path, str]) -> tuple[Path, dict]:
        real_path, remote = item
        size = real_path.stat().st_size
        try:
            use_chunked = method == "chunked" or (method == "auto" and size >= self.chunked_threshold_bytes)
            if use_chunked:
                data = self.upload_file_chunked(
                    real_path,
                    parentFileID=parentFileID,
                    duplicate=duplicate,
                    containDir=contain_dir,
                    remote_path=remote,
                    show_progress=False,  # 多线程环境下禁用单文件进度条
                    status_callback=_set_status,
                )
            else:
                _set_status(
                    {
                        "stage": "uploading",
                        "file": str(real_path),
                        "name": real_path.name,
                        "size": size,
                        "size_text": self._format_size(size),
                        "uploaded_parts": 0,
                        "total_parts": 0,
                    }
                )
                data = self.upload_file_single(
                    real_path,
                    parentFileID=parentFileID,
                    duplicate=duplicate,
                    containDir=contain_dir,
                    single_limit_bytes=single_limit_bytes,
                    remote_path=remote,
                )
                _set_status(
                    {
                        "stage": "done",
                        "file": str(real_path),
                        "name": real_path.name,
                        "size": size,
                        "size_text": self._format_size(size),
                        "uploaded_parts": 0,
                        "total_parts": 0,
                    }
                )
            return (real_path, data)
        except Exception as e:  # 汇总错误,便于一次性查看
            _set_status(
                {
                    "stage": "failed",
                    "file": str(real_path),
                    "name": real_path.name,
                    "size": size,
                    "size_text": self._format_size(size),
                    "uploaded_parts": 0,
                    "total_parts": 0,
                }
            )
            return (real_path, {"error": str(e)})

    # 多线程提交任务
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        fut_map = {ex.submit(_task, item): item for item in files}

        if show_progress:
            # 显示文件级别的进度条
            with tqdm(total=len(files), desc="上传文件", unit="file") as pbar:
                pbar.set_postfix_str(_progress_text())
                for fut in as_completed(fut_map):
                    result = fut.result()
                    results.append(result)
                    tqdm.write(_result_text(result[0], result[1]))
                    _clear_status(result[0])
                    pbar.set_postfix_str(_progress_text())
                    pbar.update(1)
        else:
            for fut in as_completed(fut_map):
                result = fut.result()
                results.append(result)
                _clear_status(result[0])

    # 规范化每个文件的返回,统一为 {code, message, success, data}
    normalized: dict[str, dict] = {}
    for real_path, item in results:
        key = str(real_path)
        # 异常场景
        if isinstance(item, dict) and "error" in item:
            normalized[key] = {
                "code": 1,
                "message": str(item.get("error")),
                "success": False,
                "data": None,
            }
            continue

        # 解包外层 {code,message,data} 与直接 data 两种形态
        inner = item.get("data") if isinstance(item, dict) and isinstance(item.get("data"), dict) else item
        if not isinstance(inner, dict):
            normalized[key] = {
                "code": 1,
                "message": "unexpected response",
                "success": False,
                "data": None,
            }
            continue

        file_id = int(inner.get("fileID", 0) or 0)
        completed = bool(inner.get("completed"))
        reuse = bool(inner.get("reuse"))
        success = file_id != 0 and (completed or reuse)

        normalized[key] = {
            "code": 0 if success else 1,
            "message": "ok" if success else "failed",
            "success": success,
            "data": inner,
        }

    # 统计
    total = len(normalized)
    succeeded = sum(1 for v in normalized.values() if v.get("success"))
    failed = total - succeeded

    return {
        "code": 0 if failed == 0 else 1,
        "message": "ok" if failed == 0 else "partial failure",
        "total": total,
        "succeeded": succeeded,
        "failed": failed,
        "data": normalized,
        "x-traceID": "my-trace-id",
    }

upload

upload(
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    contain_dir: bool = True,
    show_progress: bool = True,
    method: str = "chunked",
) -> dict

根据路径类型选择上传方式。

Parameters:

Name Type Description Default
file_path str | Path

本地文件或目录路径

required
parentFileID int

父目录 id

0
duplicate int

当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件)

1
show_progress bool

是否显示上传进度(默认 True)

True
method str

上传方式,默认统一走分片上传

'chunked'

Returns:

Type Description
dict

统一汇总后的上传结果

Source code in src/cpan123/Uploader.py
def upload(
    self,
    file_path: str | Path,
    parentFileID: int = 0,
    duplicate: int = 1,
    contain_dir: bool = True,
    show_progress: bool = True,
    method: str = "chunked",
) -> dict:
    """根据路径类型选择上传方式。

    Args:
        file_path: 本地文件或目录路径
        parentFileID: 父目录 id
        duplicate: 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件)
        show_progress: 是否显示上传进度(默认 True)
        method: 上传方式,默认统一走分片上传

    Returns:
        统一汇总后的上传结果

    """
    path = Path(file_path)

    if path.is_file():
        try:
            if method == "chunked":
                data = self.upload_file_chunked(
                    file_path=path,
                    parentFileID=parentFileID,
                    duplicate=duplicate,
                    containDir=contain_dir,
                    show_progress=show_progress,
                )
            elif method == "single":
                data = self.upload_file_single(
                    file_path=path,
                    parentFileID=parentFileID,
                    duplicate=duplicate,
                    containDir=contain_dir,
                )
            else:
                size = path.stat().st_size
                if size >= self.chunked_threshold_bytes:
                    data = self.upload_file_chunked(
                        file_path=path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                        show_progress=show_progress,
                    )
                else:
                    data = self.upload_file_single(
                        file_path=path,
                        parentFileID=parentFileID,
                        duplicate=duplicate,
                        containDir=contain_dir,
                    )
        except Exception as e:
            return {
                "code": 1,
                "message": "failed",
                "total": 1,
                "succeeded": 0,
                "failed": 1,
                "data": {
                    str(path): {
                        "code": 1,
                        "message": str(e),
                        "success": False,
                        "data": None,
                    }
                },
                "x-traceID": "my-trace-id",
            }

        success = False
        if isinstance(data, dict):
            file_id = int(data.get("fileID", 0) or 0)
            completed = bool(data.get("completed"))
            reuse = bool(data.get("reuse"))
            success = file_id != 0 and (completed or reuse)

        resp = {
            "code": 0 if success else 1,
            "message": "ok" if success else "failed",
            "total": 1,
            "succeeded": 1 if success else 0,
            "failed": 0 if success else 1,
            "data": {},
            "x-traceID": "my-trace-id",
        }
        if not success:
            resp["data"] = {
                str(path): {
                    "code": 1,
                    "message": "failed",
                    "success": False,
                    "data": data if isinstance(data, dict) else None,
                }
            }
    elif path.is_dir():
        resp = self.upload_folder(
            folder_path=path,
            parentFileID=parentFileID,
            method=method,
            duplicate=1 if duplicate == 1 else 2,
            contain_dir=contain_dir,
            show_progress=show_progress,
        )
    else:
        raise ValueError(f"路径既不是文件也不是目录: {path}")
    # 单文件成功时不返回明细,避免输出过大
    if path.is_file() and resp.get("succeeded") == resp.get("total"):
        resp["data"] = {}
    return resp