uvloop (linux
)
sanic
是一个异步框架,使用了 Python 中的协程,
事件循环,事件循环的构建依赖于 asyncio
这个库,
但 asyncio
构建的事件循环效率是不高的,所以有一个第三方库 uvloop
。
uvloop
使用 Cython
编写,基于 libuv
,它可以让 asyncio
变得更快。
即便我们不开发 Sanic
服务,也可以使用 uvloop
来替换 asyncio
内部的事件循环。
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
sanic 基本介绍
Sanic
是 Python3.7+
Web 服务器和 Web 框架,旨在提高性能。
它允许使用 Python3.5
中添加的 async/await
语法,
这使得您的代码有效的避免阻塞从而达到提升响应速度的目的
Sanic
具备开箱即用的功能,它可以用于编写,部署和扩展生产级 Web 应用程序
- 内置极速 web server
- 生产准备就绪
- 极高的拓展性
- 支持 ASGI
- 简单直观的 API 设计
- 社区保障
quick start 上手
请求与响应
from sanic import \
Sanic,\
request,\
response
from sanic.response import text, json
app = Sanic("DemoApp")
@app.get("/index")
async def hello_world(request: request.Request):
print(request.url) # http://localhost:8888/index?a=1&b=2&a=3
print(request.path) # /index
print(request.method) # GET
print(request.query_args) # [('a', '1'), ('b', '2'), ('a', '3')]
print(request.args) # {'a': ['1', '3'], 'b': ['2']}
return json({"name": "a", "message": "hello"})
@app.get("/text")
async def text(request: request.Request):
# 里面传入一个字符串即可, 至于状态码、响应头可以根据自身需求决定是否设置
return response.text("text")
@app.get("/json")
async def json(request: request.Request):
# 第一个参数传入一个字典即可
return response.json({"age": 16}, ensure_ascii=False)
@app.get("/html")
async def html(request: request.Request):
# 里面也接收一个字符串, 如果是文件的话, 可以先读取出来
# 当然还可以使用 jinja2 进行渲染
return response.html("<h3>HEAD</h3>")
@app.get("/file")
async def file(request: request.Request):
# 由于调用 response.file 会得到一个协程, 因此我们需要使用 await
# 接收如下参数:
"""
location: 要返回的文件的路径, 必传参数
status: 成功响应时的 http 状态码 200, 正常返回的话无需修改
mime_type: 如果为空, 内部会通过 mimetypes 模块的 guess_type 函数进行推断
headers: 自定义 http 响应头
filename: 文件名, 如果传递了会写入到响应头 headers 的 Content-Disposition 中
"""
return await response.file("1.txt")
@app.get("/raw")
async def raw(request: request.Request):
# 和 text 非常类似, 但是它第一个参数接收的不是字符串、而是字节串
return response.raw(b"apply")
@app.route("/stream", methods=["GET"])
async def stream(request: request.Request):
# Sanic 返回流数据给浏览器, 不是一次性把所有数据返回, 而是一部分一部分地返回
# 参数如下:
"""
streaming_fn: 用于写流响应数据的协程函数
status: 状态码, 默认 200
headers: 自定义的 http 响应头
content_type: 默认是纯文本的 content_type, 可以根据实际情况进行修改
"""
async def stream_fn(res: response.StreamingHTTPResponse):
await res.write("你")
import asyncio
await asyncio.sleep(2)
await res.write("好么")
# # 但是浏览器访问的时候是 3s 之后一起显示, 因为浏览器上面一旦显示内容, 就代表这一次请求已经结束了
# 所以是整体一起显示的
return response.stream(stream_fn)
@app.get("/file_stream")
async def file_stream(request: request.Request):
# 从名字上看, 这是 file 和 stream 的结合
# file_stream 也是返回文件, 但它是一边读一边返回, 每次返回一定大小的数据
# 而 file 则是一次性返回所有数据, 很明显当文件过大时, file_stream 可以大大节省内存
"""
参数相比 response.file 会多一个 chunk_size, 表示每次读取的文件数据大小, 默认 4096
如果是 file_stream, 你会发现响应头中少了 Content-Length, 因为流的形式每次只是返回一个 chunk, 无法得知整个文件的大小
但是多了一个 Transfer-Encoding
"""
return await response.file_stream("1.txt")
@app.get("/redirect")
async def redirect(request: request.Request):
# 会跳转到路由 /text
return response.redirect("/text")
@app.route("/cookie", methods=["GET"])
async def cookie(request: request.Request):
"""
expire(类型:datetime):cookie在客户端的浏览器到期时间
path(类型:string):cookie应用的URL子集。默认为/
comment(类型:string):注释(metadata)
domain(类型:string):指定cookie有效的域, 明确指定的域必须始终以点开头
max-age(类型:数字):cookie存活的秒数
secure(类型:boolean):指定cookie是否只通过HTTPS发送
httponly(类型:boolean):指定cookie是否能被Javascript读取
"""
res = response.text("text")
# 获取 cookie
print(request.cookies.get("test"))
# 设置完 cookie 之后再返回
res.cookies["name"] = "cookie_name"
# 如果想指定 cookie 存活的秒数, 那么可以这么做
res.cookies["name"]["max-age"] = 5
# 删除 cookie, 直接 del 即可
del res.cookies["name"]
return res
# print(app.config)
app.run(host="localhost", port=8888, auto_reload=True)
# sanic server.app 启动
#
路由
@app.route("/girl/<age1:int>/<age2:int>", methods=["GET"])
int: 整数
number: 浮点数
alpha: 只含有大小写字母的字符串
uuid: uuid
string: 任意字符串, 不指定 :类型 时的默认行为, 注意字符串中不包含 /
path: 任意字符串, 和 string 的区别是可以包含 /; 比如: /<index>, 如果访问 /字符串 的话, index 的值就是 "字符串"; 但如果是 /字符串1/字符串2 的话, 就会报错, 因为路由无法匹配; 而 /<index:path> 的话就不一样了, 如果访问 /字符串1/字符串2, 那么 index 的值就是 "字符串1/字符串2
REGEX_TYPES = {
"string": (str, r"[^/]+"),
"int": (int, r"-?\d+"),
"number": (float, r"-?(?:\d+(?:\.\d*)?|\.\d+)"),
"alpha": (str, r"[A-Za-z]+"),
"path": (str, r"[^/].*?"),
"uuid": (
uuid.UUID,
r"[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-"
r"[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}",
),
}
# 也可以自定义 类型+匹配规则
from sanic import router
router.REGEX_TYPES["email"] = (str, r"email_regex")
@app.get("/get_email/<email_info:email>")
url_for
@app.get("/index")
async def index(request: request.Request):
cookies = request.cookies
if cookies.get("session") is not None:
return response.text("Home Page")
else:
# 这里的第一个参数 "error_handler", 对应的是视图函数名(这里其实不太准确, 至于为什么后面说)
# 关键字参数 error_msg, 就是函数中接收的参数
url = app.url_for("error_handler", error_msg="no login")
return response.redirect(url)
@app.get("/<error_msg>")
def error_handler(request: request.Request, error_msg):
return response.text(error_msg)
斜杠 strict_slashes
@app.get("/index", strict_slashes=True) # /index 可以访问, /index/ 则不行
@app.get("/index/", strict_slashes=True) # /index/ 可以访问, /index 则不行
@app.get("/index") # /index、/index/ 均可
@app.get("/index/") # /index、/index/ 均可
自定义路由名称
@app.get("/index")
async def index(request: request.Request):
cookies = request.cookies
if cookies.get("session") is not None:
return response.text("home page")
else:
url = app.url_for("error", error_msg="no login")
return response.redirect(url)
@app.get("/<error_msg>", name="error")
def error_handler(request: request.Request, error_msg):
return response.text(error_msg)
静态文件路由
app = Sanic("sanic_service")
app.static("/static", "../bg")
# 静态路由也是可以指定 name 属性的
app.static("/files", "../files", name="my_files")
app.url_for("static", name="my_files", filename="3.png") # 等价于 /files/3.png
# 如果我们提供的文件非常大的话,那么可以选择使用流文件来替代文件下载:
# Sanic 将会使用 file_stream() 替代 file() 来提供静态文件访问。它默认的块大小是 1kb,当然我们也可以给这个参数传递一个具体的数值来显式地指定块大小。
app.static("/static", "../bg", stream_large_files=True)
版本控制
@app.get("/index", version=4)
def index(request: request.Request):
return response.text("这是第四代 URL")
# http://localhost:8888/v4/index
websocket 路由
from sanic.websocket import WebSocketConnection
app = Sanic("sanic_service")
@app.websocket("/ws")
async def websocket(request: request.Request, ws: WebSocketConnection):
while True:
res = await ws.recv()
res = {"a": 1}.get(res)
await ws.send(f"收到 信息: {res}")
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
ws = new WebSocket("ws://localhost:8888/ws");
//如果连接成功, 会打印下面这句话, 否则不会打印
ws.onopen = function () {
console.log('连接成功')
};
//接收数据, 服务端有数据过来, 会执行
ws.onmessage = function (event) {
console.log(event.data)
};
//服务端主动断开连接, 会执行.
//客户端主动断开的话, 不执行
ws.onclose = function () { }
</script>
</body>
</html>
中间件
-
app.middleware 请求中间件
middleware(“request”) 等价于 Flask 中的 before_request middleware(“response”) 则等价于 after_request
@app.middleware("request")
def request_midd1(request: request.Request):
# 为请求设置一个请求头
request.headers["name"] = "a1"
@app.middleware("response")
def response_midd1(request: request.Request,
response: response.HTTPResponse):
# 注意: 响应中间件的视图函数需要接收两个参数, 一个 request、一个 response
# 为返回的内容设置一个头部信息
response.headers["name"] = "1a"
-
app.listener 监听器
在应用程序的生命周期内的多个时间点运行一些指定的逻辑
@app.listener(“before_server_start”): 在服务开始之前执行一些代码 @app.listener(“after_server_start”): 在服务开始之后执行一些代码 @app.listener(“before_server_stop”): 在服务结束之前执行一些代码 @app.listener(“after_server_stop”): 在服务结束之后执行一些代码
# 1
async def setup(ap, loop):
app.config["db"] = await create_async_engine()
app.register_listener(setup, "before_server_start")
# 2
@app.listener("before_server_start")
async def setup(ap, loop):
app.config["db"] = await create_async_engine()
执行后台任务 app.add_task
async background_task(app):
await xxx()
app.add_task(background_task(app))
蓝图 BluePrint
from sanic import Blueprint, response, Sanic
# 给蓝图指定一个名字
bp1 = Blueprint("comment_blueprint", url_prefix="/comment")
bp2 = Blueprint("apply_blueprint", url_prefix="/apply")
@bp1.get("/comment/<cid>")
async def get_comment_by_id(request, cid):
return response.text(f"获取 id 为 {cid} 的comment")
@bp2.get("/apply/<aid>")
async def get_apply_by_id(request, aid):
return response.text(f"获取 id 为 {aid} 的apply")
app = Sanic("main_app")
app.blueprint(bp1)
app.blueprint(bp2)
# # http://127.0.0.1:8000/comment/comment/1
app.run(auto_reload=True)
嵌套蓝图
app.blueprint(Blueprint.group(bp1, bp2, url_prefix="/apis"))
# http://127.0.0.1:8000/apis/comment/comment/1
bp
和 app
本质上一样的,所以 app
里面有的方法 bp
也有
只不过 bp
最后需要注册到 app
中
-
蓝图路与中间件
websocket 路由
可以通过 `@bp.websocket` 装饰器、或者 bp.add_websocket_route 方法进行注册。
中间件
蓝图组也是可以注册中间件的,注册的中间件会作用组内的在每一个蓝图上
url_for
对蓝图注册的路由使用 url_for,那么切记 name 一定是 "蓝图名.路由名" 的形式
异常
404
from sanic.exceptions import NotFound
@app.exception(NotFound)
async def not_fount(request, exception):
return response.text(f"不存在该url")
server error
# 1
from sanic.exceptions import ServerError
raise ServerError("server_error", status_code=500)
# 2
from sanic.exceptions import abort
raise abort(500, "server_error")
处理异常
# 1
@app.exception(Exception)
async def error_handle(request: request.Request, exception):
if isinstance(exception, NotFound):
return response.text("not founnd")
else:
return response.text(f"出现了错误: {exception}")
# 2
async def error_handle(request: request.Request, exception):
if isinstance(exception, NotFound):
return response.text("not founnd")
else:
return response.text(f"出现了错误: {exception}")
app.error_handler.add(Exception, error_handle)
# 3
from sanic.handlers import ErrorHandler
class UDErrorHandler(ErrorHandler):
def default(self, request, exception):
"""自己的逻辑处理"""
# 最后执行父类的 default 方法
return super().default(request, exception)
CBV 视图类 HTTPMethodView
from sanic.views import HTTPMethodView
app = Sanic("sanic_service")
class MyView(HTTPMethodView):
async def get(self, request):
return response.text("GET")
async def post(self, request):
return response.text("POST")
async def put(self, request):
return response.text("PUT")
async def delete(self, request):
return response.text("DELETE")
# 注册路由, 有两种方式
app.add_route(MyView.as_view(), "/index") # 这种方式也是用于视图函数的注册
# 或者通过 app.route("/index")(MyView.as_view())
app.run(host="127.0.0.1", port=8887)
视图类
增加装饰器
from sanic.views import HTTPMethodView
from sanic import Sanic, response
from functools import wraps
app = Sanic("demo")
def deco(func):
@wraps(func)
async def inner(*args, **kwargs):
ret = await func(*args, **kwargs)
if isinstance(ret, response.HTTPResponse):
b = ret.body
ret.body = (b.decode("utf-8") + " decorated1 ").encode("utf-8")
return ret
return inner
def deco1(func):
@wraps(func)
async def inner(*args, **kwargs):
ret = await func(*args, **kwargs)
if isinstance(ret, response.HTTPResponse):
b = ret.body
ret.body = (b.decode("utf-8") + " decorated2 ").encode("utf-8")
return ret
return inner
class MyView(HTTPMethodView):
decorators = [deco, deco1]
async def get(self, request):
return response.text("GET JSON")
# GET JSON decorated1 decorated2
app.add_route(MyView.as_view(), "/")
app.run(auto_reload=True)
Sanic streaming
流式响应 + 流式请求
只有 post、put、patch 方法有 stream 参数,其它没有。假设是 GET 请求,那么 request.stream 返回的是一个 None
# 流式请求 + 流式响应
@app.post("/index1", stream=True)
async def index1(request):
async def streaming(streaming_response: response.StreamingHTTPResponse):
while True:
# 读取数据
body = await request.stream.read()
if body is None:
break
# 写入数据
await streaming_response.write(body)
# 调用 response.stream, 接收带有一个 StreamingHTTPResponse 对象参数的协程函数, 用以写操作
return response.stream(streaming, content_type="application/octet-stream")
# 流式请求
@app.post("/index2", stream=True)
async def index2(request):
# 简单粗暴, 直接将数据读出来, 拼接在一起
# 当然由于涉及字符串的拼接, 所以效率不高, 可以写入到缓存中, 或者 append 到数组中再 join
result = b""
while True:
body = await request.stream.read()
if body is None:
break
result += body
return response.raw(result)
跨域 sanic_cors
# 1
from sanic_cors import CORS
app = Sanic("sanic_service")
# 只需要这一步, 即可解决跨域问题
CORS(app)
# 2 限制只能指定的资源进行跨域
from sanic_cors import CORS
app = Sanic("sanic_service")
CORS(app, resources={r"/api/*": {"origins": "*"}})
# 3 给某个指定的路由进行跨域
from sanic_cors import cross_origin
app = Sanic("sanic_service")
@app.get("/index")
@cross_origin(app)
async def index(request: request.Request):
Sanic 的部署
在 Linux 上安装 Sanic
的时候会自动安装 uvloop
-
app.run
host(默认为 127.0.0.1): 服务器运行的主机地址 port(默认为 8000): 服务器监听的端口 debug(默认为False): 是否开启 debug 模式, 会让服务器变慢 auto_reload(默认为None): 是否自动重启, 如果开启 debug, 那么 auto_reload 会自动开启 ssl(默认为None): 开启 SSL 加密 sock(默认为None): 服务器可以创建来自该 socket 的连接 workers(默认为1): 需要创建的工作进程的个数 默认情况下 Sanic 只使用一个 CPU 核心在主进程中进行监听,为了提高性能,我们应该利用多核。 os.cpu_count() # 12 loop(默认为None): 一个 asyncio 兼容的事件循环, 如果为 None, Sanic 将创建自己的事件循环; 首先尝试使用 uvloop, 如果环境中不存在, 将退化使用 asyncio 自带的事件循环; protocol(默认为HttpProtocol): asyncio.Protocol 的子类 access_log(默认为True): 开启请求处理的日志(显著降低server的速度)
-
通过 Gunicorn 运行
gunicorn sanic_service:app --bind 0.0.0.0:8887 --daemon --workers=5 --worker-class sanic.worker.GunicornWorker --backlog 1000: 最大挂起的连接数 --log-level LEVEL: 输出的日志等级, 可选参数有: debug、info、warning、error、critical
不需要在server.py 中 运行 app app.run()
为了提高性能,我们可以设置环境变量 env SANIC_ACCESS_LOG="False"
来禁止写入日志,
或者指定 app.config.ACCESS_LOG
等于 False