peewee的使用

peewee 可以完全可以应付个人或企业的中小型项目的 Model 层,上手容易,功能很强大

pwiz a model generator

peewee可以通过定义Model直接在数据库中创建表 反过来也可以,用数据库中已经存在的表创建Model

参数 解释 举例
-h help 显示帮助
-e engine 数据库引擎 -e mysql
-H host 数据库地址 -H localhost
-p port 数据库端口 -p 3306
-u user 数据库用户 -u user
-P password 数据库密码 -P
-s schema 模式 -s public
-t tables 指定生成来自表格 -t tweet,users,relationships
-v VIEWs 指定生成来自视图 -v
-o order 保留表格列顺序 -o
当前 engine支持的数据库:

    sqlite
    mysql
    postgresql

命令行语句

$ python -m pwiz -e mysql -H localhost -p 3306 -u root -P  -t table_names database_name > model.py

Playhouse库, 数据类型转换

model_to_dict

dict_to_model

from playhouse.shortcuts import model_to_dict, dict_to_model

# ……

if __name__ == "__main__":
    # ……
    user = User.create(username='jack')
    # 把数据对象转成字典
    u = model_to_dict(user)    
    print(u)

实际使用


# models.py

import os
import sys

from peewee import *
from playhouse.pool import PooledMySQLDatabase

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from conf import config


database_pool = PooledMySQLDatabase(
    config.DB_NAME,
    max_connections=20,
    stale_timeout=300,
    **config.MYSQL_CONF
)

database = MySQLDatabase(config.DB_NAME,
                         **config.MYSQL_CONF)

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class AuthUser(BaseModel):
    date_joined = DateTimeField()
    email = CharField()
    first_name = CharField()
    is_active = IntegerField()
    is_staff = IntegerField()
    is_superuser = IntegerField()
    last_login = DateTimeField(null=True)
    last_name = CharField()
    password = CharField()
    username = CharField(unique=True)

    class Meta:
        table_name = 'auth_user'



#  main.py

import redis
import logging
import time
import json
import os
import sys
from threading import Thread

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from db.models import Application, database, database_pool, SubmitApplicationBackend
from src.enums import *

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s : [%(levelname)s] %(message)s',  # 定义输出log的格式
                    datefmt='%Y-%m-%d %H:%M:%S'
                    )

class RedisPubSub:
    """
    Redis 发布订阅类
    """

    def __init__(self, channel):
        self.__pool = redis.ConnectionPool(host=host, port=port, db=8, # db=2
                                           max_connections=10)
        self.__conn = redis.Redis(connection_pool=self.__pool)
        self.chan_pub = self.chan_sub = channel
        self.pubsub = ''

    def publish(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pubsub  = self.__conn.pubsub()
        pubsub.subscribe(self.chan_sub)
        return pubsub


def heartbeat(pubsub):
    while 1:
        pubsub.ping('I am alive :)')
        time.sleep(3)


if __name__ == '__main__':
    r = RedisPubSub('crms')
    ps = r.subscribe()

    t_ping = Thread(target=heartbeat, args=(ps, ))
    t_ping.setDaemon(True)
    t_ping.start()

    while 1:
        msg_iter = ps.listen()
        for msg in msg_iter:

            if msg["type"] == "pong":
                logger.info(f'pinged as:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')

            elif msg["type"] == "message" and  msg["channel"].decode("utf-8") == r.chan_sub:
                handled_data = json.loads(msg["data"])
                account_id = handled_data["account_id"]
                apply_id = handled_data["apply_id"]
                operation_type = handled_data["operation_type"]
                print(f'received data {apply_id}: {account_id} at:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}', apply_id)

                if operation_type == ApplicationActionTypeThriftEnum.BIND_BM:
                    with database_pool.connection_context():
                        with database_pool.atomic():
                            SubmitApplicationBackend.update(
                                confirm_time=confirm_time,
                                confirm_user=confirmor,
                                operation_fail_reason=reason,
                                operation_status=status
                            ).where(
                                SubmitApplicationBackend.account_id == account_id,
                                SubmitApplicationBackend.application_id == apply_id
                            ).execute()

                        query_ = SubmitApplicationBackend.select().\
                            where(SubmitApplicationBackend.application_id==apply_id)


                        count_failed = query_. \
                            where(SubmitApplicationBackend.operation_status == AeOperationStatusEnum.FAILED). \
                            count()

                        count_ae_rebate = query_. \
                            where(SubmitApplicationBackend.operation_status == AeOperationStatusEnum.AE_REBATE). \
                            count()

                        count_handling = query_. \
                            where(SubmitApplicationBackend.operation_status == AeOperationStatusEnum.PENDING_CONFIRM). \
                            count()

                        if count_failed > 0:
                            status = ApplicationStatusEnum.HANDLE_FAILED_FB
                        else:
                            if count_ae_rebate > 0:
                                status = ApplicationStatusEnum.AE_REBATE
                            else:
                                if count_handling > 0:
                                    status = ApplicationStatusEnum.PENDING_HANDLE_FB
                                else:
                                    status = ApplicationStatusEnum.FINISHED_FB

                    Application.update(status=status).\
                            where(Application.id==apply_id).\
                            execute()

                    SubmitApplicationBackend.update(application_status=status).\
                        where(SubmitApplicationBackend.application_id==apply_id).\
                        execute()

                print(f"saved data {apply_id}: {account_id}")


            else:
                ...


        # msg = ps.parse_response()

        # [b'pong', b'ping_msg']
        # if msg[0].decode('utf-8') == 'pong':
        #     logger.info(f'pinged as:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')

        # # [b'message', b'interaction_service', b'{data}']
        # if msg[0].decode('utf-8') == 'message' and msg[1].decode('utf-8') == r.chan_pub:
        #     data = json.loads(msg[2].decode('utf-8'))
        #     print(data)

peewee-async

import asyncio
import peewee
import logging
import os
import logging.config
import yaml
from peewee_async import Manager, MySQLDatabase

yaml_path = 'conf/config.yaml'
cf = {}
if os.path.exists(yaml_path):
    with open(yaml_path, 'r', encoding='utf-8') as f:
        cf = yaml.safe_load(f)
        logging.config.dictConfig(cf['logging_conf'])
else:
    raise ValueError('conf file does not exist!')

database = MySQLDatabase(
        cf["mysql"]["db"],
        host=cf["mysql"]["host"],
        port=int(cf["mysql"]["port"]),
        user=cf["mysql"]["user"],
        password=cf["mysql"]["password"]
        )


class PageBlock(peewee.Model):
    key = peewee.CharField(max_length=40, unique=True)
    text = peewee.TextField(default='')

    class Meta:
        database = database



loop = asyncio.get_event_loop() # Note: custom loop!
objects = Manager(database, loop=loop)
objects.database.allow_sync = False   # this will raise AssertionError on ANY sync call


async def my_async_func():
    await objects.create_or_get(
        PageBlock, key="title",
        text="Peewee is AWESOME with async!"
    )

    title = await objects.get(
        PageBlock, key="title"
    )
    print(f"Was: {title.text}")

    title.text = "Peewee is super awesome with async"
    await objects.update(title)
    print(f"New: {title.text}")

    await database.close_async()
    # https://stackoverflow.com/questions/40420243/event-loop-is-closed-error-in-aiomysql

loop.run_until_complete(my_async_func())
loop.close()

tricks 更新字段


class Book(Model):
    id = PrimaryKeyField()
    title = CharField(max_length=64, unique=True)
    author = CharField(max_length=64)
    publisher = CharField(max_length=64)
    price = DecimalField(max_digits=10, decimal_places=2)
    desc = CharField(max_length=256)

    class Meta:
        database = database

condition_dict = {"author":"鲁迅", "id":1}

q = Book.update({"price":30}).where(
    *[
       getattr(Book, key) == value for key, value in condition_dict.items()
    ]
)

q.execute()

Buy me a 肥仔水!