thrift的简单使用(四)--使用thriftpy2

thrift的简单使用(四)


使用thriftpy2 启动服务端

首先定义 thrift 通讯文件


include "./Common/common.thrift"

namespace java ....thrift
namespace php ...
namespace cpp xxx.xx
namespace py xxx.xx

service TestService  {
    string getTestResult(
        1: string task_id,
        2: string task_status,
        3: common.Status data_result,
        4: string fail_reason
    ),
}



# encoding : utf-8
# __author__ = 'jm'

import thriftpy2
import asyncio
import yaml
import aiomysql
import logging.config
from functools import wraps
import os
import datetime
from thriftpy2.rpc import make_aio_server # 启动异步服务端
from conf import base_conf, logger

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
THRIFT_PATH = os.path.join(BASE_DIR, "xxxx", "xxx.thrift")

sync_task_thrift = thriftpy2.load(THRIFT_PATH, module_name="xxx_thrift")


def get_log(func):
    @wraps(func)
    def inner_wrap(*args, **options):
        task_status = True
        err_msg = ""
        try:
            result = func(*args, **options)
            return result
        except Exception as err:
            task_status = False
            err_msg = str(err)
        finally:
            info = "任务{0}执行成功".format(func.__name__) if task_status else   "任务{0}执行失败, 失败原因:{1}".format(func.__name__, err_msg)
            logger.info(info)

    return inner_wrap


# 利用thriftpy2 实现服务端代码

class Dispatcher(object):

    def __init__(self):
        self._read_conf()
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._connect_mysql(self.loop))

    def _read_conf(self):
        self.args = base_conf

    async def _connect_mysql(self, loop):
        self.pool = await aiomysql.create_pool(
            host = self.args['DATABASE']["MYSQL"]["HOST"],
            port = self.args['DATABASE']["MYSQL"]["PORT"],
            user = self.args['DATABASE']["MYSQL"]["USER"],
            password = self.args['DATABASE']["MYSQL"]["PASSWD"],
            db = self.args['DATABASE']["MYSQL"]["DB"],
            loop = loop
        )

    async def _select(self, sql, param, pool):
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                print(param)
                await cur.execute(sql, param)
                (r,) = await cur.fetchone()
                return r

    async def _execute(self, sql, param, pool):
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql, param)
                await conn.commit()

    async def getTestResult(self, task_id, task_status, data_result, fail_reason):
        try:
            if task_status == "success":
                # pass
                # SQL_UPDATE_TASK_STATUS = "xxxxx"
                # await self._execute(SQL_UPDATE_TASK_STATUS, (task_id), self.pool)

            else:
                SQL_UPDATE_TASK_STATUS = "xxxxx"
                await self._execute(SQL_UPDATE_TASK_STATUS, (fail_reason, task_id), self.pool)

        except Exception as e:
            raise xxxxx.common.ServerException(500, str(e))

        return "success"



server = make_aio_server(xxxx.TestService, Dispatcher(), "127.0.0.1", 6000)
server.serve()

使用 thrift 自己实现客户端

首先使用 定义好的 thrift通讯 文件, 生成python package

thrift -out  --gen py  xxx.thrift 

# encoding : utf-8
# __author__ = 'jm'

import sys
sys.path.append("gen-py")

from xxxx.xxx.TestService import *
from xxx.xxxx.ttypes import *
from xxx.common.ttypes import *


from thrift.transport import TSocket,TTransport
from thrift.protocol import TBinaryProtocol


tsocket = TSocket.TSocket( '127.0.0.1', 6000)
# 这里要和 thriftpy2 使用的传输协议规则一致
transport = TTransport.TBufferedTransportFactory().getTransport(tsocket)
# 这里要和 thriftpy2 使用的传输协议规则一致
protocol = TBinaryProtocol.TBinaryProtocolFactory().getProtocol(transport)
client = Client(protocol)
transport.open()

client.getTestResult("91212d82282f98c6c73a4233bf8c561b", "failed", response_data, "test")


使用 thriftpy2 实现客户端


import thriftpy2
import asyncio
import os

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
THRIFT_PATH = os.path.join(BASE_DIR, "xxxx", "xxxx.thrift")

sync_task_thrift = thriftpy2.load(THRIFT_PATH, module_name="xxxxx")


from thriftpy2.rpc import make_aio_client, make_client

#
# async def request():
#     client = await make_aio_client(sync_task_thrift.SmbSyncTaskService, '127.0.0.1', 6000)
#     print(await client.getCreateCampaignTaskResult(task_id='91212d82282f98c6c73a4233bf8c561b', task_status="failed", data_result=response_data , fail_reason="ok"))
#     client.close()
#
#
# coroutine = request()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(coroutine)

client =make_client(xxx.TestService, '127.0.0.1', 6000)
client.getCTestResult(task_id='91212d82282f98c6c73a4233bf8c561b', task_status="failed", data_result=response_data , fail_reason="ok")


Buy me a 肥仔水!