跳转至

Auth类

Auth 类

Jwt

JWT 授权与 Token 管理, JWT授权一般没有refresh_token, 直接通过 client_id 和 client_secret 获取token

Source code in src/cpan123/authtype/Jwt.py
class Jwt:
    """JWT 授权与 Token 管理, JWT授权一般没有refresh_token, 直接通过 client_id 和 client_secret 获取token"""

    REFRESH_THRESHOLD = 60  # 提前刷新秒数

    def __init__(self, envpath: str | None = None, verbose: bool = False):
        """
        JWT 授权与 Token 管理

        Args:
            envpath: 环境文件路径。未指定时按优先级查找:用户指定 > 当前目录 .env > 用户目录 .env.123
            verbose: 是否启用详细日志输出
        """
        self.env = EnvConfig(envpath)
        self.verbose = verbose
        self._load_config()
        self.session = self._create_client()

    # -------------------- 配置 --------------------
    def _load_config(self):
        env = self.env
        self.client_id = env.get("CLIENT_ID")
        self.client_secret = env.get("CLIENT_SECRET")
        self.access_token = env.get("ACCESS_TOKEN")
        self.expires_at = env.get_int("EXPIRES_AT", 0)

    # -------------------- HTTP 客户端 --------------------
    def _create_client(self) -> httpx.Client:
        hooks = {"request": [log_request], "response": [log_response]} if self.verbose else None
        timeout = httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=30.0)
        return httpx.Client(headers={"User-Agent": UA}, timeout=timeout, event_hooks=hooks)

    # def _is_iso8601_format(self, date_str):
    #     try:
    #         datetime.fromisoformat(date_str)
    #         return True
    #     except ValueError:
    #         return False
    # -------------------- Token 管理 --------------------
    # def _expire2int(self, expire_value: str | int | float) -> int:
    #     """将过期时间值转换为整数秒"""
    #     if isinstance(expire_value, int):
    #         t = expire_value
    #     elif isinstance(expire_value, (str, float)):
    #         try:
    #             t = int(float(expire_value))
    #         except (TypeError, ValueError):
    #             log.error(f"无法将过期时间转换为整数: {expire_value}")
    #             raise AuthError(-1, "无效的过期时间格式") from None
    #     else:
    #         log.error(f"过期时间值类型不支持: {type(expire_value)}")
    #         raise AuthError(-1, "无效的过期时间类型")
    #     if t < 0 or t >= 20 * 365 * 24 * 3600:
    #         log.error(f"过期时间数值异常: {t}")
    #         raise AuthError(-1, "过期时间数值异常")
    #     return t
    @property
    def is_token_valid(self) -> bool:
        return self.expires_at > time.time() + self.REFRESH_THRESHOLD

    def _get_key(self, data: dict, key: str) -> str:
        """从响应数据中获取指定 key 的值,支持多层嵌套"""
        return data.get(key) or data.get("data", {}).get(key) or ""

    def _fetch_token(self) -> None:
        """通过 client_id/secret 获取新的 access_token"""
        if not self.client_id or not self.client_secret:
            raise AuthError(-1, "缺少 client_id 或 client_secret,无法刷新 token")

        data = {
            "clientID": self.client_id,
            "clientSecret": self.client_secret,
        }
        headers = {"Platform": "open_platform"}
        respjson = self._do_request("POST", url=API.JWT.TOKEN, headers=headers, data=data).json()
        self._update_token(respjson)

    def _update_token(self, data: dict):
        """更新本地 token 并写回 .env"""
        code = int(self._get_key(data, "code") or 0)
        if code != 0:
            raise AuthError(int(code), f"获取 token 失败: {data}")
        access_token = self._get_key(data, "accessToken")
        expiredAt = self._get_key(data, "expiredAt")

        try:
            expires_in = int(datetime.fromisoformat(expiredAt).timestamp())
        except (TypeError, ValueError):
            log.error(f"无法将过期时间转换为整数: {expiredAt}")
            raise AuthError(-1, "无效的过期时间格式") from None

        if not access_token:
            raise AuthError(-1, "响应缺少 access_token")

        self.access_token = access_token
        self.expires_at = expires_in

        for k, v in {
            "ACCESS_TOKEN": access_token,
            "EXPIRES_AT": str(int(self.expires_at)),
        }.items():
            self.env.set(k, v)

    # -------------------- 请求方法 --------------------
    def _do_request(self, method: str, url: str, **kwargs) -> httpx.Response:
        try:
            resp = self.session.request(method, url, **kwargs)
            resp.raise_for_status()
            return resp
        except httpx.RequestError as e:
            log.error(f"method: {method}, url: {url}, 网络请求失败: {e}")
            raise AuthError(-1, f"网络错误: {e}") from e
        except httpx.HTTPStatusError as e:
            raise AuthError(e.response.status_code, f"HTTP 错误: {e}") from e

    # -------------------- Token 自动获取 --------------------
    def _get_token_if_needed(self) -> str:
        if not self.access_token or not self.is_token_valid:
            self._fetch_token()
        return self.access_token

    def refresh_token(self) -> str:
        """强制刷新 access_token"""
        self._fetch_token()
        return self.access_token

    # -------------------- 公共接口 --------------------
    def get_access_token(self) -> str:
        """获取有效的 access_token, 如果过期则自动刷新"""
        access_token = self._get_token_if_needed()
        if not access_token:
            raise AuthError(40140116, "无法获取有效的 access_token")
        return access_token

__init__

__init__(envpath: str | None = None, verbose: bool = False)

JWT 授权与 Token 管理

Parameters:

Name Type Description Default
envpath str | None

环境文件路径。未指定时按优先级查找:用户指定 > 当前目录 .env > 用户目录 .env.123

None
verbose bool

是否启用详细日志输出

False
Source code in src/cpan123/authtype/Jwt.py
def __init__(self, envpath: str | None = None, verbose: bool = False):
    """
    JWT 授权与 Token 管理

    Args:
        envpath: 环境文件路径。未指定时按优先级查找:用户指定 > 当前目录 .env > 用户目录 .env.123
        verbose: 是否启用详细日志输出
    """
    self.env = EnvConfig(envpath)
    self.verbose = verbose
    self._load_config()
    self.session = self._create_client()

refresh_token

refresh_token() -> str

强制刷新 access_token

Source code in src/cpan123/authtype/Jwt.py
def refresh_token(self) -> str:
    """强制刷新 access_token"""
    self._fetch_token()
    return self.access_token

get_access_token

get_access_token() -> str

获取有效的 access_token, 如果过期则自动刷新

Source code in src/cpan123/authtype/Jwt.py
def get_access_token(self) -> str:
    """获取有效的 access_token, 如果过期则自动刷新"""
    access_token = self._get_token_if_needed()
    if not access_token:
        raise AuthError(40140116, "无法获取有效的 access_token")
    return access_token

Auth

Bases: Jwt

带自动 Bearer Token 的授权请求类

该类继承自 Jwt, 自动在请求中添加 Bearer Token 进行授权。

Source code in src/cpan123/Auth.py
class Auth(Jwt):
    """带自动 Bearer Token 的授权请求类

    该类继承自 Jwt, 自动在请求中添加 Bearer Token 进行授权。

    """

    def request(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
        """带授权头的请求

        根据不同的要求, 添加一些通用的请求头。

        Args:
            method: HTTP 方法,如 "GET", "POST" 等
            url: 请求的完整 URL 或相对路径
            **kwargs: 传递给 httpx 请求的其他参数,如 headers, params, json 等

        Returns:
            httpx.Response: HTTP 响应对象
        """
        url = url if url.startswith("http") else urljoin(API.API_BASE, url)

        headers = kwargs.pop("headers", {})
        headers["Authorization"] = f"Bearer {self.get_access_token()}"
        headers["Platform"] = "open_platform"
        kwargs["headers"] = headers

        # 对 kwargs 进行处理, 如果有params, data, json等, 删除None值
        for key in ["params", "data", "json"]:
            if key in kwargs and isinstance(kwargs[key], dict):
                kwargs[key] = {k: v for k, v in kwargs[key].items() if v is not None}

        return self._do_request(method, url, **kwargs)

    def request_json(self, method: str, url: str, **kwargs: Any) -> dict:
        """带授权头的请求,并解析为统一响应模型

        Args:
            method: HTTP 方法,如 "GET", "POST" 等
            url: 请求的完整 URL 或相对路径
            kwargs: 传递给 httpx 请求的其他参数,如 headers, params, json 等

        Returns:
            解析后的响应数据,符合统一响应模型
        """

        verbose = kwargs.pop("verbose", getattr(self, "verbose", True))

        invalid_codes = {401, 40140116}
        last_error: Exception | None = None

        for attempt in range(2):
            resp = None
            try:
                resp = self.request(method, url, **kwargs)
                resp.raise_for_status()
                respjson = resp.json()
            except AuthError as e:
                last_error = e
                if e.code in invalid_codes and attempt == 0:
                    self.refresh_token()
                    continue
                if verbose:
                    log.error(f"授权失败: {e}")
                    log.error(f"请求方法: {method}, URL: {url}")
                raise ValueError("授权失败,请检查凭据。") from e
            except Exception as e:
                last_error = e
                if verbose:
                    log.error(f"请求失败: {e}")
                    log.error(f"请求方法: {method}, URL: {url}")
                    log.error(f"原始响应: {resp.text if resp is not None else '无响应'}")
                raise ValueError("请求过程中发生错误,请检查日志以获取详细信息。") from e

            if respjson.get("code") == 429:
                return respjson

            message = respjson.get("message", "")
            if (respjson.get("code") in invalid_codes or "expired" in message.lower()) and attempt == 0:
                self.refresh_token()
                continue

            try:
                parsed = BaseResponse.model_validate(respjson)
                return parsed.model_dump()
            except Exception as e:
                if verbose:
                    log.error(f"解析响应 JSON 失败: {e}")
                    log.error(f"请求方法: {method}, URL: {url}")
                    log.error(f"原始响应: {respjson}")
                return respjson

        if last_error is not None:
            raise ValueError("多次刷新 token 后仍失败,请检查凭据。") from last_error
        raise ValueError("请求失败,且未返回任何错误详情。")

request

request(
    method: str, url: str, **kwargs: Any
) -> httpx.Response

带授权头的请求

根据不同的要求, 添加一些通用的请求头。

Parameters:

Name Type Description Default
method str

HTTP 方法,如 "GET", "POST" 等

required
url str

请求的完整 URL 或相对路径

required
**kwargs Any

传递给 httpx 请求的其他参数,如 headers, params, json 等

{}

Returns:

Type Description
Response

httpx.Response: HTTP 响应对象

Source code in src/cpan123/Auth.py
def request(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
    """带授权头的请求

    根据不同的要求, 添加一些通用的请求头。

    Args:
        method: HTTP 方法,如 "GET", "POST" 等
        url: 请求的完整 URL 或相对路径
        **kwargs: 传递给 httpx 请求的其他参数,如 headers, params, json 等

    Returns:
        httpx.Response: HTTP 响应对象
    """
    url = url if url.startswith("http") else urljoin(API.API_BASE, url)

    headers = kwargs.pop("headers", {})
    headers["Authorization"] = f"Bearer {self.get_access_token()}"
    headers["Platform"] = "open_platform"
    kwargs["headers"] = headers

    # 对 kwargs 进行处理, 如果有params, data, json等, 删除None值
    for key in ["params", "data", "json"]:
        if key in kwargs and isinstance(kwargs[key], dict):
            kwargs[key] = {k: v for k, v in kwargs[key].items() if v is not None}

    return self._do_request(method, url, **kwargs)

request_json

request_json(method: str, url: str, **kwargs: Any) -> dict

带授权头的请求,并解析为统一响应模型

Parameters:

Name Type Description Default
method str

HTTP 方法,如 "GET", "POST" 等

required
url str

请求的完整 URL 或相对路径

required
kwargs Any

传递给 httpx 请求的其他参数,如 headers, params, json 等

{}

Returns:

Type Description
dict

解析后的响应数据,符合统一响应模型

Source code in src/cpan123/Auth.py
def request_json(self, method: str, url: str, **kwargs: Any) -> dict:
    """带授权头的请求,并解析为统一响应模型

    Args:
        method: HTTP 方法,如 "GET", "POST" 等
        url: 请求的完整 URL 或相对路径
        kwargs: 传递给 httpx 请求的其他参数,如 headers, params, json 等

    Returns:
        解析后的响应数据,符合统一响应模型
    """

    verbose = kwargs.pop("verbose", getattr(self, "verbose", True))

    invalid_codes = {401, 40140116}
    last_error: Exception | None = None

    for attempt in range(2):
        resp = None
        try:
            resp = self.request(method, url, **kwargs)
            resp.raise_for_status()
            respjson = resp.json()
        except AuthError as e:
            last_error = e
            if e.code in invalid_codes and attempt == 0:
                self.refresh_token()
                continue
            if verbose:
                log.error(f"授权失败: {e}")
                log.error(f"请求方法: {method}, URL: {url}")
            raise ValueError("授权失败,请检查凭据。") from e
        except Exception as e:
            last_error = e
            if verbose:
                log.error(f"请求失败: {e}")
                log.error(f"请求方法: {method}, URL: {url}")
                log.error(f"原始响应: {resp.text if resp is not None else '无响应'}")
            raise ValueError("请求过程中发生错误,请检查日志以获取详细信息。") from e

        if respjson.get("code") == 429:
            return respjson

        message = respjson.get("message", "")
        if (respjson.get("code") in invalid_codes or "expired" in message.lower()) and attempt == 0:
            self.refresh_token()
            continue

        try:
            parsed = BaseResponse.model_validate(respjson)
            return parsed.model_dump()
        except Exception as e:
            if verbose:
                log.error(f"解析响应 JSON 失败: {e}")
                log.error(f"请求方法: {method}, URL: {url}")
                log.error(f"原始响应: {respjson}")
            return respjson

    if last_error is not None:
        raise ValueError("多次刷新 token 后仍失败,请检查凭据。") from last_error
    raise ValueError("请求失败,且未返回任何错误详情。")