常见的分布式事务 方案
分布式事务模式 | 介绍 | 技术栈 |
---|---|---|
AT 模式 |
无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本(sql都由框架托管统一执行,会存在脏写问题) | seata、shardingsphere |
TCC 模式 |
高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景(第一阶段会产生行锁,事务执行太久会锁行很久) | seata、service-comb |
Saga 模式 |
长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统(第一阶段就操作DB,会存在脏读问题) | seata、shardingsphere、service-comb |
XA 模式 |
分布式强一致性的解决方案,但性能低而使用较少。 | seata、shardingsphere |
Saga和TCC模式区别不大,TCC就是多了个锁行的步骤(避免了脏读,但事务执行太久会导致锁行很久,不适用于长事务) |
常见的分布式解决方案框架
特性 | DTM | SEATA | 备注 |
---|---|---|---|
支持语言 | Golang、python、php及其他 | Java | dtm可轻松接入一门新语言 |
异常处理 | 子事务屏障自动处理 | 手动处理 | dtm解决了幂等、悬挂、空补偿 |
TCC事务 | ✓ | ✓ | |
XA事务 | ✓ | ✓ | |
AT事务 | ✗ | ✓ | AT与XA类似,性能更好,但有脏回滚 |
SAGA事务 | 简单模式 | 状态机复杂模式 | dtm的状态机模式在规划中 |
事务消息 | ✓ | ✗ | dtm提供类似rocketmq的事务消息 |
通信协议 | HTTP | dubbo等协议,无HTTP | dtm后续将支持grpc类协议 |
star数量 | github stars | github stars | dtm从20210604发布0.1,发展快 |
DTM(Distributed Transaction Manager
)框架
DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题
它提供了Saga、TCC、 XA
和二阶段消息模式
以满足不同应用场景的需求,同时其首创的子事务屏障技术
可以有效解决幂等
、悬挂
和空补偿
等异常问题。
DTM 介绍
DTM是一款开源的分布式事务管理器
,解决跨数据库、跨服务、跨语言栈更新数据的一致性
问题
DTM提供跨服务事务能力,一组服务要么全部成功,要么全部回滚,
避免只更新了一部分数据产生的一致性问题
DTM 流程
DTM的架构
整个DTM架构中,一共有三个角色
,分别承担了不同的职责:
RM-资源管理器
:
RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。
例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。
AP-应用程序:
AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。
例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM
TM-事务管理器
:
TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。
每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。
总体而言,AP-应用程序
充当全局事务编排器的角色
通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。
TM-事务管理器
接收到注册的全局事务和子事务后,负责调用RM-资源管理器
来执行对应的事务分支,
TM-事务管理器
根据事务分支的执行结果决定是否提及或回滚事务。
Saga 模式
使用DTM
的Saga
模式,首先要进行 事务拆分
DTM Saga 时序图
- 成功
用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
DTM已完成所有的事务分支,将全局事务的状态修改为已完成
- 失败
在实际的业务中,子事务可能出现失败,例如转入的子账号被冻结导致转账失败。我们对业务代码进行修改,让TransIn的正向操作失败
在转入操作失败的情况下,TransIn和TransOut的补偿操作被执行,保证了最终的余额和转账前是一样的
转出子事务(TransOut)
[HttpPost("TransferOut")]
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转出{request.Amount}元";
_logger.LogInformation($"转出子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
// 2. 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null || bankAccount.Balance < request.Amount)
throw new InvalidDataException("账户不存在或余额不足!");
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转出子事务-失败:{ex.Message}");
// 3. 按照接口协议,返回409,以表示子事务失败
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转出子事务-成功:{msg}");
return Ok();
}
-
1 必须开启
子事务屏障
gid:全局事务Id trans_type:事务类型,是saga、msg、xa或者是tcc。 branch_id:子事务的Id op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)
-
2 必须
在子事务屏障内执行事务操作
-
3 对于Saga正向操作而言,业务上的
失败
与异常
是需要做严格区分的例如余额不足,是业务上的失败,必须回滚。 而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。
因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回409
状态码以告知DTM 子事务失败。
转出补偿子事务(TransOut_Compensate)
[HttpPost("TransferOut_Compensate")]
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
_logger.LogInformation($"转出补偿子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
// 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
return; //对于补偿操作,可直接返回,中断后续操作
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转出补偿子事务-成功!");
// 2. 因补偿操作必须成功,所以必须返回200。
return Ok();
}
由于DTM设计为总是执行补偿
,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。
但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。
因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回200
。
因此当出现bankAccount==null
时可以直接 return。
转入子事务(TransIn)
转入子事务和转出子事务的实现基本类似,都是开启子事务屏障
后,在branchBarrier.Call(conn, async tx => {}
中实现事务逻辑
并通过抛异常的方式
并最终返回409状态码
来显式告知DTM 子事务执行失败
[HttpPost("TransferIn")]
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转入{request.Amount}元";
_logger.LogInformation($"转入子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
throw new InvalidDataException("账户不存在!");
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转入子事务-失败:{ex.Message}");
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转入子事务-成功:{msg}");
return Ok();
}
转入补偿子事务(TransIn_Compensate)
转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,
在branchBarrier.Call(conn, async tx => {}
中实现事务逻辑,并最终返回200状态码
来告知DTM 补偿子事务执行成功
。
[HttpPost("TransferIn_Compensate")]
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
{
var msg = "用户{request.UserId}回滚转入{request.Amount}元";
_logger.LogInformation($"转入补偿子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null) return;
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转入补偿子事务-成功!");
return Ok();
}
Saga事务编排
生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);
创建Saga全局事务:_transFactory.NewSaga(gid);
添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。
如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。
提交Saga全局事务:saga.Submit(cancellationToken);
若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。
[HttpPost("Transfer")]
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
//1. 生成全局事务ID
var gid = await _dtmClient.GenGid(cancellationToken);
var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
//2. 创建Saga
var saga = _transFactory.NewSaga(gid);
//3. 添加子事务
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
new TransferRequest(fromUserId, amount))
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
new TransferRequest(toUserId, amount))
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果
//5. 提交Saga事务
await saga.Submit(cancellationToken);
}
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
{
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
return new BadRequestObjectResult($"转账失败:{ex.Message}");
}
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
}
SAGA 实践 (python
)
创建账户余额表:
CREATE TABLE dtm_busi.`user_account` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
核心业务代码,调整用户的账户余额
def saga_adjust_balance(cursor, uid, amount):
affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
if affected == 0:
raise Exception("update error, balance not enough")
具体的正向操作/补偿操作的处理函数
@app.post("/api/TransOutSaga")
def trans_out_saga():
saga_adjust_balance(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutCompensate")
def trans_out_compensate():
saga_adjust_balance(c, out_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInSaga")
def trans_in_saga():
saga_adjust_balance(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInCompensate")
def trans_in_compensate():
saga_adjust_balance(c, in_uid, -30)
return {"dtm_result": "SUCCESS"}
各个子事务的处理函数已经OK,然后是开启SAGA事务,进行分支调用
# 这是dtm服务地址
dtm = "http://localhost:8080/api/dtmsvr"
# 这是业务微服务地址
svc = "http://localhost:5000/api"
req = {"amount": 30}
s = saga.Saga(dtm, utils.gen_gid(dtm))
s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
s.submit()
处理网络异常 (子事务屏障
)
# DTM提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交
@app.post("/api/TransOutSaga")
def trans_out_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, out_uid, -30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "SUCCESS"}
# 这里的barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证busi_callback回调函数仅被提交一次
# 您可以尝试多次调用这个TransIn服务,仅有一次余额调整。
处理回滚
假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败
@app.post("/api/TransInSaga")
def trans_in_saga():
return {"dtm_result": "FAILURE"}
# 子事务屏障技术,能够保证TransIn的错误如果发生在提交之前,则补偿为空操作;
TransIn的错误如果发生在提交之后,则补偿操作会将数据提交一次
@app.post("/api/TransInSaga")
def trans_in_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, in_uid, 30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "FAILURE"}
[» 分布式事务最经典的七种解决方案](https://segmentfault.com/a/1190000040321750
TCC 模式
TCC
分布式事务,TCC是Try、Confirm、Cancel
三个词语的缩写
最早是由 `Pat Helland` 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出
TCC组成
TCC分为3个阶段
Try 阶段
尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
Confirm 阶段
如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
Cancel 阶段
如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。
TCC分布式事务里,有3个角色 与经典的XA分布式事务一样
AP/应用程序
发起全局事务,定义全局事务包含哪些事务分支
RM/资源管理器
负责分支事务各项资源的管理
TM/事务管理器
负责协调全局事务的正确执行,包括Confirm,Cancel的执行,并处理网络异常
跨行转账操作
Try阶段冻结A转账的金额
Confirm进行实际的扣款
Cancel进行资金解冻
这样用户在任何一个阶段,看到的数据都是清晰明了的
TCC 实践 (成功
)
一个TCC事务的具体开发流程
首先创建两张表,一张是用户余额表,一张是冻结资金表
CREATE TABLE dtm_busi.`user_account` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
CREATE TABLE dtm_busi.`user_account_trading` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
-- trading_balance记录正在交易的金额
冻结/解冻资金操作
检查约束 balance+trading_balance >= 0,如果约束不成立,执行失败
def tcc_adjust_trading(cursor, uid, amount):
affected = utils.sqlexec(cursor, "update dtm_busi.user_account_trading set trading_balance=trading_balance + %d where user_id=%d and trading_balance + %d + (select balance from dtm_busi.user_account where id=%d) >= 0" % (amount, uid, amount, uid))
if affected == 0:
raise Exception("update error, maybe balance not enough")
调整余额
def tcc_adjust_balance(cursor, uid, amount):
utils.sqlexec(cursor, "update dtm_busi.user_account_trading set trading_balance = trading_balance+ %d where user_id=%d" %( -amount, uid))
utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d" %(amount, uid))
具体的Try/Confirm/Cancel的处理函数
# 各个子事务的处理函数
@app.post("/api/TransOutTry")
def trans_out_try():
# 事务以及异常处理
tcc_adjust_trading(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutConfirm")
def trans_out_confirm():
# 事务以及异常处理
tcc_adjust_balance(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutCancel")
def trans_out_cancel():
# 事务以及异常处理
tcc_adjust_trading(c, out_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInTry")
def trans_in_try():
# 事务以及异常处理
tcc_adjust_trading(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInConfirm")
def trans_in_confirm():
# 事务以及异常处理
tcc_adjust_balance(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInCancel")
def trans_in_cancel():
# 事务以及异常处理
tcc_adjust_trading(c, in_uid, -30)
return {"dtm_result": "SUCCESS"}
开启TCC事务,进行分支调用
@app.get("/api/fireTcc")
def fire_tcc():
# 发起tcc事务
gid = tcc.tcc_global_transaction(dtm, utils.gen_gid(dtm), tcc_trans)
return {"gid": gid}
# tcc事务的具体处理
def tcc_trans(t):
req = {"amount": 30} # 业务请求的负荷
# 调用转出服务的Try|Confirm|Cancel
t.call_branch(req, svc + "/TransOutTry", svc + "/TransOutConfirm", svc + "/TransOutCancel")
# 调用转入服务的Try|Confirm|Cancel
t.call_branch(req, svc + "/TransInTry", svc + "/TransInConfirm", svc + "/TransInCancel")
TCC 实践 (失败, 回滚
)
银行将金额准备转入用户2时,发现用户2的账户异常,返回失败
@app.post("/api/TransInTry")
def trans_in_try():
# 事务以及异常处理
tcc_adjust_trading(c, in_uid, 30)
return {"dtm_result": "FAILURE"}
和成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚
子事务屏障
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
);
具体的服务再接收到来自Dtm的子事务分支调用
时,每次都会往子事务屏障表
barrier中插入一条数据
业务服务就是依赖此表来完成子事务的控制
子事务屏障的核心就是子事务屏障表唯一键的设计,以gid、branch_id、op和barrier_id
为唯一索引,利用唯一索引,“以改代查”来避免竞态条件
在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:
-
1 开启本地事务
-
2 对于当前操作op(action compensate)
使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)
向子事务屏障表插入一条数据,有几种情况:
1) 插入成功且影响条数大于0,则继续向下执行。
2) 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。
-
3 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求
1) 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。
2) 子事务失败,回滚本地事务
安装运行(本地)
MAC Homebrew 安装
brew install dtm
输入下面命令,即可启动运行
dtm
brew还会额外安装一个命令 dtm-qs
,这是一个quick-start客户端
,用于运行一个简单的例子。
在dtm启动之后,运行下面命令:
dtm-qs
可以看到控制台打印出TransOut、TransIn
成功,一个完整的分布式事务
就完成了
dtm运行后,会监听两个端口
http:36789
grpc:36790