peewee
完全可以应付个人或企业中小型项目的 Model 层,上手容易,功能很强大
pwiz
a model generator
peewee 可以通过定义 Model 直接在数据库中创建表;反过来,也可以用 pwiz 根据数据库中已经存在的表生成 Model。
参数 | 解释 | 举例 |
---|---|---|
-h | help 显示帮助 | — |
-e | engine 数据库引擎 | -e mysql |
-H | host 数据库地址 | -H localhost |
-p | port 数据库端口 | -p 3306 |
-u | user 数据库用户 | -u user |
-P | password 数据库密码 | -P |
-s | schema 模式 | -s public |
-t | tables 只为指定的表生成 Model(多个表用逗号分隔) | -t tweet,users,relationships |
-v | views 同时为视图(VIEW)生成 Model | -v |
-o | order 保留表格列顺序 | -o |
当前 engine支持的数据库:
sqlite
mysql
postgresql
命令行语句
$ python -m pwiz -e mysql -H localhost -p 3306 -u root -P -t table_names database_name > model.py
Playhouse
peewee 的扩展库,这里用到其中的数据类型转换工具:
model_to_dict
dict_to_model
from playhouse.shortcuts import model_to_dict, dict_to_model
# ……
if __name__ == "__main__":
    # ... (setup elided in the original snippet)
    jack = User.create(username='jack')
    # Convert the model instance into a plain dict.
    jack_as_dict = model_to_dict(jack)
    print(jack_as_dict)
实际使用
# models.py
import os
import sys
from peewee import *
from playhouse.pool import PooledMySQLDatabase
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from conf import config
# Pooled MySQL handle: at most 20 connections; stale_timeout=300 is passed to
# the pool (see playhouse.pool for its exact meaning).
database_pool = PooledMySQLDatabase(
    config.DB_NAME,
    max_connections=20,
    stale_timeout=300,
    **config.MYSQL_CONF
)

# Plain (non-pooled) MySQL handle built from the same settings.
database = MySQLDatabase(config.DB_NAME, **config.MYSQL_CONF)
class UnknownField:
    """Placeholder field type whose constructor accepts and ignores anything."""

    def __init__(self, *args, **kwargs):
        """Accept any positional/keyword arguments and discard them."""
class BaseModel(Model):
    """Common base class: every model deriving from it shares one database."""

    class Meta:
        # Bound to the module-level (non-pooled) `database` handle.
        database = database
class AuthUser(BaseModel):
    """Model mapped onto the existing ``auth_user`` table.

    NOTE(review): the flag columns are plain integers here; presumably they
    hold 0/1 booleans in the underlying table -- confirm against the schema.
    """

    date_joined = DateTimeField()
    email = CharField()
    first_name = CharField()
    is_active = IntegerField()
    is_staff = IntegerField()
    is_superuser = IntegerField()
    last_login = DateTimeField(null=True)  # nullable column
    last_name = CharField()
    password = CharField()
    username = CharField(unique=True)  # unique constraint on username

    class Meta:
        table_name = 'auth_user'
# main.py
import redis
import logging
import time
import json
import os
import sys
from threading import Thread
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from db.models import Application, database, database_pool, SubmitApplicationBackend
from src.enums import *
logging.basicConfig(
    level=logging.INFO,
    # Layout of every emitted log record.
    format='%(asctime)s %(filename)s : [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
# Module-level logger used by the rest of this script.
logger = logging.getLogger(__name__)
class RedisPubSub:
    """Redis publish/subscribe helper bound to a single channel.

    The same channel name is used for both publishing and subscribing.
    """

    def __init__(self, channel, host='localhost', port=6379):
        """Create the connection pool and the Redis client.

        Args:
            channel: Channel name used by publish() and subscribe().
            host: Redis server host.
            port: Redis server port.
        """
        self.__pool = redis.ConnectionPool(host=host, port=port, db=8,  # (original note: db=2)
                                           max_connections=10)
        self.__conn = redis.Redis(connection_pool=self.__pool)
        self.chan_pub = self.chan_sub = channel
        # Most recent PubSub handle created by subscribe(); None until then.
        self.pubsub = None

    def publish(self, msg):
        """Publish ``msg`` on the channel and return True."""
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        """Subscribe to the channel and return the PubSub handle.

        The handle is also remembered on ``self.pubsub``.
        """
        pubsub = self.__conn.pubsub()
        pubsub.subscribe(self.chan_sub)
        self.pubsub = pubsub
        return pubsub
def heartbeat(pubsub, interval=3):
    """Ping the subscription forever, pausing ``interval`` seconds between pings.

    Never returns on its own, so it is meant to run in a background thread.

    Args:
        pubsub: Object exposing ``ping(message)`` (the handle returned by
            ``RedisPubSub.subscribe()`` in this script).
        interval: Seconds to sleep after each ping.
    """
    while True:
        pubsub.ping('I am alive :)')
        time.sleep(interval)
if __name__ == '__main__':
    r = RedisPubSub('crms')
    ps = r.subscribe()

    # Background pinger. Thread.setDaemon() is deprecated, so the daemon flag
    # is passed to the constructor instead.
    t_ping = Thread(target=heartbeat, args=(ps, ), daemon=True)
    t_ping.start()

    while True:
        for msg in ps.listen():
            # Replies to the heartbeat pings.
            if msg["type"] == "pong":
                logger.info(f'pinged as:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')
                continue

            # Only real messages published on our own channel are handled.
            if msg["type"] != "message" or msg["channel"].decode("utf-8") != r.chan_sub:
                continue

            handled_data = json.loads(msg["data"])
            account_id = handled_data["account_id"]
            apply_id = handled_data["apply_id"]
            operation_type = handled_data["operation_type"]
            print(f'received data {apply_id}: {account_id} at:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}', apply_id)

            if operation_type != ApplicationActionTypeThriftEnum.BIND_BM:
                continue

            # NOTE(review): the models shown in models.py are bound to
            # `database`, not `database_pool`; confirm these statements really
            # run on the pooled connection and inside this transaction.
            with database_pool.connection_context():
                with database_pool.atomic():
                    # NOTE(review): confirm_time, confirmor, reason and status
                    # are not defined anywhere in this snippet before this
                    # point; presumably they come from the message payload --
                    # confirm and define them, otherwise this raises NameError.
                    SubmitApplicationBackend.update(
                        confirm_time=confirm_time,
                        confirm_user=confirmor,
                        operation_fail_reason=reason,
                        operation_status=status
                    ).where(
                        SubmitApplicationBackend.account_id == account_id,
                        SubmitApplicationBackend.application_id == apply_id
                    ).execute()

                    # Count this application's rows per operation status.
                    query_ = SubmitApplicationBackend.select().where(
                        SubmitApplicationBackend.application_id == apply_id
                    )
                    count_failed = query_.where(
                        SubmitApplicationBackend.operation_status == AeOperationStatusEnum.FAILED
                    ).count()
                    count_ae_rebate = query_.where(
                        SubmitApplicationBackend.operation_status == AeOperationStatusEnum.AE_REBATE
                    ).count()
                    count_handling = query_.where(
                        SubmitApplicationBackend.operation_status == AeOperationStatusEnum.PENDING_CONFIRM
                    ).count()

                    # Precedence: failed > AE rebate > still pending > finished.
                    if count_failed > 0:
                        status = ApplicationStatusEnum.HANDLE_FAILED_FB
                    elif count_ae_rebate > 0:
                        status = ApplicationStatusEnum.AE_REBATE
                    elif count_handling > 0:
                        status = ApplicationStatusEnum.PENDING_HANDLE_FB
                    else:
                        status = ApplicationStatusEnum.FINISHED_FB

                    # Write the derived status to the application and its rows.
                    Application.update(status=status).where(
                        Application.id == apply_id
                    ).execute()
                    SubmitApplicationBackend.update(application_status=status).where(
                        SubmitApplicationBackend.application_id == apply_id
                    ).execute()
            print(f"saved data {apply_id}: {account_id}")

    # Alternative to listen(): read raw replies with parse_response().
    # msg = ps.parse_response()
    # [b'pong', b'ping_msg']
    # if msg[0].decode('utf-8') == 'pong':
    #     logger.info(f'pinged as:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')
    # # [b'message', b'interaction_service', b'{data}']
    # if msg[0].decode('utf-8') == 'message' and msg[1].decode('utf-8') == r.chan_pub:
    #     data = json.loads(msg[2].decode('utf-8'))
    #     print(data)
peewee-async
import asyncio
import peewee
import logging
import os
import logging.config
import yaml
from peewee_async import Manager, MySQLDatabase
yaml_path = 'conf/config.yaml'
cf = {}

# Fail fast when the configuration file is missing.
if not os.path.exists(yaml_path):
    raise ValueError('conf file does not exist!')

with open(yaml_path, 'r', encoding='utf-8') as f:
    cf = yaml.safe_load(f)
# Logging is configured from the "logging_conf" section of the same file.
logging.config.dictConfig(cf['logging_conf'])
# Connection settings come from the "mysql" section of the loaded config.
_mysql_cf = cf["mysql"]
database = MySQLDatabase(
    _mysql_cf["db"],
    host=_mysql_cf["host"],
    port=int(_mysql_cf["port"]),  # coerced to int in case it is stored as text
    user=_mysql_cf["user"],
    password=_mysql_cf["password"],
)
class PageBlock(peewee.Model):
    """A block of text identified by a unique key."""

    key = peewee.CharField(max_length=40, unique=True)  # at most 40 characters
    text = peewee.TextField(default='')  # empty text when none is given

    class Meta:
        database = database
# Note: custom loop! It is created explicitly because asyncio.get_event_loop()
# is deprecated when no loop is running.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
objects = Manager(database, loop=loop)
objects.database.allow_sync = False  # this will raise AssertionError on ANY sync call
async def my_async_func():
    """Create (or fetch) the "title" block, rewrite its text, then close the DB.

    All database access goes through the module-level ``objects`` manager.
    """
    await objects.create_or_get(
        PageBlock,
        key="title",
        text="Peewee is AWESOME with async!",
    )

    block = await objects.get(PageBlock, key="title")
    print(f"Was: {block.text}")

    block.text = "Peewee is super awesome with async"
    await objects.update(block)
    print(f"New: {block.text}")

    # Close the async connection while the event loop is still available.
    await database.close_async()
# https://stackoverflow.com/questions/40420243/event-loop-is-closed-error-in-aiomysql
try:
    loop.run_until_complete(my_async_func())
finally:
    # Close the loop even if the coroutine raised.
    loop.close()
tricks 更新字段
class Book(Model):
    """Book record used by the dynamic-update example."""

    # PrimaryKeyField was renamed to AutoField in peewee 3.
    id = AutoField()
    title = CharField(max_length=64, unique=True)
    author = CharField(max_length=64)
    publisher = CharField(max_length=64)
    price = DecimalField(max_digits=10, decimal_places=2)
    desc = CharField(max_length=256)

    class Meta:
        database = database
condition_dict = {"author": "鲁迅", "id": 1}

# Turn every column-name/value pair into a `Book.<column> == value` expression.
conditions = [
    getattr(Book, column) == expected
    for column, expected in condition_dict.items()
]

# All expressions are handed to a single where() call.
q = Book.update({"price": 30}).where(*conditions)
q.execute()