diff --git a/packages/iamai-adapter-console/iamai/adapter/console/config.py b/packages/iamai-adapter-console/iamai/adapter/console/config.py index a77895b7..f6a39849 100644 --- a/packages/iamai-adapter-console/iamai/adapter/console/config.py +++ b/packages/iamai-adapter-console/iamai/adapter/console/config.py @@ -1,4 +1,5 @@ """Console 适配器配置。""" + from typing import Any, Dict from iamai.config import ConfigModel @@ -8,4 +9,4 @@ class Config(ConfigModel): """Console 配置类,将在适配器被加载时被混入到机器人主配置中。""" __config_name__ = "console" - show_raw: bool = False + show_raw: bool = False \ No newline at end of file diff --git a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/__init__.py b/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/__init__.py deleted file mode 100644 index dfd77f35..00000000 --- a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/__init__.py +++ /dev/null @@ -1,165 +0,0 @@ -"""DingTalk 协议适配器。 - -本适配器适配了钉钉企业自建机器人协议。 -协议详情请参考: [钉钉开放平台](https://developers.dingtalk.com/document/robots/robot-overview) 。 -""" -import hmac -import time -import base64 -import hashlib -from typing import Any, Dict, Union, Literal - -import aiohttp -from aiohttp import web - -from iamai.adapter import Adapter -from iamai.log import logger, error_or_exception - -from .config import Config -from .event import DingTalkEvent -from .message import DingTalkMessage -from .exceptions import ApiTimeout, NetworkError - -__all__ = ["DingTalkAdapter"] - - -class DingTalkAdapter(Adapter[DingTalkEvent, Config]): - """钉钉协议适配器。""" - - name: str = "dingtalk" - Config = Config - - app: web.Application = None - runner: web.AppRunner = None - site: web.TCPSite = None - - session: aiohttp.ClientSession = None - - async def startup(self): - """创建 aiohttp Application。""" - self.app = web.Application() - self.app.add_routes([web.post(self.config.url, self.handler)]) - - self.session = aiohttp.ClientSession() - - async def run(self): - """运行 aiohttp 服务器。""" - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.site = web.TCPSite(self.runner, self.config.host, self.config.port) - await self.site.start() - - async def shutdown(self): - """清理 aiohttp AppRunner。""" - if self.session is not None: - await self.session.close() - if self.site is not None: - await self.site.stop() - if self.runner is not None: - await self.runner.cleanup() - - async def handler(self, request: web.Request): - """处理 aiohttp 服务器的接收。 - - Args: - request: aiohttp 服务器的 Request 对象。 - """ - if "timestamp" not in request.headers or "sign" not in request.headers: - logger.error(f"Illegal http header, incomplete http header") - elif abs(int(request.headers["timestamp"]) - time.time() * 1000) > 3600000: - logger.error( - f'Illegal http header, timestamp: {request.headers["timestamp"]}' - ) - elif request.headers["sign"] != self.get_sign(request.headers["timestamp"]): - logger.error(f'Illegal http header, sign: {request.headers["sign"]}') - else: - try: - dingtalk_event = DingTalkEvent(adapter=self, **(await request.json())) - except Exception as e: - error_or_exception( - "Request parsing error:", - e, - self.bot.config.bot.log.verbose_exception, - ) - return web.Response() - await self.handle_event(dingtalk_event) - return web.Response() - - def get_sign(self, timestamp: str) -> str: - """计算签名。 - - Args: - timestamp: 时间戳。 - - Returns: - 签名。 - """ - hmac_code = hmac.new( - self.config.app_secret.encode("utf-8"), - "{}\n{}".format(timestamp, self.config.app_secret).encode("utf-8"), - digestmod=hashlib.sha256, - ).digest() - return base64.b64encode(hmac_code).decode("utf-8") - - async def send( - self, - webhook: str, - conversation_type: Literal["1", "2"], - msg: Union[str, Dict, DingTalkMessage], - at: Union[None, Dict, DingTalkMessage] = None, - ) -> Dict[str, Any]: - """发送消息。 - - Args: - webhook: Webhook 网址。 - conversation_type: 聊天类型,'1' 表示单聊,'2' 表示群聊。 - msg: 消息。 - at: At 对象,仅在群聊时生效,默认为空。 - - Returns: - 钉钉服务器的响应。 - - Raises: - TypeError: 传入参数类型错误。 - ValueError: 传入参数值错误。 - NetworkError: 调用 Webhook 地址时网络错误。 - """ - if isinstance(msg, DingTalkMessage): - pass - elif isinstance(msg, dict): - msg = DingTalkMessage.raw(msg) - elif isinstance(msg, str): - msg = DingTalkMessage.text(msg) - else: - raise TypeError( - f"msg must be str, Dict or DingTalkMessage, not {type(msg)!r}" - ) - - if at is not None: - if isinstance(at, DingTalkMessage): - if at.type == "at": - pass - else: - raise ValueError(f'at.type must be "at", not {at.type}') - elif isinstance(at, dict): - at = DingTalkMessage.raw(at) - else: - raise TypeError(f"at must be Dict or DingTalkMessage, not {type(at)!r}") - - if conversation_type == "1": - data = msg - elif conversation_type == "2": - if at is None: - data = {"msgtype": msg.type, **msg.as_dict()} - else: - data = {"msgtype": msg.type, **msg.as_dict(), **at.as_dict()} - else: - raise ValueError( - f'conversation_type must be "1" or "2" not {conversation_type}' - ) - - try: - async with self.session.post(webhook, json=data) as resp: - return await resp.json() - except aiohttp.ClientError: - raise NetworkError diff --git a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/config.py b/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/config.py deleted file mode 100644 index ce1716bb..00000000 --- a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/config.py +++ /dev/null @@ -1,21 +0,0 @@ -"""DingTalk 适配器配置。""" -from iamai.config import ConfigModel - - -class Config(ConfigModel): - """DingTalk 配置类,将在适配器被加载时被混入到机器人主配置中。 - - Attributes: - host: 本机域名。 - port: 监听的端口。 - url: 路径。 - api_timeout: 进行 API 调用时等待返回响应的超时时间。 - app_secret: 机器人的 appSecret。 - """ - - __config_name__ = "dingtalk" - host: str = "127.0.0.1" - port: int = 8080 - url: str = "/dingtalk" - api_timeout: int = 1000 - app_secret: str = "" diff --git a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/event.py b/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/event.py deleted file mode 100644 index 3e491a22..00000000 --- a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/event.py +++ /dev/null @@ -1,83 +0,0 @@ -"""DingTalk 适配器事件。""" -import time -from typing import TYPE_CHECKING, Any, Dict, List, Union, Literal, Optional - -from pydantic import Field, BaseModel, validator - -from iamai.event import Event - -from .message import DingTalkMessage -from .exceptions import WebhookExpiredError - -if TYPE_CHECKING: - from . import DingTalkAdapter # noqa - - -class UserInfo(BaseModel): - dingtalkId: str - staffId: Optional[str] - - -class Text(BaseModel): - content: str - - -class DingTalkEvent(Event["DingTalkAdapter"]): - """DingTalk 事件基类""" - - type: Optional[str] = Field(alias="msgtype") - - msgtype: str - msgId: str - createAt: str - conversationType: Literal["1", "2"] - conversationId: str - conversationTitle: Optional[str] - senderId: str - senderNick: str - senderCorpId: Optional[str] - sessionWebhook: str - sessionWebhookExpiredTime: int - isAdmin: Optional[bool] - chatbotCorpId: Optional[str] - isInAtList: Optional[bool] - senderStaffId: Optional[str] - chatbotUserId: str - atUsers: List[UserInfo] - text: Text - - message: Optional[DingTalkMessage] - response_msg: Union[None, str, Dict, DingTalkMessage] = None - response_at: Union[None, Dict, DingTalkMessage] = None - - @validator("message", always=True) - def set_ts_now(cls, v, values, **kwargs): # noqa - return DingTalkMessage.text(values["text"].content) - - async def reply( - self, - msg: Union[str, Dict, DingTalkMessage], - at: Union[None, Dict, DingTalkMessage] = None, - ) -> Dict[str, Any]: - """回复消息。 - - Args: - msg: 回复消息的内容,可以是 str, Dict 或 DingTalkMessage。 - at: 回复消息时 At 的对象,必须时 at 类型的 DingTalkMessage,或者符合标准的 Dict。 - - Returns: - 调用 Webhook 地址后钉钉服务器的响应。 - - Raises: - WebhookExpiredError: 当前事件的 Webhook 地址已经过期。 - ...: 同 `DingTalkAdapter.send()` 方法。 - """ - if self.sessionWebhookExpiredTime > time.time() * 1000: - return await self.adapter.send( - webhook=self.sessionWebhook, - conversation_type=self.conversationType, - msg=msg, - at=at, - ) - else: - raise WebhookExpiredError diff --git a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/exceptions.py b/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/exceptions.py deleted file mode 100644 index 44213418..00000000 --- a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/exceptions.py +++ /dev/null @@ -1,18 +0,0 @@ -"""DingTalk 适配器异常。""" -from iamai.exceptions import AdapterException - - -class DingTalkException(AdapterException): - """DingTalk 异常基类。""" - - -class NetworkError(DingTalkException): - """网络异常。""" - - -class WebhookExpiredError(DingTalkException): - """Webhook 地址已到期。""" - - -class ApiTimeout(DingTalkException): - """API 请求响应超时。""" diff --git a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/message.py b/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/message.py deleted file mode 100644 index 76c320a1..00000000 --- a/packages/iamai-adapter-dingtalk/iamai/adapter/dingtalk/message.py +++ /dev/null @@ -1,119 +0,0 @@ -"""DingTalk 适配器消息。""" -from typing import Any, Dict, List, Optional - -from iamai.message import MessageSegment - -__all__ = ["DingTalkMessage"] - - -class DingTalkMessage(MessageSegment[None]): - """DingTalk 消息""" - - @property - def _message_class(self) -> None: - return None - - def __str__(self): - if self.type == "text": - return self.data["content"] - else: - return super().__str__() - - def as_dict(self) -> Dict[str, Dict[str, Any]]: - """返回符合钉钉消息标准的消息字段字典。 - - Returns: - 符合钉钉消息标准的消息字段字典。 - """ - if self.type == "raw": - return self.data - else: - return {self.type: self.data} - - @classmethod - def raw(cls, data: Dict[str, Any]) -> "DingTalkMessage": - """DingTalk 原始消息""" - return cls(type="raw", data=data) - - @classmethod - def text(cls, content: str) -> "DingTalkMessage": - """DingTalk text 消息""" - return cls(type="text", data={"content": content}) - - @classmethod - def link( - cls, text: str, title: str, message_url: str, pic_url: Optional[str] = None - ): - """DingTalk link 消息""" - return cls( - type="link", - data={ - "text": text, - "title": title, - "messageUrl": message_url, - "picUrl": pic_url, - }, - ) - - @classmethod - def markdown(cls, title: str, text: str): - """DingTalk markdown 消息""" - return cls(type="markdown", data={"title": title, "text": text}) - - @classmethod - def action_card_single_btn( - cls, - title: str, - text: str, - single_title: str, - single_url: str, - btn_orientation: str = "0", - ): - """DingTalk 整体跳转 actionCard 消息""" - return cls( - type="actionCard", - data={ - "title": title, - "text": text, - "singleTitle": single_title, - "singleURL": single_url, - "btnOrientation": btn_orientation, - }, - ) - - @classmethod - def action_card_multi_btns( - cls, title: str, text: str, btns: list, btn_orientation: str = "0" - ): - """DingTalk 独立跳转 actionCard 消息""" - return cls( - type="actionCard", - data={ - "title": title, - "text": text, - "btns": btns, - "btnOrientation": btn_orientation, - }, - ) - - @classmethod - def feed_card(cls, links: list): - """DingTalk feedCard 消息""" - return cls(type="feedCard", data={"links": links}) - - @classmethod - def at( - cls, - at_mobiles: Optional[List[str]] = None, - at_user_ids: Optional[List[str]] = None, - is_at_all: bool = False, - ): - """DingTalk At 信息""" - return cls( - type="at", - data={ - "atMobiles": at_mobiles, - "atUserIds": at_user_ids, - "isAtAll": is_at_all, - }, - )