thrift的简单使用(四)
使用 thriftpy2 启动服务端
首先定义 thrift 通讯文件
include "./Common/common.thrift"
namespace java ....thrift
namespace php ...
namespace cpp xxx.xx
namespace py xxx.xx
// Service exposing a single RPC, getTestResult.
service TestService {
// Takes four arguments and returns a string.
// No `throws` clause is declared for this method.
string getTestResult(
1: string task_id,
2: string task_status,
// Status is a type taken from the included common.thrift.
3: common.Status data_result,
4: string fail_reason
),
}
# -*- coding: utf-8 -*-
# __author__ = 'jm'
import thriftpy2
import asyncio
import yaml
import aiomysql
import logging.config
from functools import wraps
import os
import datetime
from thriftpy2.rpc import make_aio_server # 启动异步服务端
from conf import base_conf, logger
# Project root: the parent of the directory that contains this file.
BASE_DIR = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0]
# Location of the service IDL inside the project tree.
THRIFT_PATH = os.path.join(BASE_DIR, "xxxx", "xxx.thrift")
# Parse the IDL once at import time; the resulting module exposes the
# service and the types declared in the .thrift file.
sync_task_thrift = thriftpy2.load(path=THRIFT_PATH, module_name="xxx_thrift")
def get_log(func):
    """Decorate ``func`` so that every call logs whether it succeeded.

    The wrapper calls ``func`` with the given arguments and returns its
    result. Any ``Exception`` raised by ``func`` is caught and not
    re-raised, so the wrapper returns ``None`` in that case. In both cases
    one message is written with ``logger.info`` after the call.
    """
    @wraps(func)
    def inner_wrap(*args, **options):
        # None means "no failure recorded"; an empty error text still
        # counts as a failure because it is not None.
        failure = None
        try:
            return func(*args, **options)
        except Exception as err:
            failure = str(err)
        finally:
            # Runs on every exit path, so the log line is always emitted.
            if failure is None:
                info = "任务{0}执行成功".format(func.__name__)
            else:
                info = "任务{0}执行失败, 失败原因:{1}".format(func.__name__, failure)
            logger.info(info)

    return inner_wrap
# 利用thriftpy2 实现服务端代码
class Dispatcher(object):
    """thriftpy2 handler for ``TestService`` backed by an aiomysql pool.

    Constructing an instance reads the configuration and creates the
    connection pool on the current event loop before returning.
    """

    def __init__(self):
        self._read_conf()
        # The pool is created here, synchronously, so it exists before the
        # handler is handed to the server.
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._connect_mysql(self.loop))

    def _read_conf(self):
        """Store the project configuration mapping on ``self.args``."""
        self.args = base_conf

    async def _connect_mysql(self, loop):
        """Create the aiomysql pool from ``DATABASE.MYSQL`` and keep it on ``self.pool``."""
        mysql_conf = self.args["DATABASE"]["MYSQL"]
        self.pool = await aiomysql.create_pool(
            host=mysql_conf["HOST"],
            port=mysql_conf["PORT"],
            user=mysql_conf["USER"],
            password=mysql_conf["PASSWD"],
            db=mysql_conf["DB"],
            loop=loop,
        )

    async def _select(self, sql, param, pool):
        """Run a query and return the single column of its first row.

        Returns ``None`` when the query matches no row. The query is
        expected to select exactly one column.
        """
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                logger.debug("query params: %s", param)
                await cur.execute(sql, param)
                row = await cur.fetchone()
        # fetchone() yields None when there is no row; unpacking it
        # directly would raise TypeError.
        if row is None:
            return None
        (value,) = row
        return value

    async def _execute(self, sql, param, pool):
        """Run a statement with ``param`` on a pooled connection and commit it."""
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql, param)
                await conn.commit()

    async def getTestResult(self, task_id, task_status, data_result, fail_reason):
        """Handle the ``getTestResult`` RPC.

        For any status other than ``"success"`` the failure reason is
        written to the database. Returns the string ``"success"``.

        Raises:
            ServerException: code 500 carrying the original error text when
                the database update fails.
        """
        try:
            # The success branch had its update disabled in the original, so
            # only the non-success path does any work.
            # TODO: for "success", run the status update with (task_id,).
            if task_status != "success":
                SQL_UPDATE_TASK_STATUS = "xxxxx"
                await self._execute(
                    SQL_UPDATE_TASK_STATUS, (fail_reason, task_id), self.pool
                )
        except Exception as e:
            # NOTE(review): the original referenced an undefined placeholder
            # module here; it is presumably the module returned by
            # thriftpy2.load, with the included common.thrift reachable as
            # `.common`. Confirm that common.thrift declares ServerException.
            raise sync_task_thrift.common.ServerException(500, str(e)) from e
        return "success"
# Bind the handler to TestService and serve on 127.0.0.1:6000.
# NOTE(review): the original used an undefined placeholder for the thrift
# module; sync_task_thrift is the module loaded from the IDL above —
# confirm it is the one that declares TestService.
server = make_aio_server(sync_task_thrift.TestService, Dispatcher(), "127.0.0.1", 6000)
server.serve()
使用 thrift 自己实现客户端
首先使用定义好的 thrift 通讯文件生成 Python package
thrift --gen py xxx.thrift
# -*- coding: utf-8 -*-
# __author__ = 'jm'
import sys
sys.path.append("gen-py")
from xxxx.xxx.TestService import *
from xxx.xxxx.ttypes import *
from xxx.common.ttypes import *
from thrift.transport import TSocket,TTransport
from thrift.protocol import TBinaryProtocol
tsocket = TSocket.TSocket('127.0.0.1', 6000)
# The transport must follow the same rules as the one used by thriftpy2.
transport = TTransport.TBufferedTransportFactory().getTransport(tsocket)
# The protocol must follow the same rules as the one used by thriftpy2.
protocol = TBinaryProtocol.TBinaryProtocolFactory().getProtocol(transport)
client = Client(protocol)
transport.open()
try:
    # NOTE(review): response_data is not defined in this snippet; it must be
    # built before this call (presumably a common.Status value — confirm).
    client.getTestResult("91212d82282f98c6c73a4233bf8c561b", "failed", response_data, "test")
finally:
    # Always release the socket, even when the call raises.
    transport.close()
使用 thriftpy2 实现客户端
import thriftpy2
import asyncio
import os
# Project root: the parent of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Location of the service IDL inside the project tree.
THRIFT_PATH = os.path.join(BASE_DIR, "xxxx", "xxxx.thrift")
# thriftpy2.load rejects a module_name that does not end in "_thrift",
# so the placeholder name carries that suffix.
sync_task_thrift = thriftpy2.load(THRIFT_PATH, module_name="xxxxx_thrift")
from thriftpy2.rpc import make_aio_client, make_client
#
# async def request():
# client = await make_aio_client(sync_task_thrift.SmbSyncTaskService, '127.0.0.1', 6000)
# print(await client.getCreateCampaignTaskResult(task_id='91212d82282f98c6c73a4233bf8c561b', task_status="failed", data_result=response_data , fail_reason="ok"))
# client.close()
#
#
# coroutine = request()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(coroutine)
# Synchronous client for the TestService declared in the loaded IDL.
# NOTE(review): the original used an undefined placeholder for the thrift
# module; sync_task_thrift is the module loaded above — confirm.
client = make_client(sync_task_thrift.TestService, '127.0.0.1', 6000)
try:
    # The RPC declared in the IDL is getTestResult.
    # NOTE(review): response_data is not defined in this snippet; it must be
    # built before this call (presumably a common.Status value — confirm).
    client.getTestResult(
        task_id='91212d82282f98c6c73a4233bf8c561b',
        task_status="failed",
        data_result=response_data,
        fail_reason="ok",
    )
finally:
    # Always release the connection, even when the call raises.
    client.close()