分布式事务

分布式事务

概念、理论

本地事务的四大特性

  • 原子性(Atomicity)
  • 一致性(Consistency)
  • 隔离性(Isolation)
  • 持久性(Durability)

本地事务特性

分布式事务

指不是在单个服务或单个数据库架构下,产生的事务

CAP理论

在P一定会出现的情况下,A和C之间只能实现一个

  • Consistency(一致性)

    用户访问分布式系统中的任意节点,得到的数据必须一致。

  • Availability(可用性)

    用户访问集群中的任意健康节点,必须能得到响应

  • Partition tolerance (分区容错性)

    为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区

    在集群出现分区时,整个系统也要持续对外提供服务

BASE理论

是对CAP的一种解决思路

  • Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
  • Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
  • Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。

事务协调者

事务协调者

Seate

Seata 是一款开源的分布式事务解决方案

官网

Seata架构

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

部署TC

**TC (Transaction Coordinator) -**事务协调者

  1. 下载seata-server包并解压http://seata.io/zh-cn/blog/download.html

  2. 修改registry.conf配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    registry {
    # tc服务的注册中心类file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
    type = "nacos"

    nacos {
    # seata tc 服务注册到 nacos的服务名称,可以自定义
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    # 集群名
    cluster = "default"
    username = "nacos"
    password = "nacos"
    }

    }

    config {
    # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
    # file、nacos 、apollo、zk、consul、etcd3
    type = "nacos"

    nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
    }
    }
  3. nacos添加配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    # 数据存储方式,db代表数据库
    store.mode=db
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.jdbc.Driver
    store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=UTC
    store.db.user=root
    store.db.password=123456
    store.db.minConn=5
    store.db.maxConn=30
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.queryLimit=100
    store.db.lockTable=lock_table
    store.db.maxWait=5000
    # 事务、日志等配置
    server.recovery.committingRetryPeriod=1000
    server.recovery.asynCommittingRetryPeriod=1000
    server.recovery.rollbackingRetryPeriod=1000
    server.recovery.timeoutRetryPeriod=1000
    server.maxCommitRetryTimeout=-1
    server.maxRollbackRetryTimeout=-1
    server.rollbackRetryTimeoutUnlockEnable=false
    server.undo.logSaveDays=7
    server.undo.logDeletePeriod=86400000

    # 客户端与服务端传输方式
    transport.serialization=seata
    transport.compressor=none
    # 关闭metrics功能,提高性能
    metrics.enabled=false
    metrics.registryType=compact
    metrics.exporterList=prometheus
    metrics.exporterPrometheusPort=9898
  4. 创建数据库seata,tc服务在管理分布式事务时,需要记录事务相关数据到数据库中,需要提前创建好这些表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;

    -- ----------------------------
    -- 分支事务表
    -- ----------------------------
    DROP TABLE IF EXISTS `branch_table`;
    CREATE TABLE `branch_table` (
    `branch_id` BIGINT(20) NOT NULL,
    `xid` VARCHAR(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `transaction_id` BIGINT(20) NULL DEFAULT NULL,
    `resource_group_id` VARCHAR(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `resource_id` VARCHAR(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `branch_type` VARCHAR(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `status` TINYINT(4) NULL DEFAULT NULL,
    `client_id` VARCHAR(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `application_data` VARCHAR(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `gmt_create` DATETIME(6) NULL DEFAULT NULL,
    `gmt_modified` DATETIME(6) NULL DEFAULT NULL,
    PRIMARY KEY (`branch_id`) USING BTREE,
    INDEX `idx_xid`(`xid`) USING BTREE
    ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;

    -- ----------------------------
    -- 全局事务表
    -- ----------------------------
    DROP TABLE IF EXISTS `global_table`;
    CREATE TABLE `global_table` (
    `xid` VARCHAR(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `transaction_id` BIGINT(20) NULL DEFAULT NULL,
    `status` TINYINT(4) NOT NULL,
    `application_id` VARCHAR(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `transaction_service_group` VARCHAR(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `transaction_name` VARCHAR(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `timeout` INT(11) NULL DEFAULT NULL,
    `begin_time` BIGINT(20) NULL DEFAULT NULL,
    `application_data` VARCHAR(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `gmt_create` DATETIME NULL DEFAULT NULL,
    `gmt_modified` DATETIME NULL DEFAULT NULL,
    PRIMARY KEY (`xid`) USING BTREE,
    INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,
    INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
    ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;

    SET FOREIGN_KEY_CHECKS = 1;
  5. seata的bin目录下执行seata-server.bat来启动TC

微服务集成Seata

  • 导入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!--seata-->
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
    <!--版本较低,1.3.0,因此排除-->
    <exclusion>
    <artifactId>seata-spring-boot-starter</artifactId>
    <groupId>io.seata</groupId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <!--seata starter 采用1.4.2版本-->
    <version>${seata.version}</version>
    </dependency>
  • 配置tc地址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    seata:
    registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    type: nacos # 注册中心类型 nacos
    nacos:
    server-addr: 127.0.0.1:8848 # nacos地址
    namespace: "" # namespace,默认为空
    group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
    application: seata-tc-server # seata服务名称
    username: nacos
    password: nacos
    tx-service-group: seata-demo # 事务组名称
    service:
    vgroup-mapping: # 事务组与cluster的映射关系
    seata-demo: default

Seata的分布式事务解决方案

Seata提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入

    事务协调者通知每个事物参与者执行本地事务,只有事务都成功了才提交

    优点:实现简单,并且没有代码侵入

    缺点:性能较差

  • TCC模式:最终一致的分阶段事务模式,有业务侵入

    • Try:资源的检测和预留
    • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
    • Cancel:预留资源释放,可以理解为try的反向操作。

    优点:无需生成快照,无需使用全局锁,性能最强

    缺点:有代码侵入,需要人为编写try、Confirm和Cancel接口,存在软状态

  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式

    完成事务直接提交,通过记录undo-log(数据快照)实现事务回滚,解决XA性能差的问题

    优点:性能好,并且没有代码侵入

    缺点:AT在一阶段和二阶段之间为软状态

  • SAGA模式:长事务模式,有业务侵入

实现XA模式
  • 配置开启XA模式

    1
    2
    seata:
    data-source-proxy-mode: XA
  • 给发起全局事务的入口方法添加@GlobalTransactional注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //实例
    @Override
    @GlobalTransactional
    public Long create(Order order) {
    // 创建订单
    orderMapper.insert(order);
    try {
    // 扣用户余额
    accountClient.deduct(order.getUserId(), order.getMoney());
    // 扣库存
    storageClient.deduct(order.getCommodityCode(), order.getCount());

    } catch (FeignException e) {
    log.error("下单失败,原因:{}", e.contentUTF8(), e);
    throw new RuntimeException(e.contentUTF8(), e);
    }
    return order.getId();
    }
  • 重启服务。 只有所有事务都成功,TC才会通知微服务提交事务

实现AT模式
  • 创建两张数据库表,记录全局锁,其中lock_table导入到TC服务关联的数据库,undo_log表导入到微服务关联的数据库

  • 配置开启AT模式

    1
    2
    seata:
    data-source-proxy-mode: AT
  • 给发起全局事务的入口方法添加@GlobalTransactional注解

  • 重启服务

实现TCC模式

概念:

  • 空回滚:在未执行try操作时先执行了cancel操作
  • 业务悬挂:对于已经空回滚的业务,之前被阻塞的try操作恢复,继续执行try,就永远不可能confirm或cancel ,事务一直处于中间状态

实例:

  • 创建表,用于存储回滚时需要恢复的数据

    1
    2
    3
    4
    5
    6
    7
    REATE TABLE `account_freeze_tbl`  (
    `xid` VARCHAR(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `freeze_money` INT(11) UNSIGNED NULL DEFAULT 0,
    `state` INT(1) NULL DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
    PRIMARY KEY (`xid`) USING BTREE
    ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
    • xid:是全局事务id
    • freeze_money:用来记录用户冻结金额
    • state:用来记录事务状态
  • 声明TCC接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    package cn.itcast.account.service;

    //声明为TCC事务
    @LocalTCC
    public interface AccountTCCService {

    //Try
    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
    @BusinessActionContextParameter(paramName = "money") int money);

    //Confirm
    boolean confirm(BusinessActionContext ctx);

    //Cancel
    boolean cancel(BusinessActionContext ctx);
    }
  • 实现TCC接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    package cn.itcast.account.service.impl;

    @Service
    public class AccountTccServiceImpl implements AccountTCCService {

    //记录事务状态
    @Autowired
    private AccountFreezeMapper accountFreezeMapper;

    @Autowired
    private AccountMapper accountMapper;

    @Override
    @Transactional
    public void deduct(String userId, int money) {
    //获取事务id
    String xid = RootContext.getXID();
    //是否业务悬挂
    if (accountFreezeMapper.selectById(xid) != null) {
    //证明业务已经执行过一次
    return;
    }
    //执行业务操作
    accountMapper.deduct(userId, money);
    //记录事务状态
    AccountFreeze freeze = new AccountFreeze();
    freeze.setXid(xid);
    freeze.setUserId(userId);
    freeze.setFreezeMoney(money);
    freeze.setState(AccountFreeze.State.TRY); //设置事务状态为Try
    //提交事务状态
    accountFreezeMapper.insert(freeze);
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
    //都成功后提交事务
    String xid = ctx.getXid();
    //删除冻结记录
    int count = accountFreezeMapper.deleteById(xid);

    return count == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
    //失败回滚事务
    String xid = ctx.getXid();
    //判断是否空回滚
    if(accountFreezeMapper.selectById(xid) == null) {
    AccountFreeze freeze = new AccountFreeze();
    freeze.setXid(xid);
    freeze.setFreezeMoney(0);
    freeze.setUserId(ctx.getActionContext("userId").toString());
    freeze.setState(AccountFreeze.State.CANCEL);
    accountFreezeMapper.insert(freeze);
    return true;
    }
    AccountFreeze freeze = accountFreezeMapper.selectById(xid);
    accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney()); //自定义的方法,用于恢复之前的数据
    //将事务状态改为CANCEL,并清空冻结金额
    freeze.setFreezeMoney(0);
    freeze.setState(AccountFreeze.State.CANCEL);
    int count = accountFreezeMapper.updateById(freeze);

    return count == 1;
    }
    }


分布式事务
http://xwww12.github.io/2022/09/20/微服务/分布式事务/
作者
xw
发布于
2022年9月20日
许可协议