Python FastMCP实现详解:MCP实践、SSE与STDIO通信模式全解析

Python FastMCP实现MCP实践全解析:SSE与STDIO通信模式详解

一、MCP简介

MCP(Model Context Protocol,模型上下文协议)是由Anthropic公司于2024年推出的开放标准,旨在统一AI模型与外部数据源、工具之间的通信方式。MCP提供了一套规范化的接口,使大语言模型(LLM)能够更便捷地与各类外部工具和资源进行交互。

MCP的核心价值在于:

  1. 标准化接口:为AI模型提供统一的工具发现和使用接口
  2. 安全通信:定义了一套安全的模型与工具之间的通信协议
  3. 跨平台兼容:适用于不同AI提供商的模型
  4. 提升AI能力:让AI能够访问最新数据和执行复杂操作

MCP采用客户端-服务器架构,其中:

  • MCP Host:使用AI的应用程序,如Claude客户端、Cursor等集成了大语言模型的应用
  • MCP Client:与MCP Server建立连接的组件,负责处理通信细节
  • MCP Server:集成外部数据源并提供功能接口的组件,作为AI模型与外部系统之间的连接器
  • 二、FastMCP简介

    FastMCP是一个基于Python的高级框架,专为构建MCP服务器而设计。它极大简化了MCP服务器的开发流程,让开发者能够以最小的代码量创建功能强大的MCP服务器。

    FastMCP的主要特点包括:

    1. 简洁的API:通过装饰器模式,简化MCP服务器的创建
    2. 丰富的功能:支持工具(Tools)、资源(Resources)、提示模板(Prompts)等MCP核心元素
    3. 多种传输方式:支持stdio和SSE等不同传输协议
    4. 类型安全:利用Python的类型提示,自动生成MCP协议所需的模式定义
    5. 内置图像处理:支持图像数据的自动格式转换和处理

    使用FastMCP,开发者可以专注于业务逻辑,而不必过多关注底层协议细节。

    三、MCP中SSE和STDIO的区别

    MCP服务端当前支持两种与客户端的数据通信方式:标准输入输出(stdio)和基于HTTP的服务器推送事件(SSE)。这两种方式有着明显的区别和各自的适用场景。

    3.1 标准输入输出(STDIO)

    原理:

    STDIO是一种用于本地通信的传输方式。在这种模式下,MCP客户端会将服务器程序作为子进程启动,双方通过标准输入和标准输出进行数据交换。具体而言:

  • 客户端通过标准输入(stdin)向服务器发送请求
  • 服务器通过标准输出(stdout)返回响应
  • 服务器可以通过标准错误(stderr)输出日志和错误信息
  • 特点:

  • 低延迟:本地进程间通信,无网络开销
  • 简单直接:不需要网络配置和端口管理
  • 安全性高:通信限制在本地进程内,无需考虑网络安全
  • 适合单会话:一个客户端对应一个服务器实例
  • 适用场景:

  • 客户端和服务器在同一台机器上运行的场景
  • 需要本地访问文件系统或设备的工具
  • 对延迟敏感的应用
  • 开发测试阶段
  • 3.2 服务器推送事件(SSE)

    原理:

    SSE(Server-Sent Events)是一种基于HTTP的单向通信技术,允许服务器向客户端推送数据。在MCP中,SSE实现了:

  • 客户端通过HTTP POST请求发送数据到服务器
  • 服务器通过持久的HTTP连接向客户端推送事件和数据
  • 通信基于标准的HTTP协议,便于在网络环境中部署
  • 特点:

  • 网络传输:基于HTTP协议,可跨网络通信
  • 多客户端支持:单个服务器实例可同时服务多个客户端
  • 易于部署:可部署在云服务、容器或微服务架构中
  • 扩展性好:适合分布式环境
  • 适用场景:

  • 客户端和服务器位于不同物理位置
  • 需要支持多客户端连接
  • 需要远程访问的服务
  • 生产环境和企业级应用
  • 3.3 SSE与STDIO的详细比较

    | 特性 | STDIO | SSE |

    |——|——-|—–|

    | 通信方式 | 进程间通信 | HTTP网络通信 |

    | 部署位置 | 本地 | 本地或远程 |

    | 客户端数量 | 单客户端 | 多客户端 |

    | 延迟 | 极低 | 受网络影响 |

    | 安全性考虑 | 本地安全 | 需考虑网络安全 |

    | 可扩展性 | 有限 | 高 |

    | 运行方式 | 作为子进程 | 作为网络服务 |

    | 适用环境 | 开发环境、桌面应用 | 生产环境、云服务、分布式系统 |

    四、Python FastMCP实现STDIO方式

    4.1 基本示例

    下面是一个使用FastMCP实现STDIO通信方式的基本示例,创建一个简单的计算器工具:

    # calculator_stdio.py
    from fastmcp import FastMCP
    
    # 创建MCP服务器实例
    mcp = FastMCP("Calculator")
    
    @mcp.tool()
    def add(a: int, b: int) -> int:
        """将两个数字相加"""
        return a + b
    
    @mcp.tool()
    def subtract(a: int, b: int) -> int:
        """从第一个数中减去第二个数"""
        return a - b
    
    @mcp.tool()
    def multiply(a: int, b: int) -> int:
        """将两个数相乘"""
        return a * b
    
    @mcp.tool()
    def divide(a: float, b: float) -> float:
        """将第一个数除以第二个数"""
        if b == 0:
            raise ValueError("除数不能为零")
        return a / b
    
    if __name__ == "__main__":
        # 使用stdio传输方式启动服务器
        mcp.run(transport="stdio")
    

    在这个示例中,我们:

    1. 导入了FastMCP
    2. 创建了一个名为"Calculator"的MCP服务器实例
    3. 使用@mcp.tool()装饰器定义了四个计算工具
    4. 通过mcp.run(transport="stdio")以STDIO模式启动服务器

    4.2 运行和使用

    运行STDIO模式的MCP服务器:

    python calculator_stdio.py
    

    在这种模式下,服务器将等待标准输入上的请求,然后通过标准输出返回响应。通常,这种模式的服务器会与特定的MCP客户端集成,如Claude Desktop。

    要将此服务器注册到Claude Desktop,可以使用FastMCP的CLI工具:

    fastmcp install calculator_stdio.py
    

    五、Python FastMCP实现SSE方式

    5.1 基本示例

    下面是使用FastMCP实现SSE通信方式的示例,创建一个简单的天气服务:

    # weather_sse.py
    from fastmcp import FastMCP
    import random
    
    # 创建MCP服务器实例,指定端口
    mcp = FastMCP("Weather Service", port=8000)
    
    # 模拟的天气数据
    weather_data = {
        "New York": {"temp": range(10, 25), "conditions": ["sunny", "cloudy", "rainy"]},
        "London": {"temp": range(5, 20), "conditions": ["cloudy", "rainy", "foggy"]},
        "Tokyo": {"temp": range(15, 30), "conditions": ["sunny", "cloudy", "humid"]},
        "Sydney": {"temp": range(20, 35), "conditions": ["sunny", "clear", "hot"]},
    }
    
    @mcp.tool()
    def get_weather(city: str) -> dict:
        """获取指定城市的当前天气"""
        if city not in weather_data:
            return {"error": f"无法找到城市 {city} 的天气数据"}
        
        data = weather_data[city]
        temp = random.choice(list(data["temp"]))
        condition = random.choice(data["conditions"])
        
        return {
            "city": city,
            "temperature": temp,
            "condition": condition,
            "unit": "celsius"
        }
    
    @mcp.resource("weather://cities")
    def get_available_cities() -> list:
        """获取所有可用的城市列表"""
        return list(weather_data.keys())
    
    @mcp.resource("weather://forecast/{city}")
    def get_forecast(city: str) -> dict:
        """获取指定城市的天气预报资源"""
        if city not in weather_data:
            return {"error": f"无法找到城市 {city} 的天气预报"}
        
        forecast = []
        for i in range(5):  # 5天预报
            data = weather_data[city]
            temp = random.choice(list(data["temp"]))
            condition = random.choice(data["conditions"])
            forecast.append({
                "day": i + 1,
                "temperature": temp,
                "condition": condition
            })
        
        return {
            "city": city,
            "forecast": forecast,
            "unit": "celsius"
        }
    
    if __name__ == "__main__":
        # 使用SSE传输方式启动服务器
        mcp.run(transport="sse")
    

    在这个示例中:

    1. 我们创建了一个名为"Weather Service"的MCP服务器,并指定了端口为8000
    2. 定义了一个get_weather工具函数,用于获取城市的当前天气
    3. 添加了两个资源:一个返回所有可用城市的列表,另一个返回指定城市的天气预报
    4. 通过mcp.run(transport="sse")以SSE模式启动服务器

    5.2 运行和使用

    运行SSE模式的MCP服务器:

    python weather_sse.py
    

    服务器将在指定端口(本例中为8000)启动,并监听HTTP连接。您可以通过浏览器访问:

    http://localhost:8000/sse
    

    要与此服务器交互,可以使用支持SSE传输的MCP客户端,或者使用如下Python代码创建一个简单的客户端:

    # sse_client.py
    import asyncio
    from mcp import ClientSession
    from mcp.client.sse import sse_client
    
    async def main():
        # 连接到SSE服务器
        async with sse_client(url="http://localhost:8000/sse") as streams:
            async with ClientSession(*streams) as session:
                # 初始化会话
                await session.initialize()
                
                # 列出可用工具
                tools_response = await session.list_tools()
                print("Available tools:")
                for tool in tools_response.tools:
                    print(f" - {tool.name}: {tool.description}")
                
                # 列出可用资源
                resources_response = await session.list_resources()
                print("\nAvailable resources:")
                for resource in resources_response.resources:
                    print(f" - {resource.uri}: {resource.description}")
                
                # 调用天气工具
                print("\nCalling get_weather tool for London...")
                weather_response = await session.call_tool("get_weather", {"city": "London"})
                print(weather_response.content[0].text)
                
                # 读取资源
                print("\nReading weather://cities resource...")
                cities_response = await session.read_resource("weather://cities")
                print(cities_response[0].content)
                
                # 读取带参数的资源
                print("\nReading weather forecast for Tokyo...")
                forecast_response = await session.read_resource("weather://forecast/Tokyo")
                print(forecast_response[0].content)
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    六、详细代码示例:数据库查询工具

    下面是一个更复杂的示例,创建一个MySQL数据库查询工具,同时支持SSE和STDIO两种传输方式:

    6.1 STDIO版本

    # mysql_stdio.py
    from fastmcp import FastMCP
    from mysql.connector import connect, Error
    from dotenv import load_dotenv
    import os
    import logging
    
    # 配置日志
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    logger = logging.getLogger(__name__)
    
    # 创建MCP服务器
    mcp = FastMCP("MySQL Query Tool")
    
    def get_db_config():
        """从环境变量获取数据库配置"""
        # 加载.env文件
        load_dotenv()
        
        config = {
            "host": os.getenv("MYSQL_HOST", "localhost"),
            "port": int(os.getenv("MYSQL_PORT", "3306")),
            "user": os.getenv("MYSQL_USER"),
            "password": os.getenv("MYSQL_PASSWORD"),
            "database": os.getenv("MYSQL_DATABASE"),
        }
        
        if not all([config["user"], config["password"], config["database"]]):
            raise ValueError("缺少必需的数据库配置")
        
        return config
    
    @mcp.tool()
    def execute_sql(query: str) -> str:
        """执行SQL查询语句
        
        参数:
            query (str): 要执行的SQL语句,支持多条语句以分号分隔
        
        返回:
            str: 查询结果,格式化为可读的文本
        """
        config = get_db_config()
        logger.info(f"执行SQL查询: {query}")
        
        try:
            with connect(**config) as conn:
                with conn.cursor() as cursor:
                    statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]
                    results = []
                    
                    for statement in statements:
                        try:
                            cursor.execute(statement)
                            
                            # 检查语句是否返回了结果集
                            if cursor.description:
                                columns = [desc[0] for desc in cursor.description]
                                rows = cursor.fetchall()
                                
                                # 格式化输出
                                result = [" | ".join(columns)]
                                result.append("-" * len(result[0]))
                                
                                for row in rows:
                                    formatted_row = [str(val) if val is not None else "NULL" for val in row]
                                    result.append(" | ".join(formatted_row))
                                
                                results.append("\n".join(result))
                            else:
                                conn.commit()  # 提交非查询语句
                                results.append(f"查询执行成功。影响行数: {cursor.rowcount}")
                        
                        except Error as stmt_error:
                            results.append(f"执行语句 '{statement}' 出错: {str(stmt_error)}")
                    
                    return "\n\n".join(results)
        
        except Error as e:
            error_msg = f"执行SQL '{query}' 时出错: {e}"
            logger.error(error_msg)
            return error_msg
    
    @mcp.tool()
    def get_table_structure(table_name: str) -> str:
        """获取指定表的结构信息
        
        参数:
            table_name (str): 表名
        
        返回:
            str: 表结构信息,包含字段名、类型、是否为NULL和默认值
        """
        query = f"DESCRIBE {table_name};"
        return execute_sql(query)
    
    @mcp.tool()
    def list_tables() -> str:
        """列出数据库中的所有表"""
        query = "SHOW TABLES;"
        return execute_sql(query)
    
    @mcp.resource("db://tables")
    def get_tables_resource() -> list:
        """获取数据库中的所有表名作为资源"""
        config = get_db_config()
        
        try:
            with connect(**config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("SHOW TABLES;")
                    return [row[0] for row in cursor.fetchall()]
        except Error as e:
            logger.error(f"获取表列表时出错: {e}")
            return []
    
    @mcp.resource("db://schema/{table}")
    def get_table_schema(table: str) -> dict:
        """获取指定表的模式定义作为资源"""
        config = get_db_config()
        
        try:
            with connect(**config) as conn:
                with conn.cursor() as cursor:
                    # 获取表结构
                    cursor.execute(f"DESCRIBE {table};")
                    columns = cursor.fetchall()
                    
                    schema = {
                        "table": table,
                        "columns": []
                    }
                    
                    for col in columns:
                        schema["columns"].append({
                            "name": col[0],
                            "type": col[1],
                            "nullable": col[2] == "YES",
                            "key": col[3],
                            "default": col[4],
                            "extra": col[5]
                        })
                    
                    return schema
        except Error as e:
            logger.error(f"获取表 {table} 的模式时出错: {e}")
            return {"error": str(e)}
    
    if __name__ == "__main__":
        logger.info("启动MySQL查询工具 (STDIO模式)")
        mcp.run(transport="stdio")
    

    6.2 SSE版本

    只需修改最后的运行部分即可将上述示例转换为SSE模式:

    if __name__ == "__main__":
        logger.info("启动MySQL查询工具 (SSE模式)")
        # 指定端口为9000
        mcp.port = 9000
        mcp.run(transport="sse")
    

    6.3 创建.env文件

    为了使上述MySQL示例工作,需要创建一个.env文件,包含数据库连接信息:

    # MySQL数据库配置
    MYSQL_HOST=localhost
    MYSQL_PORT=3306
    MYSQL_USER=root
    MYSQL_PASSWORD=your_password
    MYSQL_DATABASE=your_database
    

    七、总结

    Python FastMCP为开发者提供了一种简单高效的方式来构建MCP服务器,无论是使用STDIO还是SSE传输方式。下面是选择合适传输方式的简要指南:

    何时选择STDIO:

  • 开发本地工具和集成时
  • 需要低延迟的场景
  • 与Claude Desktop等桌面应用集成时
  • 开发和测试阶段
  • 何时选择SSE:

  • 部署到服务器或云环境时
  • 需要支持多客户端的场景
  • 构建分布式AI系统时
  • 生产环境部署
  • 在实际开发中,您可能会发现先使用STDIO模式进行本地开发和测试,然后转换为SSE模式进行生产部署是一种常见的工作流程。FastMCP的设计使这种转换变得非常简单,只需更改一行代码即可。

    通过本文介绍的示例和概念,您应该能够开始使用Python FastMCP构建自己的MCP服务器,无论是简单的计算工具还是复杂的数据库查询接口。这将使您的AI应用能够无缝集成各种外部工具和数据源,大大提升其功能和实用性。

    作者:lingding_cn

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python FastMCP实现详解:MCP实践、SSE与STDIO通信模式全解析

    发表回复