# 5.2 项目开发指南

> **第5周 | 第2课 | 项目开发指南 | 预计时长：60分钟**

---

## 学习目标

- 掌握AI Agent项目的标准开发流程和阶段划分
- 能够搭建规范的项目目录结构和依赖配置
- 实现核心Agent并逐步集成工具、多Agent协作和记忆系统
- 使用代码脚手架快速启动开发

---

## 一、项目目录结构

无论选择哪个项目，都建议使用以下标准化的目录结构：

```
my-agent-project/
├── .env                          # 环境变量（不要提交到Git）
├── .env.example                  # 环境变量模板（提交到Git）
├── .gitignore
├── README.md                     # 项目说明文档
├── requirements.txt              # Python依赖
├── pyproject.toml                # 项目配置（可选）
│
├── src/                          # 源代码
│   ├── __init__.py
│   ├── main.py                   # 入口文件
│   ├── config.py                 # 配置管理
│   │
│   ├── agents/                   # Agent定义
│   │   ├── __init__.py
│   │   ├── coordinator.py        # 协调器/路由器Agent
│   │   ├── worker_a.py           # 工作Agent A
│   │   ├── worker_b.py           # 工作Agent B
│   │   └── ...
│   │
│   ├── tools/                    # 自定义工具
│   │   ├── __init__.py
│   │   ├── search_tool.py        # 搜索工具
│   │   ├── api_tool.py           # API调用工具
│   │   └── ...
│   │
│   ├── memory/                   # 记忆系统
│   │   ├── __init__.py
│   │   ├── short_term.py         # 短期记忆
│   │   ├── long_term.py          # 长期记忆（向量库）
│   │   └── persistence.py        # 持久化存储
│   │
│   ├── prompts/                  # Prompt模板
│   │   ├── __init__.py
│   │   ├── system_prompts.py     # 系统提示词
│   │   └── user_prompts.py       # 用户提示词模板
│   │
│   └── utils/                    # 工具函数
│       ├── __init__.py
│       ├── logger.py             # 日志配置
│       ├── validators.py         # 输入验证
│       └── formatters.py         # 输出格式化
│
├── mcp_servers/                  # MCP服务器（如使用）
│   ├── __init__.py
│   ├── server_a.py
│   └── ...
│
├── tests/                        # 测试文件
│   ├── __init__.py
│   ├── test_agents.py
│   ├── test_tools.py
│   └── test_memory.py
│
├── data/                         # 数据文件（不提交到Git）
│   ├── memory.db                 # SQLite数据库
│   └── ...
│
└── logs/                         # 日志文件
    └── app.log
```

### 快速创建项目骨架

```bash
# 方法1：使用命令行快速创建
mkdir -p my-agent-project/{src/{agents,tools,memory,prompts,utils},mcp_servers,tests,data,logs}
touch my-agent-project/.env{,.example}
touch my-agent-project/{README.md,requirements.txt,pyproject.toml}
touch my-agent-project/src/{__init__.py,main.py,config.py}
touch my-agent-project/src/agents/{__init__.py,coordinator.py}
touch my-agent-project/src/tools/{__init__.py}
touch my-agent-project/src/memory/{__init__.py,short_term.py,long_term.py,persistence.py}
touch my-agent-project/src/prompts/{__init__.py,system_prompts.py,user_prompts.py}
touch my-agent-project/src/utils/{__init__.py,logger.py,validators.py,formatters.py}
touch my-agent-project/tests/{__init__.py,test_agents.py,test_tools.py}

# 方法2：使用Python脚本
python -c "
import os
dirs = ['src/agents', 'src/tools', 'src/memory', 'src/prompts', 'src/utils',
        'mcp_servers', 'tests', 'data', 'logs']
for d in dirs:
    os.makedirs(d, exist_ok=True)
    with open(f'{d}/__init__.py', 'w') if 'mcp_servers' not in d else None: pass
files = ['.env.example', 'README.md', 'requirements.txt', 'src/main.py',
         'src/config.py', 'src/agents/coordinator.py']
for f in files:
    if not os.path.exists(f):
        open(f, 'w').close()
"
```

---

## 二、Phase 1：搭建项目基础

### 2.1 环境配置

**.env.example 模板：**

```bash
# LLM API配置
OPENAI_API_KEY=sk-your-key-here
OPENAI_BASE_URL=https://api.openai.com/v1
MODEL_NAME=gpt-4o-mini

# 可选：其他LLM提供商
# ANTHROPIC_API_KEY=sk-ant-...
# DASHSCOPE_API_KEY=sk-...        # 阿里云通义千问
# DEEPSEEK_API_KEY=sk-...          # DeepSeek

# 向量数据库（如使用）
# CHROMA_PATH=./data/chroma_db

# 外部API
# TUSHARE_TOKEN=your-tushare-token
# WEATHER_API_KEY=your-key
```

**requirements.txt 模板：**

```txt
# 核心依赖
openai>=1.0.0
python-dotenv>=1.0.0

# Agent框架（选择一个）
langchain>=0.3.0
langchain-openai>=0.2.0
# 或
crewai>=0.80.0
# 或
autogen-agentchat>=0.4.0

# 记忆/向量存储
chromadb>=0.5.0
# 或
faiss-cpu>=1.7.0

# MCP（如使用）
mcp>=1.0.0

# 工具库
pydantic>=2.0.0
httpx>=0.27.0
tenacity>=9.0.0          # 重试机制

# 可选：CLI界面
rich>=13.0.0
click>=8.1.0

# 可选：Web界面
streamlit>=1.40.0
# 或
fastapi>=0.115.0
uvicorn>=0.32.0

# 测试
pytest>=8.0.0
pytest-asyncio>=0.24.0
```

### 2.2 配置管理模块

```python
# src/config.py
"""配置管理模块 - 从.env读取配置，提供全局访问"""
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import Optional

load_dotenv()


class AppConfig(BaseModel):
    """应用配置"""
    # LLM配置
    api_key: str = Field(default_factory=lambda: os.getenv("OPENAI_API_KEY", ""))
    base_url: str = Field(default_factory=lambda: os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"))
    model_name: str = Field(default_factory=lambda: os.getenv("MODEL_NAME", "gpt-4o-mini"))

    # 记忆配置
    memory_db_path: str = Field(default="./data/memory.db")
    vector_store_path: str = Field(default="./data/chroma_db")

    # 外部API
    external_api_key: Optional[str] = Field(default=None)

    # 日志
    log_level: str = Field(default="INFO")
    log_file: str = Field(default="./logs/app.log")

    def validate(self) -> list[str]:
        """验证必需的配置项"""
        missing = []
        if not self.api_key:
            missing.append("OPENAI_API_KEY")
        return missing


# 全局配置实例
config = AppConfig()
```

### 2.3 日志配置

```python
# src/utils/logger.py
"""日志配置模块"""
import logging
import os
from datetime import datetime


def setup_logger(name: str = "agent", level: str = "INFO",
                 log_file: str = "./logs/app.log") -> logging.Logger:
    """配置并返回logger实例"""
    os.makedirs(os.path.dirname(log_file), exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, level.upper()))

    # 避免重复添加handler
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # 控制台handler
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    logger.addHandler(console)

    # 文件handler
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    return logger
```

### 2.4 输入验证

```python
# src/utils/validators.py
"""输入验证模块"""
from pydantic import BaseModel, Field, field_validator
from typing import Optional


class UserInput(BaseModel):
    """用户输入验证模型"""
    query: str = Field(..., min_length=1, max_length=2000, description="用户查询内容")
    context: Optional[str] = Field(default=None, max_length=5000)

    @field_validator("query")
    @classmethod
    def query_not_empty(cls, v: str) -> str:
        v = v.strip()
        if not v:
            raise ValueError("查询内容不能为空")
        return v
```

---

## 三、Phase 2：构建核心Agent

核心Agent是整个系统的"大脑"，负责理解用户意图并调用合适的工具完成任务。

### 3.1 单Agent基础实现

```python
# src/agents/worker_a.py  （以内容营销为例）
"""核心工作Agent - 负责主要任务执行"""
from openai import OpenAI
from src.config import config
from src.utils.logger import setup_logger
from typing import Optional

logger = setup_logger("worker")


class ContentAgent:
    """内容生成Agent"""

    def __init__(self):
        self.client = OpenAI(
            api_key=config.api_key,
            base_url=config.base_url
        )
        self.model = config.model_name

    SYSTEM_PROMPT = """你是一个专业的内容营销专家。你的职责是：
1. 理解用户提供的主题
2. 搜集相关素材和观点
3. 撰写一篇结构清晰、有吸引力的文章
4. 文章应包含：引人注目的标题、开头段落、3-5个要点、行动号召结尾

请用中文输出，语气{tone}。"""

    def generate(self, topic: str, style: str = "专业") -> dict:
        """生成内容

        Args:
            topic: 主题
            style: 文章风格（专业/轻松/营销向）

        Returns:
            包含标题、正文、标签的字典
        """
        logger.info(f"开始生成内容 - 主题: {topic}, 风格: {style}")

        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT.format(tone=style)},
            {"role": "user", "content": f"请为主题'{topic}'撰写一篇营销文章"}
        ]

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.8,
                max_tokens=2000
            )

            content = response.choices[0].message.content
            logger.info(f"内容生成完成，长度: {len(content)} 字符")

            return {
                "success": True,
                "content": content,
                "topic": topic,
                "style": style
            }

        except Exception as e:
            logger.error(f"内容生成失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "topic": topic
            }
```

### 3.2 协调器/路由器Agent

```python
# src/agents/coordinator.py
"""协调器Agent - 分发请求到对应的工作Agent"""
from src.agents.worker_a import ContentAgent
from src.utils.logger import setup_logger
from typing import Any

logger = setup_logger("coordinator")


class Coordinator:
    """协调器 - 根据意图路由到不同Agent"""

    def __init__(self):
        self.content_agent = ContentAgent()
        # self.other_agent = OtherAgent()  # 添加更多Agent

    def route(self, user_input: str, intent: str = "content",
              **kwargs) -> dict:
        """路由请求到合适的Agent

        Args:
            user_input: 用户输入
            intent: 意图类型（content/analysis/support等）
            **kwargs: 额外参数

        Returns:
            Agent处理结果
        """
        logger.info(f"收到请求 - 意图: {intent}, 输入: {user_input[:50]}...")

        routes = {
            "content": self.content_agent.generate,
            # "analysis": self.analysis_agent.run,
            # "support": self.support_agent.handle,
        }

        handler = routes.get(intent)
        if not handler:
            return {
                "success": False,
                "error": f"未知的意图类型: {intent}"
            }

        try:
            result = handler(user_input, **kwargs)
            logger.info(f"请求处理完成 - 成功: {result.get('success')}")
            return result
        except Exception as e:
            logger.error(f"路由处理异常: {e}")
            return {"success": False, "error": str(e)}
```

### 3.3 Prompt模板管理

```python
# src/prompts/system_prompts.py
"""Prompt模板集中管理"""

# === 内容营销Agent ===
CONTENT_WRITER_PROMPT = """你是资深内容营销专家，擅长撰写{industry}行业的高质量文章。

要求：
- 文章结构：标题 -> 引言 -> 主体(3-5个要点) -> 总结 -> 行动号召
- 字数：{word_count}字左右
- 风格：{style}
- 语言：中文

请确保内容准确、有价值、有吸引力。"""

CONTENT_SUMMARIZER_PROMPT = """请阅读以下内容，提取关键信息并生成摘要：

原文：
{content}

请输出：
1. 一句话摘要（50字以内）
2. 3-5个关键词
3. 核心观点列表
"""

# === 通用 ===
INTENT_CLASSIFIER_PROMPT = """请判断以下用户输入的意图类型：

可选类型：content, analysis, support, other
用户输入：{input}

只输出类型名称，不要解释。"""

ERROR_RESPONSE_PROMPT = """抱歉，处理你的请求时遇到了问题：
{error}

请尝试：
1. 检查输入是否完整
2. 换一个方式描述你的需求
3. 如果问题持续存在，请联系管理员"""
```

---

## 四、Phase 3：添加工具和MCP服务器

工具是Agent的"手脚"，让Agent能够与外部世界交互。

### 4.1 自定义工具模板

```python
# src/tools/search_tool.py
"""搜索工具 - 从网络获取信息"""
import httpx
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential
from src.utils.logger import setup_logger

logger = setup_logger("search_tool")


class SearchTool:
    """网络搜索工具"""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.base_url = "https://api.example.com/search"  # 替换为实际API

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    def search(self, query: str, num_results: int = 5) -> list[dict]:
        """执行搜索

        Args:
            query: 搜索关键词
            num_results: 返回结果数量

        Returns:
            搜索结果列表
        """
        logger.info(f"执行搜索: {query}")

        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        params = {"q": query, "num": num_results}

        response = httpx.get(self.base_url, params=params, headers=headers,
                            timeout=10)
        response.raise_for_status()

        data = response.json()
        results = self._parse_results(data)
        logger.info(f"搜索完成，返回 {len(results)} 条结果")
        return results

    def _parse_results(self, data: dict) -> list[dict]:
        """解析搜索结果"""
        # 根据实际API响应格式调整
        return [
            {
                "title": item.get("title", ""),
                "snippet": item.get("snippet", ""),
                "url": item.get("url", ""),
            }
            for item in data.get("results", [])[:5]
        ]
```

### 4.2 MCP服务器模板

```python
# mcp_servers/server_a.py
"""MCP服务器示例 - 提供天气数据"""
from mcp.server.fastmcp import FastMCP
import httpx

mcp = FastMCP("WeatherServer")


@mcp.tool()
async def get_weather(city: str) -> str:
    """获取指定城市的天气信息

    Args:
        city: 城市名称
    """
    api_key = "your-api-key"
    url = f"https://api.weather.com/v1/location/{city}/observations"

    async with httpx.AsyncClient() as client:
        response = await client.get(
            url,
            params={"apiKey": api_key, "language": "zh-CN"},
            timeout=10
        )
        response.raise_for_status()
        data = response.json()

    return (
        f"城市: {city}\n"
        f"温度: {data['temperature']}°C\n"
        f"天气: {data['condition']}\n"
        f"湿度: {data['humidity']}%\n"
        f"风速: {data['wind_speed']}km/h"
    )


@mcp.tool()
async def get_weather_forecast(city: str, days: int = 3) -> str:
    """获取指定城市的天气预报

    Args:
        city: 城市名称
        days: 预报天数（1-7）
    """
    # 实现天气预报获取逻辑
    pass


if __name__ == "__main__":
    mcp.run()
```

### 4.3 工具注册与Agent集成

```python
# 在Agent中注册和使用工具
from src.tools.search_tool import SearchTool


class EnhancedContentAgent:
    """增强版内容Agent - 集成搜索工具"""

    def __init__(self):
        self.client = OpenAI(...)
        self.search_tool = SearchTool()

    def generate_with_research(self, topic: str) -> dict:
        """带资料搜集的内容生成"""
        # 步骤1：搜索相关资料
        search_results = self.search_tool.search(topic, num_results=5)
        context = "\n".join(
            f"- {r['title']}: {r['snippet']}" for r in search_results
        )

        # 步骤2：基于搜集的资料生成内容
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": f"""
主题：{topic}
参考资料：
{context}

请基于以上资料撰写一篇营销文章。
"""}
        ]

        response = self.client.chat.completions.create(
            model=self.model, messages=messages
        )

        return {
            "success": True,
            "content": response.choices[0].message.content,
            "sources": search_results
        }
```

---

## 五、Phase 4：多Agent协作

当任务复杂到单个Agent无法完成时，需要引入多Agent协作。

### 5.1 多Agent协作模式

```
模式A：流水线模式（Pipeline）
┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
│ Agent A│───→│ Agent B│───→│ Agent C│───→│  输出   │
│ 搜集资料│    │ 撰写文章│    │ 审核修改│    │        │
└────────┘    └────────┘    └────────┘    └────────┘

模式B：星型模式（Hub & Spoke）
              ┌─────────────┐
              │  协调器Agent  │
              └──┬───┬───┬──┘
                 │   │   │
            ┌────┘   │   └────┐
            ▼         ▼        ▼
        ┌──────┐ ┌──────┐ ┌──────┐
        │AgentA│ │AgentB│ │AgentC│
        └──────┘ └──────┘ └──────┘

模式C：辩论模式（Debate）
┌────────┐          ┌────────┐
│Agent A │ ←─────→  │Agent B │
│ 正方观点│  多轮辩论  │ 反方观点│
└───┬────┘          └───┬────┘
    └─────────┬─────────┘
              ▼
        ┌──────────┐
        │ 裁判Agent │
        │ 综合判断   │
        └──────────┘
```

### 5.2 流水线模式实现

```python
# src/agents/pipeline.py
"""流水线模式 - 多Agent顺序协作"""
from src.utils.logger import setup_logger

logger = setup_logger("pipeline")


class ContentPipeline:
    """内容生成流水线"""

    def __init__(self):
        from src.agents.worker_a import ResearchAgent, WriterAgent, ReviewerAgent
        self.researcher = ResearchAgent()
        self.writer = WriterAgent()
        self.reviewer = ReviewerAgent()

    def run(self, topic: str, style: str = "专业") -> dict:
        """执行完整的内容生成流水线

        步骤：
        1. ResearchAgent 搜集资料
        2. WriterAgent 撰写草稿
        3. ReviewerAgent 审核修改
        """
        logger.info(f"启动内容生成流水线 - 主题: {topic}")

        # Step 1: 资料搜集
        logger.info("Step 1: 资料搜集中...")
        research_result = self.researcher.search(topic)
        if not research_result["success"]:
            return research_result
        context = research_result["data"]

        # Step 2: 撰写草稿
        logger.info("Step 2: 撰写草稿中...")
        draft_result = self.writer.write(topic, context, style)
        if not draft_result["success"]:
            return draft_result
        draft = draft_result["content"]

        # Step 3: 审核修改
        logger.info("Step 3: 审核修改中...")
        review_result = self.reviewer.review(draft)
        if not review_result["success"]:
            return review_result

        logger.info("内容生成流水线完成")
        return {
            "success": True,
            "content": review_result["content"],
            "research_summary": context,
            "draft": draft,
            "review_comments": review_result["comments"]
        }
```

### 5.3 使用CrewAI框架的多Agent协作

```python
# 如果使用CrewAI框架
from crewai import Agent, Task, Crew
from crewai.tools import tool


@tool
def search_tool(query: str) -> str:
    """搜索工具"""
    # 实现搜索逻辑
    return f"搜索结果: {query}"


# 定义Agent
researcher = Agent(
    role="研究员",
    goal="搜集关于{topic}的最新信息和数据",
    backstory="你是资深行业研究员，擅长快速搜集和整理信息",
    tools=[search_tool],
    verbose=True
)

writer = Agent(
    role="内容创作者",
    goal="基于研究资料撰写高质量文章",
    backstory="你是资深内容创作者，擅长将复杂信息转化为易懂的文章",
    verbose=True
)

reviewer = Agent(
    role="编辑审核",
    goal="审核文章质量并提出改进建议",
    backstory="你是资深编辑，对内容质量有严格要求",
    verbose=True
)

# 定义Task
research_task = Task(
    description="搜集关于{topic}的最新信息，包括行业趋势、关键数据和专家观点",
    expected_output="一份包含关键信息点的研究报告",
    agent=researcher
)

writing_task = Task(
    description="基于研究报告撰写一篇关于{topic}的营销文章",
    expected_output="一篇结构完整、内容充实的营销文章",
    agent=writer,
    context=[research_task]
)

review_task = Task(
    description="审核文章质量，检查准确性、可读性和吸引力",
    expected_output="审核后的终稿和改进建议",
    agent=reviewer,
    context=[writing_task]
)

# 组建Crew
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, writing_task, review_task],
    verbose=True,
    process="sequential"  # 顺序执行
)

# 执行
result = crew.kickoff(inputs={"topic": "AI在营销中的应用"})
```

---

## 六、Phase 5：添加记忆和持久化

### 6.1 短期记忆（对话历史）

```python
# src/memory/short_term.py
"""短期记忆 - 维护当前对话上下文"""
from collections import deque
from typing import Optional


class ShortTermMemory:
    """基于滑动窗口的短期记忆"""

    def __init__(self, max_messages: int = 20):
        self.messages: deque = deque(maxlen=max_messages)

    def add(self, role: str, content: str):
        """添加一条消息"""
        self.messages.append({"role": role, "content": content})

    def get_messages(self) -> list[dict]:
        """获取所有消息（用于LLM上下文）"""
        return list(self.messages)

    def get_recent(self, n: int = 5) -> list[dict]:
        """获取最近n条消息"""
        return list(self.messages)[-n:]

    def clear(self):
        """清空记忆"""
        self.messages.clear()

    def get_summary(self) -> str:
        """生成对话摘要"""
        if not self.messages:
            return "无对话历史"
        return "对话历史（最近{}条）:\n{}".format(
            len(self.messages),
            "\n".join(f"[{m['role']}] {m['content'][:100]}"
                      for m in self.messages[-5:])
        )
```

### 6.2 长期记忆（向量存储）

```python
# src/memory/long_term.py
"""长期记忆 - 使用向量数据库存储和检索"""
import chromadb
from chromadb.utils import embedding_functions
from src.config import config
from src.utils.logger import setup_logger
from typing import Optional

logger = setup_logger("long_term_memory")


class LongTermMemory:
    """基于ChromaDB的长期记忆"""

    def __init__(self, collection_name: str = "default"):
        self.client = chromadb.PersistentClient(
            path=config.vector_store_path
        )
        self.embedding_fn = embedding_functions.DefaultEmbeddingFunction()
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embedding_fn
        )

    def store(self, content: str, metadata: Optional[dict] = None) -> str:
        """存储一条记忆

        Args:
            content: 记忆内容
            metadata: 附加元数据

        Returns:
            记忆ID
        """
        import uuid
        memory_id = str(uuid.uuid4())

        self.collection.add(
            documents=[content],
            metadatas=[metadata or {}],
            ids=[memory_id]
        )
        logger.info(f"存储记忆: {memory_id}")
        return memory_id

    def search(self, query: str, n_results: int = 3) -> list[dict]:
        """搜索相关记忆

        Args:
            query: 查询内容
            n_results: 返回数量

        Returns:
            相关记忆列表
        """
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )

        memories = []
        for i in range(len(results["ids"][0])):
            memories.append({
                "id": results["ids"][0][i],
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "distance": results["distances"][0][i]
            })

        logger.info(f"搜索记忆: 找到 {len(memories)} 条相关结果")
        return memories

    def delete(self, memory_id: str):
        """删除指定记忆"""
        self.collection.delete(ids=[memory_id])
        logger.info(f"删除记忆: {memory_id}")
```

### 6.3 持久化存储（SQLite）

```python
# src/memory/persistence.py
"""持久化存储 - 使用SQLite保存会话记录"""
import sqlite3
import json
from datetime import datetime
from src.config import config
from src.utils.logger import setup_logger

logger = setup_logger("persistence")


class PersistenceStore:
    """SQLite持久化存储"""

    def __init__(self, db_path: str = None):
        self.db_path = db_path or config.memory_db_path
        self._init_db()

    def _init_db(self):
        """初始化数据库表"""
        import os
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    created_at TEXT NOT NULL,
                    topic TEXT,
                    status TEXT DEFAULT 'active'
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS messages (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    FOREIGN KEY (session_id) REFERENCES sessions(id)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS task_results (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    task_type TEXT,
                    result TEXT,
                    created_at TEXT NOT NULL,
                    FOREIGN KEY (session_id) REFERENCES sessions(id)
                )
            """)
            conn.commit()

    def create_session(self, session_id: str, topic: str = "") -> None:
        """创建新会话"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO sessions (id, created_at, topic) VALUES (?, ?, ?)",
                (session_id, datetime.now().isoformat(), topic)
            )

    def save_message(self, session_id: str, role: str, content: str) -> str:
        """保存消息"""
        import uuid
        msg_id = str(uuid.uuid4())
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO messages (id, session_id, role, content, created_at) VALUES (?, ?, ?, ?, ?)",
                (msg_id, session_id, role, content, datetime.now().isoformat())
            )
        return msg_id

    def get_session_messages(self, session_id: str, limit: int = 50) -> list[dict]:
        """获取会话消息"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT role, content, created_at FROM messages WHERE session_id = ? ORDER BY created_at DESC LIMIT ?",
                (session_id, limit)
            )
            return [
                {"role": row[0], "content": row[1], "created_at": row[2]}
                for row in cursor.fetchall()
            ]

    def save_result(self, session_id: str, task_type: str, result: dict) -> str:
        """保存任务结果"""
        import uuid
        result_id = str(uuid.uuid4())
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO task_results (id, session_id, task_type, result, created_at) VALUES (?, ?, ?, ?, ?)",
                (result_id, session_id, task_type, json.dumps(result, ensure_ascii=False), datetime.now().isoformat())
            )
        return result_id
```

---

## 七、入口文件

```python
# src/main.py
"""应用入口"""
import argparse
import sys
from src.config import config
from src.utils.logger import setup_logger
from src.agents.coordinator import Coordinator

logger = setup_logger("main")


def main():
    """主入口"""
    parser = argparse.ArgumentParser(description="AI Agent 项目")
    parser.add_argument("--query", "-q", type=str, help="查询内容")
    parser.add_argument("--style", "-s", type=str, default="专业",
                        choices=["专业", "轻松", "营销向"], help="内容风格")
    parser.add_argument("--intent", "-i", type=str, default="content",
                        help="意图类型")

    args = parser.parse_args()

    # 验证配置
    missing = config.validate()
    if missing:
        logger.error(f"缺少必需配置: {', '.join(missing)}")
        logger.error("请复制 .env.example 为 .env 并填写配置")
        sys.exit(1)

    # 初始化协调器
    coordinator = Coordinator()

    if args.query:
        # 命令行模式
        result = coordinator.route(
            user_input=args.query,
            intent=args.intent,
            style=args.style
        )

        if result.get("success"):
            print("\n" + "=" * 50)
            print(result.get("content", "处理完成"))
            print("=" * 50)
        else:
            print(f"处理失败: {result.get('error')}")
            sys.exit(1)
    else:
        # 交互模式
        print("AI Agent 已就绪。输入 'quit' 退出。")
        print("-" * 40)

        while True:
            try:
                query = input("\n> ").strip()
                if query.lower() in ("quit", "exit", "q"):
                    print("再见！")
                    break
                if not query:
                    continue

                result = coordinator.route(
                    user_input=query,
                    intent=args.intent,
                    style=args.style
                )

                if result.get("success"):
                    print(result.get("content", "处理完成"))
                else:
                    print(f"处理失败: {result.get('error')}")

            except KeyboardInterrupt:
                print("\n再见！")
                break
            except Exception as e:
                logger.error(f"未处理的异常: {e}")
                print(f"发生错误: {e}")


if __name__ == "__main__":
    main()
```

---

## 八、动手练习

### 练习1：搭建项目骨架

```bash
# 1. 创建项目目录
mkdir -p my-project/{src/{agents,tools,memory,prompts,utils},mcp_servers,tests,data,logs}

# 2. 初始化Git
cd my-project && git init

# 3. 创建虚拟环境
python -m venv .venv && source .venv/bin/activate  # Linux/Mac
# 或
python -m venv .venv && .venv\Scripts\activate      # Windows

# 4. 安装依赖
pip install openai python-dotenv langchain chromadb httpx tenacity rich

# 5. 复制.env.example并填写
cp .env.example .env
# 编辑 .env，填入你的API密钥
```

### 练习2：实现核心Agent

1. 填写 `src/config.py`（参考上面的模板）
2. 实现你的核心Agent（参考 Phase 2 的代码）
3. 运行测试：`python -m src.main --query "测试输入"`

### 练习3：添加一个自定义工具

1. 在 `src/tools/` 下创建你的工具文件
2. 实现工具的调用逻辑（注意添加重试和错误处理）
3. 在Agent中集成并测试该工具

### 练习4：实现记忆系统

1. 初始化短期记忆，测试对话上下文维护
2. 初始化长期记忆（ChromaDB），测试存储和检索
3. 将记忆集成到Agent的对话流程中

---

## 九、开发阶段 Checklist

### Phase 1: 项目基础
- [ ] 项目目录结构创建完成
- [ ] .env 文件配置完毕，所有API密钥就绪
- [ ] requirements.txt 依赖安装成功
- [ ] config.py 能正确读取环境变量
- [ ] logger.py 能输出日志到控制台和文件
- [ ] validators.py 能验证输入

### Phase 2: 核心Agent
- [ ] 核心Agent类实现，能接收输入并返回输出
- [ ] Prompt模板管理就绪
- [ ] 协调器/路由器实现
- [ ] 错误处理完整（API调用失败、超时等）
- [ ] 命令行和交互模式均可运行

### Phase 3: 工具和MCP
- [ ] 至少2个自定义工具实现
- [ ] 工具包含重试机制和超时控制
- [ ] Agent能正确调用工具
- [ ] （可选）MCP服务器运行正常

### Phase 4: 多Agent协作
- [ ] 多Agent协作模式确定（流水线/星型/辩论）
- [ ] Agent之间能正确传递数据
- [ ] 每个Agent的职责清晰不重叠
- [ ] 流水线异常能正确中断和报告

### Phase 5: 记忆和持久化
- [ ] 短期记忆能维护对话上下文
- [ ] 长期记忆能存储和检索相关信息
- [ ] SQLite持久化存储会话记录
- [ ] 重启后能恢复之前的会话状态

---

## 小结

本课完成了以下工作：

1. **搭建了完整的项目骨架**：标准化的目录结构、配置管理、日志系统、输入验证。
2. **实现了核心Agent**：单Agent的基本实现、Prompt模板管理、协调器/路由器。
3. **集成了工具和MCP**：自定义工具模板（含重试和错误处理）、MCP服务器示例。
4. **实现了多Agent协作**：三种协作模式（流水线、星型、辩论）的代码实现。
5. **添加了记忆系统**：短期记忆（滑动窗口）、长期记忆（ChromaDB向量存储）、持久化（SQLite）。

**关键要点：**
- 从简单开始：先让单个Agent跑通，再逐步添加工具和多Agent。
- 每个阶段都要测试通过后再进入下一阶段。
- 错误处理和日志记录从第一天就要做好，后期排查问题会节省大量时间。

**下一步：** 进入 5.3 中期评审，检查开发进度，诊断和修复常见问题。
