分布式事务框架

常见的分布式事务 方案

分布式事务模式 介绍 技术栈
AT 模式 无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本(sql都由框架托管统一执行,会存在脏写问题) seata、shardingsphere
TCC 模式 高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景(第一阶段会产生行锁,事务执行太久会锁行很久) seata、service-comb
Saga 模式 长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统(第一阶段就操作DB,会存在脏读问题) seata、shardingsphere、service-comb
XA模式 分布式强一致性的解决方案,但性能低而使用较少。 seata、shardingsphere
Saga和TCC模式区别不大,TCC就是多了个锁行的步骤(避免了脏读,但事务执行太久会导致锁行很久,不适用于长事务)    

分布式事务最经典的七种解决方案




常见的分布式解决方案框架

特性 DTM SEATA 备注
支持语言 Golang、python、php及其他 Java dtm可轻松接入一门新语言
异常处理 子事务屏障自动处理 手动处理 dtm解决了幂等、悬挂、空补偿
TCC事务  
XA事务  
AT事务 AT与XA类似,性能更好,但有脏回滚
SAGA事务 简单模式 状态机复杂模式 dtm的状态机模式在规划中
事务消息 dtm提供类似rocketmq的事务消息
通信协议 HTTP dubbo等协议,无HTTP dtm后续将支持grpc类协议
star数量 github stars github stars dtm从20210604发布0.1,发展快



DTM(Distributed Transaction Manager)框架

DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题

它提供了Saga、TCC、 XA二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等悬挂空补偿等异常问题。

DTM 介绍

DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题

DTM提供跨服务事务能力,一组服务要么全部成功,要么全部回滚,
避免只更新了一部分数据产生的一致性问题



DTM 流程

DTM的架构

整个DTM架构中,一共有三个角色,分别承担了不同的职责:

RM-资源管理器

RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。
例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。

AP-应用程序:

AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。
例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM

TM-事务管理器

TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。
每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。

总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。 TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支, TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。

Saga 模式

使用DTMSaga模式,首先要进行 事务拆分

DTM Saga 时序图

  • 成功

用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回

DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回

DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回

DTM已完成所有的事务分支,将全局事务的状态修改为已完成
  • 失败

在实际的业务中,子事务可能出现失败,例如转入的子账号被冻结导致转账失败。我们对业务代码进行修改,让TransIn的正向操作失败

在转入操作失败的情况下,TransIn和TransOut的补偿操作被执行,保证了最终的余额和转账前是一样的

转出子事务(TransOut)


 [HttpPost("TransferOut")]
    public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}转出{request.Amount}元";
        _logger.LogInformation($"转出子事务-启动:{msg}");
        // 1. 创建子事务屏障
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                // 2. 在子事务屏障内执行事务操作
                await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"转出子事务-执行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null || bankAccount.Balance < request.Amount)
                        throw new InvalidDataException("账户不存在或余额不足!");
                    bankAccount.Balance -= request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"转出子事务-失败:{ex.Message}");
            // 3. 按照接口协议,返回409,以表示子事务失败
            return new StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"转出子事务-成功:{msg}");
        return Ok();
    }

  • 1 必须开启子事务屏障

    gid:全局事务Id trans_type:事务类型,是saga、msg、xa或者是tcc。 branch_id:子事务的Id op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)

  • 2 必须在子事务屏障内执行事务操作

  • 3 对于Saga正向操作而言,业务上的失败异常是需要做严格区分的

    例如余额不足,是业务上的失败,必须回滚。 而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。

因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回409状态码以告知DTM 子事务失败。

转出补偿子事务(TransOut_Compensate)

    [HttpPost("TransferOut_Compensate")]
    public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
        _logger.LogInformation($"转出补偿子事务-启动:{msg}");
        // 1. 创建子事务屏障
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            // 在子事务屏障内执行事务操作
            await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"转出补偿子事务-执行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null)
                    return; //对于补偿操作,可直接返回,中断后续操作
                bankAccount.Balance += request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"转出补偿子事务-成功!");
        // 2. 因补偿操作必须成功,所以必须返回200。
        return Ok();
    }

由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。 但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。

另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。

因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回200。 因此当出现bankAccount==null时可以直接 return。

转入子事务(TransIn)

转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑 并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败

[HttpPost("TransferIn")]
    public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}转入{request.Amount}元";
        _logger.LogInformation($"转入子事务-启动:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"转入子事务-执行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null)
                        throw new InvalidDataException("账户不存在!");
                    bankAccount.Balance += request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"转入子事务-失败:{ex.Message}");
            return new StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"转入子事务-成功:{msg}");
        return Ok();
    }

转入补偿子事务(TransIn_Compensate)

转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后, 在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功

 [HttpPost("TransferIn_Compensate")]
    public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
    {
        var msg = "用户{request.UserId}回滚转入{request.Amount}元";
        _logger.LogInformation($"转入补偿子事务-启动:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"转入补偿子事务-执行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null) return;
                bankAccount.Balance -= request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"转入补偿子事务-成功!");
        return Ok();
    }

Saga事务编排

生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);

创建Saga全局事务:_transFactory.NewSaga(gid);

添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。

如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。

提交Saga全局事务:saga.Submit(cancellationToken);

若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。
 [HttpPost("Transfer")]
    public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
        CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
            //1. 生成全局事务ID
            var gid = await _dtmClient.GenGid(cancellationToken);
            var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
            //2. 创建Saga
            var saga = _transFactory.NewSaga(gid);
            //3. 添加子事务
        	saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
                    new TransferRequest(fromUserId, amount))
                .Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
                    new TransferRequest(toUserId, amount))
                .EnableWaitResult(); // 4. 按需启用是否等待事务执行结果

            //5. 提交Saga事务
            await saga.Submit(cancellationToken);
        }
        catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
        {
            _logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
            return new BadRequestObjectResult($"转账失败:{ex.Message}");
        }

        _logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
        return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
    }

SAGA 实践 (python)

创建账户余额表:

CREATE TABLE dtm_busi.`user_account` (
  `id` int(11) AUTO_INCREMENT PRIMARY KEY,
  `user_id` int(11) not NULL UNIQUE ,
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `create_time` datetime DEFAULT now(),
  `update_time` datetime DEFAULT now()
);

核心业务代码,调整用户的账户余额

def saga_adjust_balance(cursor, uid, amount):
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
  if affected == 0:
    raise Exception("update error, balance not enough")

具体的正向操作/补偿操作的处理函数

@app.post("/api/TransOutSaga")
def trans_out_saga():
  saga_adjust_balance(c, out_uid, -30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransOutCompensate")
def trans_out_compensate():
  saga_adjust_balance(c, out_uid, 30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransInSaga")
def trans_in_saga():
  saga_adjust_balance(c, in_uid, 30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransInCompensate")
def trans_in_compensate():
  saga_adjust_balance(c, in_uid, -30)
  return {"dtm_result": "SUCCESS"}

各个子事务的处理函数已经OK,然后是开启SAGA事务,进行分支调用

# 这是dtm服务地址
dtm = "http://localhost:8080/api/dtmsvr"
# 这是业务微服务地址
svc = "http://localhost:5000/api"

    req = {"amount": 30}
    s = saga.Saga(dtm, utils.gen_gid(dtm))
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
    s.submit()

处理网络异常 (子事务屏障)

# DTM提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交
@app.post("/api/TransOutSaga")
def trans_out_saga():
  with barrier.AutoCursor(conn_new()) as cursor:
    def busi_callback(c):
      saga_adjust_balance(c, out_uid, -30)
    barrier_from_req(request).call(cursor, busi_callback)
  return {"dtm_result": "SUCCESS"}
  
# 这里的barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证busi_callback回调函数仅被提交一次​

# 您可以尝试多次调用这个TransIn服务,仅有一次余额调整。

处理回滚

假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败
@app.post("/api/TransInSaga")
def trans_in_saga():
  return {"dtm_result": "FAILURE"}
  
  
# 子事务屏障技术,能够保证TransIn的错误如果发生在提交之前,则补偿为空操作;
TransIn的错误如果发生在提交之后,则补偿操作会将数据提交一次

@app.post("/api/TransInSaga")
def trans_in_saga():
  with barrier.AutoCursor(conn_new()) as cursor:
    def busi_callback(c):
      saga_adjust_balance(c, in_uid, 30)
    barrier_from_req(request).call(cursor, busi_callback)
  return {"dtm_result": "FAILURE"}

[» 分布式事务最经典的七种解决方案](https://segmentfault.com/a/1190000040321750



TCC 模式

TCC分布式事务,TCC是Try、Confirm、Cancel三个词语的缩写

最早是由 `Pat Helland` 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出

TCC组成

TCC分为3个阶段

Try 阶段

尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)

Confirm 阶段

如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源

Cancel 阶段

如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

TCC分布式事务里,有3个角色 与经典的XA分布式事务一样

AP/应用程序

发起全局事务,定义全局事务包含哪些事务分支

RM/资源管理器

负责分支事务各项资源的管理

TM/事务管理器

负责协调全局事务的正确执行,包括Confirm,Cancel的执行,并处理网络异常


跨行转账操作 

    Try阶段冻结A转账的金额
    Confirm进行实际的扣款
    Cancel进行资金解冻

这样用户在任何一个阶段,看到的数据都是清晰明了的

TCC 实践 (成功)

一个TCC事务的具体开发流程

首先创建两张表,一张是用户余额表,一张是冻结资金表

CREATE TABLE dtm_busi.`user_account` (
  `id` int(11) AUTO_INCREMENT PRIMARY KEY,
  `user_id` int(11) not NULL UNIQUE ,
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `create_time` datetime DEFAULT now(),
  `update_time` datetime DEFAULT now()
);

CREATE TABLE dtm_busi.`user_account_trading` (
  `id` int(11) AUTO_INCREMENT PRIMARY KEY,
  `user_id` int(11) not NULL UNIQUE ,
  `trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `create_time` datetime DEFAULT now(),
  `update_time` datetime DEFAULT now()
);

-- trading_balance记录正在交易的金额

冻结/解冻资金操作

检查约束 balance+trading_balance >= 0,如果约束不成立,执行失败
def tcc_adjust_trading(cursor, uid, amount):
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account_trading set trading_balance=trading_balance + %d where user_id=%d and trading_balance + %d + (select balance from dtm_busi.user_account where id=%d) >= 0" % (amount, uid, amount, uid))
  if affected == 0:
    raise Exception("update error, maybe balance not enough")

调整余额

def tcc_adjust_balance(cursor, uid, amount):
  utils.sqlexec(cursor, "update dtm_busi.user_account_trading set trading_balance = trading_balance+ %d where user_id=%d" %( -amount, uid))
  utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d" %(amount, uid))

具体的Try/Confirm/Cancel的处理函数

# 各个子事务的处理函数
@app.post("/api/TransOutTry")
def trans_out_try():
  # 事务以及异常处理
  tcc_adjust_trading(c, out_uid, -30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransOutConfirm")
def trans_out_confirm():
  # 事务以及异常处理
  tcc_adjust_balance(c, out_uid, -30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransOutCancel")
def trans_out_cancel():
  # 事务以及异常处理
  tcc_adjust_trading(c, out_uid, 30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransInTry")
def trans_in_try():
  # 事务以及异常处理
  tcc_adjust_trading(c, in_uid, 30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransInConfirm")
def trans_in_confirm():
  # 事务以及异常处理
  tcc_adjust_balance(c, in_uid, 30)
  return {"dtm_result": "SUCCESS"}

@app.post("/api/TransInCancel")
def trans_in_cancel():
  # 事务以及异常处理
  tcc_adjust_trading(c, in_uid, -30)
  return {"dtm_result": "SUCCESS"}

开启TCC事务,进行分支调用


@app.get("/api/fireTcc")
def fire_tcc():
    # 发起tcc事务
    gid = tcc.tcc_global_transaction(dtm, utils.gen_gid(dtm), tcc_trans)
    return {"gid": gid}

# tcc事务的具体处理
def tcc_trans(t):
    req = {"amount": 30} # 业务请求的负荷
    # 调用转出服务的Try|Confirm|Cancel
    t.call_branch(req, svc + "/TransOutTry", svc + "/TransOutConfirm", svc + "/TransOutCancel")
    # 调用转入服务的Try|Confirm|Cancel
    t.call_branch(req, svc + "/TransInTry", svc + "/TransInConfirm", svc + "/TransInCancel")

TCC 实践 (失败, 回滚)

银行将金额准备转入用户2时,发现用户2的账户异常,返回失败

@app.post("/api/TransInTry")
def trans_in_try():
  # 事务以及异常处理
  tcc_adjust_trading(c, in_uid, 30)
  return {"dtm_result": "FAILURE"}

和成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚



子事务屏障


create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表 barrier中插入一条数据 业务服务就是依赖此表来完成子事务的控制

子事务屏障的核心就是子事务屏障表唯一键的设计,以gid、branch_id、op和barrier_id为唯一索引,利用唯一索引,“以改代查”来避免竞态条件

在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:

  • 1 开启本地事务

  • 2 对于当前操作op(action compensate)

使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事务屏障表插入一条数据,有几种情况:

1) 插入成功且影响条数大于0,则继续向下执行。

2) 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。
  • 3 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求

    1) 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。

    2) 子事务失败,回滚本地事务







安装运行(本地)

MAC Homebrew 安装

brew install dtm

输入下面命令,即可启动运行

dtm

brew还会额外安装一个命令 dtm-qs,这是一个quick-start客户端,用于运行一个简单的例子。 在dtm启动之后,运行下面命令:

dtm-qs

可以看到控制台打印出TransOut、TransIn成功,一个完整的分布式事务就完成了

dtm运行后,会监听两个端口

http:36789
grpc:36790



Seata 框架 (todo)

Buy me a 肥仔水!