Skip to main content

架构概述

AI Gateway 采用现代化的微服务架构设计,基于适配器模式实现多AI平台的统一接口。整个系统具有高可扩展性、高可用性和易于维护的特点。

流程展示

核心设计模式

适配器模式 (Adapter Pattern)

AI Gateway 的核心设计模式是适配器模式,它允许不兼容的接口能够协同工作。
# 抽象基类定义统一接口
class BaseAgent(ABC):
    @abstractmethod
    async def chat(self, request: ChatRequest) -> UnifiedChatResponse:
        pass
    
    @abstractmethod
    async def upload_file(self, file: UploadFile, created_by: str) -> FileResponse:
        pass

# 具体适配器实现
class FastGPTAgent(BaseAgent):
    async def chat(self, request: ChatRequest) -> UnifiedChatResponse:
        # 将统一请求转换为FastGPT特定格式
        fastgpt_request = self._convert_to_fastgpt_format(request)
        # 调用FastGPT API
        response = await self._call_fastgpt_api(fastgpt_request)
        # 将FastGPT响应转换为统一格式
        return self._convert_to_unified_format(response)

注册表模式 (Registry Pattern)

使用注册表模式管理所有可用的Agent实例,实现动态加载和配置。
class AgentRegistry:
    def __init__(self):
        self._agents = {}
    
    def register(self, name: str, agent: BaseAgent):
        self._agents[name] = agent
    
    def get_agent(self, name: str) -> BaseAgent:
        return self._agents.get(name)
    
    def list_agents(self) -> List[str]:
        return list(self._agents.keys())

技术栈

后端框架

  • FastAPI - 现代化的Python Web框架,支持异步处理和自动API文档生成
  • Pydantic - 数据验证和序列化库,确保类型安全
  • Uvicorn - ASGI服务器,支持高并发请求处理

数据处理

  • Pydantic Models - 统一的数据模型定义
  • JSON Schema - API文档自动生成
  • Type Hints - 完整的类型注解支持

文件处理

  • aiofiles - 异步文件操作
  • python-multipart - 文件上传处理
  • boto3 - AWS S3集成
  • minio - MinIO对象存储支持

数据流

请求处理流程

  1. 客户端发送请求 - 包含agent类型和聊天内容
  2. Gateway解析请求 - 从请求头获取agent类型
  3. 获取Agent实例 - 从注册表获取对应的Agent
  4. 调用Agent方法 - 执行具体的聊天逻辑
  5. 格式转换 - 将平台特定响应转换为统一格式
  6. 返回结果 - 向客户端返回统一格式的响应

文件上传流程

  1. 客户端上传文件 - 通过multipart/form-data格式
  2. 文件验证 - 检查文件类型和大小
  3. 存储到对象存储 - 上传到S3或MinIO
  4. 返回文件信息 - 包含文件URL和元数据

核心组件

BaseAgent 抽象基类

所有Agent都必须继承自BaseAgent基类,实现统一的接口。
class BaseAgent(ABC):
    """AI Agent抽象基类"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self._validate_config()
    
    @abstractmethod
    async def chat(self, request: ChatRequest) -> UnifiedChatResponse:
        """处理聊天请求"""
        pass
    
    @abstractmethod
    async def upload_file(self, file: UploadFile, created_by: str) -> FileResponse:
        """处理文件上传"""
        pass

统一数据模型

使用Pydantic定义统一的数据模型,确保数据验证和类型安全。
class ChatRequest(BaseModel):
    query: str
    user: str
    uid: Optional[str] = None
    stream: bool = False
    detail: bool = False
    app_id: Optional[str] = None
    chat_id: Optional[str] = None
    conversation_id: Optional[str] = None
    files: Optional[List[str]] = None

class UnifiedChatResponse(BaseModel):
    id: str
    model: str
    usage: Usage
    choices: List[Choice]
    detail: Optional[Dict[str, Any]] = None

扩展性设计

新增Agent支持

要添加新的AI平台支持,只需:
  1. 创建新的Agent类继承BaseAgent
  2. 实现必要的抽象方法
  3. 在Agent类中注册到registry
  4. 配置相应的环境变量
class NewPlatformAgent(BaseAgent):
    async def chat(self, request: ChatRequest) -> UnifiedChatResponse:
        # 实现具体的聊天逻辑
        pass
    
    async def upload_file(self, file: UploadFile, created_by: str) -> FileResponse:
        # 实现文件上传逻辑
        pass

# 注册到registry
registry.register("new_platform", NewPlatformAgent(config))

性能优化

异步处理

整个系统基于异步架构,支持高并发请求处理:
@app.post("/api/v1/chat")
async def chat(request: ChatRequest, agent: str = Header(...)):
    agent_instance = registry.get_agent(agent)
    if not agent_instance:
        raise HTTPException(status_code=400, detail="Unsupported agent")
    
    return await agent_instance.chat(request)

连接池

对于外部API调用,使用连接池优化性能:
import aiohttp

class FastGPTAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.session = aiohttp.ClientSession()
    
    async def _call_api(self, url: str, data: Dict):
        async with self.session.post(url, json=data) as response:
            return await response.json()

安全考虑

输入验证

使用Pydantic进行严格的输入验证:
class ChatRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=10000)
    user: str = Field(..., min_length=1, max_length=100)

API密钥管理

通过环境变量安全管理API密钥:
# 使用环境变量,避免硬编码
API_KEY = os.getenv("API_KEY")
if not API_KEY:
    raise ValueError("API_KEY environment variable is required")

文件上传安全

ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.doc', '.docx', '.jpg', '.png'}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

def validate_file(file: UploadFile):
    if file.size > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="File too large")
    
    if not any(file.filename.endswith(ext) for ext in ALLOWED_EXTENSIONS):
        raise HTTPException(status_code=400, detail="File type not allowed")

部署架构

了解AI Gateway的生产环境部署架构