服务端 - Sanic 异步效率和架构
2025-06-11 15:11:06 0 举报
AI智能生成
Sanic是一个高性能的Python框架,专门设计用于异步HTTP请求,为基于Web的应用程序提供了杰出的处理速度和效率。 使用非阻塞驱动作为后端,利用Python协程,Sanic能够突破传统同步框架的性能限制,特别适合构建需要高并发和快速响应的应用。 分析了完善的中间件功能,事件信号系统。
作者其他创作
大纲/内容
特点
内置支持 ASGI:
遵循 ASGI 规范,可以运行在任何兼容 ASGI 的服务器上(如 uvicorn, hypercorn, daphne)。
内置了高性能的生产级服务器(基于 sanic.worker 和 uvloop),开箱即用,无需额外配置 WSGI 服务器(如 uWSGI, Gunicorn)。
内置了高性能的生产级服务器(基于 sanic.worker 和 uvloop),开箱即用,无需额外配置 WSGI 服务器(如 uWSGI, Gunicorn)。
极致的性能:
这是 Sanic 最突出的特点。它使用 uvloop(一个基于 libuv 的快速事件循环替代品)作为默认事件循环,显著提升了 asyncio 的性能。
其简洁高效的设计(无全局请求对象、直接传递请求/响应对象)减少了开销。
其简洁高效的设计(无全局请求对象、直接传递请求/响应对象)减少了开销。
异步的支持:
从底层到 API 设计都围绕 async/await 构建。允许你编写非阻塞的视图函数、中间件、信号处理器等。
能够轻松集成其他异步库(如 asyncpg 访问 PostgreSQL, aiohttp 发起 HTTP 请求, aioredis 访问 Redis 等),避免阻塞事件循环。
能够轻松集成其他异步库(如 asyncpg 访问 PostgreSQL, aiohttp 发起 HTTP 请求, aioredis 访问 Redis 等),避免阻塞事件循环。
简洁直观的 API:
设计理念类似于 Flask,学习曲线相对平缓。
路由定义清晰(@app.route 或 @app.get, @app.post 等装饰器)。
支持基于类的视图,提供更好的代码组织方式,特别是对于 RESTful API 资源。
请求 (request) 和响应 (response) 对象作为参数直接传递给处理函数,易于操作。
路由定义清晰(@app.route 或 @app.get, @app.post 等装饰器)。
支持基于类的视图,提供更好的代码组织方式,特别是对于 RESTful API 资源。
请求 (request) 和响应 (response) 对象作为参数直接传递给处理函数,易于操作。
强大的中间件:
支持请求前 (request) 和响应后 (response) 中间件。
中间件也可以是异步函数,方便实现认证、日志、CORS、请求/响应修改等全局逻辑。
信号系统:
提供内置的信号机制(如 before_server_start, after_server_stop, http.routing.before, http.lifecycle.response 等)
允许在应用生命周期的特定时刻注入自定义逻辑。
ASGI vs WSGI
客户端请求 → ASGI服务器(uvicorn/hypercorn) → ASGI协议解码 → Sanic应用 → 异步视图处理
WSGI
之前Python Web框架主要遵循**WSGI (Web Server Gateway Interface)** 标准。
WSGI是一个同步的接口规范,定义了一个简单的同步调用约定:Web服务器接收请求,然后调用应用(一个可调用对象),应用处理请求并返回响应。
虽然WSGI为Python Web开发带来了统一性,但它本质上是同步阻塞的。如果一个视图函数在处理时需要等待I/O(如数据库查询、外部API调用),整个工作线程会被阻塞,无法处理其他请求,会导致性能瓶颈。
ASGI
**ASGI (Asynchronous Server Gateway Interface)** 应运而生。
ASGI是WSGI的精神继承者,但设计为**异步友好**。它允许应用处理异步事件,支持WebSocket、HTTP/2等协议,并且更适合长时间连接(如SSE、WebSocket)。
ASGI定义了一个异步调用约定。一个ASGI应用是一个异步函数(`async def`),它接收三个参数: scope, receive, send
请求对象
在视图函数中通过 request 对象访问请求信息 (URL 参数、Params、Headers、Body 等)
响应对象
使用response 模块中的函数创建并返回响应对象 (text, json, html, file, stream, redirect 等)
聊聊中间件
提供灵活的请求/响应处理管道,尤其擅长处理请求前(request)和响应后(response)两个关键阶段的中间件操作。
请求前中间件(Request Middleware)
仅使用Request 对象一个参数
在请求到达路由处理函数之前执行,常用于:
认证授权,访问控制
请求数据预处理
请求日志记录
请求数据预处理
请求日志记录
核心特性:
执行顺序:按注册顺序执行
中断能力:可直接返回响应,跳过后续处理
请求修改:可修改请求对象
中断能力:可直接返回响应,跳过后续处理
请求修改:可修改请求对象
案例
# 日志中间件
@app.middleware("request")
async def log_middleware(request: Request):
request.ctx.start_time = time.time()
print(f"{request.method} {request.path} from {request.ip}")
@app.middleware("request")
async def log_middleware(request: Request):
request.ctx.start_time = time.time()
print(f"{request.method} {request.path} from {request.ip}")
# 限流控制
from sanic.exceptions import TooManyRequests
@app.middleware("request")
async def rate_limit_middleware(request: Request):
ip = request.ip
current = await redis.incr(f"rate:{ip}")
if current == 1:
await redis.expire(f"rate:{ip}", 60)
if current > 100: # 每分钟100次
raise TooManyRequests("Rate limit exceeded")
from sanic.exceptions import TooManyRequests
@app.middleware("request")
async def rate_limit_middleware(request: Request):
ip = request.ip
current = await redis.incr(f"rate:{ip}")
if current == 1:
await redis.expire(f"rate:{ip}", 60)
if current > 100: # 每分钟100次
raise TooManyRequests("Rate limit exceeded")
其他
响应后中间件(Response Middleware)
使用Request, Response 两个参数
在路由处理函数生成响应后执行,常用于:
添加统一响应头
性能监控
错误格式标准化
性能监控
错误格式标准化
核心特性:
执行顺序:按注册顺序反向执行(洋葱模型)
响应修改:可修改响应对象
异常处理:即使路由抛出异常也会执行
响应修改:可修改响应对象
异常处理:即使路由抛出异常也会执行
案例
# 响应日志中间件
@app.middleware("response")
async def response_log_middleware(request: Request, response):
duration = (time.time() - request.ctx.start_time) * 1000
print(f"{request.method} {request.path} → {response.status} ({duration:.2f}ms)")
@app.middleware("response")
async def response_log_middleware(request: Request, response):
duration = (time.time() - request.ctx.start_time) * 1000
print(f"{request.method} {request.path} → {response.status} ({duration:.2f}ms)")
其他
中间件顺序管理建议
安全相关中间件最先注册
核心业务逻辑最后注册
核心业务逻辑最后注册
聊聊信号系统
定义
Sanic的信号系统提供了一种发布/订阅模型,允许开发者注册处理函数(订阅者)来响应框架内部发出的特定事件(信号)。
作用
Sanic的信号系统基于异步事件,允许开发者在应用生命周期的特定时刻注入自定义逻辑
信号系统是Sanic框架中用于解耦和扩展功能的重要机制。
这些信号在应用的不同阶段被触发,例如服务器启动前、服务器停止后、路由处理前等。
案例
before_server_start:服务器启动前
python@app.signal("server.init.before")
async def setup_db(app):
app.ctx.db = await create_db_pool()
app.ctx.redis = await create_redis_pool()
python@app.signal("server.init.before")
async def setup_db(app):
app.ctx.db = await create_db_pool()
app.ctx.redis = await create_redis_pool()
after_server_start:服务器启动后
@app.signal("server.init.after")
async def notify_ready(app):
await send_slack("Server is ready!")
await init_background_tasks(app)
@app.signal("server.init.after")
async def notify_ready(app):
await send_slack("Server is ready!")
await init_background_tasks(app)
before_server_stop:服务器停止前@app.signal("server.shutdown.before")
async def cleanup(app):
await app.ctx.db.close()
await app.ctx.redis.close()
async def cleanup(app):
await app.ctx.db.close()
await app.ctx.redis.close()
after_server_stop:服务器停止后
@app.signal("server.shutdown.after")
async def final_log(app):
logger.info("Server shutdown complete")
@app.signal("server.shutdown.after")
async def final_log(app):
logger.info("Server shutdown complete")
动态功能开关
@app.signal("http.handler.before")
async def feature_toggle(request):
# 根据请求头启用实验性功能
if request.headers.get("X-New-Feature") == "enabled":
request.ctx.use_new_algorithm = True
@app.signal("http.handler.before")
async def feature_toggle(request):
# 根据请求头启用实验性功能
if request.headers.get("X-New-Feature") == "enabled":
request.ctx.use_new_algorithm = True
响应缓存
@app.signal("http.handler.after")
async def cache_response(request, response):
if request.method == "GET" and response.status == 200:
await redis.set(
f"response:{request.path}",
response.body,
ex=300 # 缓存5分钟
)
@app.signal("http.handler.after")
async def cache_response(request, response):
if request.method == "GET" and response.status == 200:
await redis.set(
f"response:{request.path}",
response.body,
ex=300 # 缓存5分钟
)
跨请求上下文
@app.signal("http.lifecycle.complete")
async def global_metrics(request, response):
# 聚合所有请求的指标
METRICS["requests"].inc()
METRICS["status_codes"][response.status].inc()
@app.signal("http.lifecycle.complete")
async def global_metrics(request, response):
# 聚合所有请求的指标
METRICS["requests"].inc()
METRICS["status_codes"][response.status].inc()
架构组件
核心组件
信号(Signal):预定义或自定义的事件标识符(如 http.lifecycle.response)
发布者(Publisher):框架内部在关键节点触发信号的组件
订阅者(Subscriber):开发者注册的信号处理函数
事件总线(Event Bus):负责将信号路由到所有注册的监听器
发布者(Publisher):框架内部在关键节点触发信号的组件
订阅者(Subscriber):开发者注册的信号处理函数
事件总线(Event Bus):负责将信号路由到所有注册的监听器
核心方法
# 简化版信号分发实现
class Signal:
def __init__(self, name):
self.name = name
self.listeners = []
async def dispatch(self, **context):
for listener in self.listeners:
# 异步执行监听器
await listener(**context)
class Sanic:
def __init__(self):
self.signal_router = SignalRouter()
def signal(self, signal_name):
def decorator(handler):
self.signal_router.get(signal_name).add_listener(handler)
return handler
return decorator
async def dispatch(self, signal_name, **context):
if signal := self.signal_router.get(signal_name):
await signal.dispatch(**context)
# 在请求处理流程中触发信号
async def handle_request(request):
# 请求开始
await app.dispatch("http.lifecycle.begin", request=request)
# 路由前
await app.dispatch("http.routing.before", request=request)
# ... 执行路由匹配...
# 处理器前
await app.dispatch("http.handler.before", request=request)
try:
# 执行处理器...
response = await handler(request)
# 处理器后
await app.dispatch("http.handler.after", request=request, response=response)
except Exception as e:
# 异常处理
response = error_handler(e)
# 响应生命周期
await app.dispatch("http.lifecycle.response", request=request, response=response)
# 响应发送
await app.dispatch("http.lifecycle.send", request=request, response=response)
# 请求完成
await app.dispatch("http.lifecycle.complete", request=request, response=response)
class Signal:
def __init__(self, name):
self.name = name
self.listeners = []
async def dispatch(self, **context):
for listener in self.listeners:
# 异步执行监听器
await listener(**context)
class Sanic:
def __init__(self):
self.signal_router = SignalRouter()
def signal(self, signal_name):
def decorator(handler):
self.signal_router.get(signal_name).add_listener(handler)
return handler
return decorator
async def dispatch(self, signal_name, **context):
if signal := self.signal_router.get(signal_name):
await signal.dispatch(**context)
# 在请求处理流程中触发信号
async def handle_request(request):
# 请求开始
await app.dispatch("http.lifecycle.begin", request=request)
# 路由前
await app.dispatch("http.routing.before", request=request)
# ... 执行路由匹配...
# 处理器前
await app.dispatch("http.handler.before", request=request)
try:
# 执行处理器...
response = await handler(request)
# 处理器后
await app.dispatch("http.handler.after", request=request, response=response)
except Exception as e:
# 异常处理
response = error_handler(e)
# 响应生命周期
await app.dispatch("http.lifecycle.response", request=request, response=response)
# 响应发送
await app.dispatch("http.lifecycle.send", request=request, response=response)
# 请求完成
await app.dispatch("http.lifecycle.complete", request=request, response=response)
插件生态系统
拥有一个不断增长的社区插件生态系统(通过 sanic-ext 扩展包或第三方库):
CORS (跨域资源共享)
模板渲染 (Jinja2)
依赖注入
请求验证 (Pydantic)
OpenAPI/Swagger 文档自动生成
健康检查端点
配置管理
数据库集成(ORM/ODM 适配)
模板渲染 (Jinja2)
依赖注入
请求验证 (Pydantic)
OpenAPI/Swagger 文档自动生成
健康检查端点
配置管理
数据库集成(ORM/ODM 适配)
0 条评论
下一页