一、分布式事务问题由来
1、单体应用被拆分成多个微服务应用,原来的三个模块被拆分成三个独立的应用,分别部署在三台服务器上,且使用自己独立的数据库;
某次的业务操作中,需要调用这三个服务来完成,此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性却没有办法由任意一台的本地事务来保证;
一句话:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题!
2、什么是Seata?
Seata是Alibaba开源的一款分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务——一站式的分布式事务解决方案!
3、Seata完成分布式事务的处理过程
分布式事务处理过程: 1个ID + 3个组件模型
-
1个ID:全局唯一的事务ID;
-
3个组件:
Transaction Coordinator——TC – 事务协调者,维护全局和分支事务的状态,驱动全局事务提交或回滚。——总把头
Transaction Manager——TM – 事务管理器,定义并控制全局事务的边界:开始全局事务、并最终发起全局提交或全局回滚的决议。
Resource Manager——RM – 资源管理器,管理控制分支事务处理的资源,负责分支注册、状态汇报、并接受事务协调器的指令,并驱动分支事务提交或回滚。
处理过程:
1、TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID;
2、XID在微服务调用链路的上下文中传播;
3、RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖;
4、TM向TC发起针对XID的全局提交或回滚决议;
5、TC调度XID下管辖的全部分支事务完成提交或回滚操作;
二、Seata-Server的下载与安装
1、下载地址: https://github.com/seata/seata/releases
本次选择的版本是V0.9.0
2、解压下载文件,修改conf目录下的 file.conf 配置文件(别忘记备份好习惯);
这一步主要是修改:自定义事务组名称 + 事务日志存储模式为db + 数据库连接信息;
mode = "db"
3、根据上面的配置,我们需要到数据库中创建一个数据库,名为seata
建表脚本使用解压目录 /conf/db_store.sql 中的脚本;此脚本会在 seata 库中创建三张表:分支表+全局表+锁表
4、继续在conf目录下,修改 registry.conf 配置文件:
5、在启动了Nacos的基础上,启动seata-server
到这一步,我们的Seata的本地安装就已经完成了!
三、示例实战——准备工作
本次用例是一个用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
-
仓储服务:对给定的商品扣除仓储数量。
-
订单服务:根据采购需求创建订单。
-
帐户服务:从用户帐户中扣除余额。
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,在通过远程调用账户服务来扣减用户账户里的余额,最后在订单服务中修改订单状态为已完成! 该操作跨越三个服务,三个数据库,必然会存在分布式事务问题! |
1、创建业务数据库:
-
seata_order:存储订单的数据库
-
seata_storage:存储库存的数据库
-
seata_account:存储账户信息的数据库
CREATE DATABASE seata_order; CREATE DATABASE seata_storage; CREATE DATABASE seata_account;
2、在上面3个数据库中分别创建自己的业务表:脚本如下:
CREATE TABLE `t_order` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `user_id` bigint(11) DEFAULT NULL COMMENT '用户id', `product_id` bigint(11) DEFAULT NULL COMMENT '产品id', `count` int(11) DEFAULT NULL COMMENT '数量', `money` decimal(11,0) DEFAULT NULL COMMENT '金额', `status` int(1) DEFAULT NULL COMMENT '订单状态 0:创建中,1:已完结', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8; CREATE TABLE `t_storage` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `product_id` bigint(11) DEFAULT NULL COMMENT '产品id', `total` int(11) DEFAULT NULL COMMENT '总库存', `used` int(11) DEFAULT NULL COMMENT '已用库存', `residue` int(11) DEFAULT NULL COMMENT '剩余库存', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; CREATE TABLE `t_account` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `user_id` bigint(11) DEFAULT NULL COMMENT '用户id', `total` decimal(10,0) DEFAULT NULL COMMENT '总额度', `used` decimal(10,0) DEFAULT NULL COMMENT '已用额度', `residue` decimal(10,0) DEFAULT NULL COMMENT '剩余额度', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
3、在上述的3个库中分别创建对应的回滚日志表:
建表脚本是 /conf/db_undo_log.sql
-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库) -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log drop table `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
4、最终数据库效果为:
四、示例实战——Order-Module配置搭建
首先弄清楚业务流程:
下订单 -> 减库存 -> 扣余额 -> 改(订单)状态
1、新建订单Order-Module
pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud2020</artifactId> <groupId>com.jiguiquan.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>seata-order-service2001</artifactId> <dependencies> <!--springcloud alibaba nacos discovery 以后服务中将nacos和sentinel一起配--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!--使用seata一定要将自带的版本seata去掉,然后引入我们使用的版本的seata依赖--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>0.9.0</version> </dependency> <!--openfeign--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <version>2.2.1.RELEASE</version> </dependency> <!--springboot项目web和actuator最好一起走--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--数据库操作4件套,jdbc/mysql/连接池druid/orm框架mybatis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <!--spring-boot-starter下面的所有版本都已经在父工程中的spring-boot-dependencies包含--> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> </dependency> <!--热部署devtools--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
2、application.xml 配置文件:
server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: tx-service-group: jgq_tx_group nacos: discovery: server-addr: localhost:8848 datasource: type: com.alibaba.druid.pool.DruidDataSource #当前数据源操作类型 driver-class-name: org.gjt.mm.mysql.Driver url: jdbc:mysql://localhost:3306/cloud2020?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 831121 feign: hystrix: enabled: false logging: level: io: seata: info mybatis: mapper-locations: classpath:mapper/*.xml
3、增加配置文件 file.conf,复制上面的seata配置即可:
4、增加配置文件registry.conf,和上面seata的配置一样:
5、创建domain域模型,叫entity也行,还有dao、mapper文件,项目结构如下:
6、使用Seata对数据源进行代理配置(重要)
Seata在AT模式下解决分布式事务的具体逻辑就是体现在对数据源的代理上面;
创建一个数据源代理配置类DataSourceProxyConfig:
/** * 使用seata对数据源进行代理 * seata在AT模式下解决分布式事务的具体逻辑体现在对数据源的代理上,即对DataSource产生代理变成DataSourceProxy,进行全局事务的管理和协调,因此在整合时,需通过配置类的方式进行配置 * 且DataSourceProxy必须是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务 * @author jigq * @create 2020-04-29 11:09 */ @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapper-locations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource() { return new DruidDataSource(); } @Primary @Bean public DataSourceProxy dataSourceProxy(DataSource druidDataSource) { return new DataSourceProxy(druidDataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSourceProxy); bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); bean.setTransactionFactory(new SpringManagedTransactionFactory()); return bean.getObject(); } }
因为我们自己配置了数据源,为了防止循环注入,我们在启动类上需要将“数据源自动装载”排除在外——我们使用了Mybatis的Starter;
7、启动类:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) //取消数据源的自动创建 @EnableDiscoveryClient @EnableFeignClients public class SeataOrderMain2001 { public static void main(String[] args) { SpringApplication.run(SeataOrderMain2001.class, args); } }
8、新建库存Storage-Module
9、新建账户Account-Module,这两部分基础结构与Order-Module一样,略;
五、实例实战——业务代码Order-Module
1、在Order-Module模块,创建service代码:
其中OrderService是本层的业务代码,而StorageService和AccountService是OpenFeign的远程调用接口类;
2、开始编写创建订单的业务逻辑代码:
OrderServiceImpl.java:
@Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderMapper orderMapper; @Resource private StorageService storageService; @Resource private AccountService accountService; @Override public Order create(Order order) { log.info("------>开始新建订单"); order.setStatus(0); orderMapper.insert(order); System.out.println("此处已获得主键为:"+order.getId()); log.info("------>订单微服务开始调用库存,做扣减"); storageService.decrease(order.getProductId(), order.getCount()); log.info("------>订单微服务开始调用库存,做扣减End"); log.info("------>订单微服务开始调用账户,做扣减"); accountService.decrease(order.getUserId(), order.getMoney()); log.info("------>订单微服务开始调用账户,做扣减End"); //修改原订单状态,从0到1 log.info("------>修改订单状态开始"); order.setStatus(1); orderMapper.updateByPrimaryKey(order); log.info("------>修改订单状态结束"); return order; } }
StorageService.java:
@FeignClient(value = "seata-storage-service") public interface StorageService { @PostMapping(value = "/storage/decrease") CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count); }
AccountService.java:
@FeignClient(value = "seata-account-service") public interface AccountService { @PostMapping(value = "/account/decrease") CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money); }
3、编写Controller层OrderController.java:
@RestController public class OrderController { @Autowired private OrderService orderService; @PostMapping("/order/create") public CommonResult<Order> create(@RequestBody Order order){ Order result = orderService.create(order); return new CommonResult<>(200, "创建订单成功", result); } }
4、Order-Module创建成功后,我们可以尝试启动Order2001服务,一切顺利!
六、实例实战——业务代码Storage-Module
StorageController.java
@RestController public class StorageController { @Autowired private StorageService storageService; public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count){ storageService.decrease(productId, count); return new CommonResult(200, "扣减库存成功"); } }
StorageServiceImpl.java:
@Service @Slf4j public class StorageServiceImpl implements StorageService { @Resource private StorageMapper storageMapper; @Override public void decrease(Long productId, Integer count) { log.info("------>storage-service中扣减库存开始"); storageMapper.decrease(productId, count); log.info("------>storage-service中扣减库存结束"); } }
StorgaeMapper.java:
StorageMapper.xml:
void decrease(@Param("productId") Long productId, @Param("count") Integer count); <update id="decrease"> update t_storage set used = used + #{count,jdbcType=INTEGER}, residue = residue - #{count,jdbcType=INTEGER} where product_id = #{productId,jdbcType=BIGINT} </update>
七、实例实战——业务代码Account-Module
AccountController.java:
@RestController public class AccountController { @Autowired private AccountService accountService; @PostMapping(value = "/account/decrease") CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){ accountService.decrease(userId, money); return new CommonResult(200, "扣减账户余额成功"); } }
AccountServiceImpl:
@Service @Slf4j public class AccountServiceImpl implements AccountService { @Resource private AccountMapper accountMapper; @Override public void decrease(Long userId, BigDecimal money) { log.info("------>account-service中扣除账户余额开始"); accountMapper.decrease(userId, money); log.info("------>account-service中扣除账户余额结束"); } }
AccountMapper.java:
AccountMapper.xml:
void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money); <update id="decrease"> update t_account set used = used + #{money,jdbcType=DECIMAL}, residue = residue - #{money,jdbcType=DECIMAL} where user_id = #{userID,jdbcType=BIGINT} </update>
八、启动测试验证效果
1、分别启动3个服务,顺利启动,测试前数据库截图如下:
2、首先测试一组正常下单情况:
POST { "userId":1, "productId":1, "count":10, "money":100 }
请求结果:
此时我们看看三张数据库表的数据结果:
很顺利,没有问题!
3、模拟异常,在account服务中模拟超时异常,此时我们还没用到seata的@GlobalTransactional注解:
注意Feign默认的请求超时时间为1秒钟:
@RestController public class AccountController { @Autowired private AccountService accountService; @PostMapping(value = "/account/decrease") CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){ try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } accountService.decrease(userId, money); return new CommonResult(200, "扣减账户余额成功"); } }
之后,我们重新访问刚刚的请求,结果是Feign访问超时:
Read timed out executing POST http://seata-account-service/account/decrease?userId=1&money=100
检查一下数据库,看看结果如何:
库存减了,钱被扣了,但是订单没有正常完成,还是未完成状态,这显然是由问题的!
更可怕的是,由于Feign有超时重试机制,有时候,账户余额甚至被多次扣款,这就更严重了!!
这就是分布式事务问题的表现!!!
4、还是上面的超时异常,但是本次我们使用Seata的@GlobalTransactional注解:
我们在OrderServiceImpl.java的方法体上,加上@GlobalTransactional注解:
//发现的所有Exception,通通回滚 @Override @GlobalTransactional(name = "jgq-create-order", rollbackFor = Exception.class) public Order create(Order order) { log.info("------>开始新建订单"); order.setStatus(0); orderMapper.insert(order); System.out.println("此处已获得主键为:"+order.getId()); log.info("------>订单微服务开始调用库存,做扣减"); storageService.decrease(order.getProductId(), order.getCount()); log.info("------>订单微服务开始调用库存,做扣减End"); log.info("------>订单微服务开始调用账户,做扣减"); accountService.decrease(order.getUserId(), order.getMoney()); log.info("------>订单微服务开始调用账户,做扣减End"); //修改原订单状态,从0到1 log.info("------>修改订单状态开始"); order.setStatus(1); orderMapper.updateByPrimaryKey(order); log.info("------>修改订单状态结束"); return order; }
此时我有访问了一次刚刚的链接,依然是超时异常,我还故意躲操作了两次!然后我们再看看数据库:
本应该像上面异常一样的错误数据(半成品)没有被写进数据库,严重的分布式事务被避免了,仅仅就是一个注解@GlobalTransactional;
到这里,我们使用Seata解决分布式事务的测试校验工作,就算是成功结束了!
个人此项目代码地址(持续更新):