【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务【云原生】

文章目录

  • 一、前言
  • 二、@GlobalTransactional
  • 1、GlobalTransactionScanner类(BPP)
  • 1)AbstractAutoProxyCreator(自动创建动态代理)
  • 2)BeanPostProcessor(对Bean进行修改的入口)
  • 3)从SpringBoot启动流程来看入口
  • 4)是否 / 如何生成动态代理对象
  • 第一步
  • 第二步
  • 第三步
  • 第四步
  • existsAnnotation()方法用于判断类是否需要被动态代理
  • 第五步
  • 三、全局事务的执行(前戏)
  • 四、总结
  • 一、前言

    至此,seata系列的内容包括:

    1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
    2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
    3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
    4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
    5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
    6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
    7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
    8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
    9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

    本文接着Seata使用@GlobalTransactional是如何开启全局事务的?

    PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

    二、@GlobalTransactional

    我们知道可以将@GlobalTransactional注解标注在类或方法上 开启全局事务,下面来看一下@GlobalTransactional是如何开启的全局事务?

    【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么一文中,我们知道了SpringBoot启动过程中会自动装配GlobalTransactionScanner类;

    1、GlobalTransactionScanner类(BPP)

    先看GlobalTransactionScanner类的继承关系:

    GlobalTransactionScanner类继承了AbstractAutoProxyCreatorAbstractAutoProxyCreator类又实现了BeanPostProcessor接口;因此GlobalTransactionScanner类也是BPP(BeanPostProcessor)

    下面简单看一下AbstractAutoProxyCreator类;

    1)AbstractAutoProxyCreator(自动创建动态代理)

    AbstractAutoProxyCreator是Spring AOP中的一个抽象类,其主要功能是自动创建动态代理;因为其实现了BeanPostProcessor接口,所以在类加载到Spring容器之前,会进入到其wrapIfNecessary()方法对Bean进行代理包装,后续调用Bean之将委托给指定的拦截器。

    另外其getAdvicesAndAdvisorsForBean()方法用于给子类实现,由子类决定一个Bean是否需要被代理(是否存在切面);并且它还可以返回只应用于特定Bean实例的附加拦截器

    2)BeanPostProcessor(对Bean进行修改的入口)

    BeanPostProcessor是Bean的后置处理器,可以通过实现其 并 覆写其postProcessBeforeInitialization()postProcessAfterInitialization() 方法在Bean初始化前后对其进行修改;

    AbstractAutoProxyCreator正是通过覆写BeanPostProcessorpostProcessAfterInitialization() 创建并返回Bean的代理对象;

    下面从SpringBoot启动流程来看针对标注了@GlobalTransactional的类 或 类中包含标注了@GlobalTransactional方法的类 创建动态代理的入口。

    3)从SpringBoot启动流程来看入口

    TradeService类为例:

    package com.saint.trade.service;
    
    import com.saint.trade.feign.OrderFeignClient;
    import com.saint.trade.feign.StockFeignClient;
    import io.seata.spring.annotation.GlobalTransactional;
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Service;
    
    /**
     * @author Saint
     */
    @Service
    @RequiredArgsConstructor
    public class TradeService {
    
        private final StockFeignClient stockFeignClient;
        private final OrderFeignClient orderFeignClient;
    
        /**
         * 减库存,下订单
         *
         * @param userId
         * @param commodityCode
         * @param orderCount
         */
        @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount) {
            stockFeignClient.deduct(commodityCode, orderCount);
    
            orderFeignClient.create(userId, commodityCode, orderCount);
        }
        
        public void test() {
            System.out.println("hhahaha");
        }
    }
    
    

    TradeService类被@Component衍生注解@Service标注,TradeService类又在SpringBoot扫描路径中,因此SpringBoot启动时会扫描到TradeService类;

    TradeService类中包含两个方法:purchase()test(),其中purchase()方法被@GlobalTransactional注解标注。

    下面来看创建Bean时设计到BeanPostProcessor的代码片段:

    AbstractBeanFactory抽象Bean工厂的实现类AbstractAutowireCapableBeanFactoryinitializeBean()方法是初始化Bean的入口:


    在Bean初始化之后会调用BeanPostProcessorpostProcessAfterInitialization() 创建并返回Bean的代理对象;整体线程栈帧信息如下:

    最终进入到GlobalTransactionScanner覆写AbstractAutoProxyCreator抽象类的wrapIfNecessary()方法创建并返回代理对象(如果需要创建动态代理的话)。

    4)是否 / 如何生成动态代理对象

    从上面我们知道了GlobalTransactionScanner类的wrapIfNecessary()是创建动态代理的入口;这里接着来看wrapIfNecessary()方法如何判断是否Bean是否需要生成动态代理?

    // 对于扫描到的@GlobalTransactional注解, bean和beanName
    // 判断类 或 类的某一个方法是否被@GlobalTransactional注解标注,进而决定当前Class是否需要创建动态代理;存在则创建。
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // do checkers,做一些检查,不用花过多精力关注
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
    
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy TCC的动态代理
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener) interceptor);
                } else {
                    // 先获取目标Class的接口
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    
                    // existsAnnotation()表示类或类方法是否有被@GlobalTransactional注解标注,进而决定类是否需要被动态代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                            && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
    
                    if (globalTransactionalInterceptor == null) {
                        // 构建一个全局拦截器
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener) globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
    
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                // 如果当前Bean没有被AOP代理
                if (!AopUtils.isAopProxy(bean)) {
                    // 基于Spring AOP的AutoProxyCreator对当前Class创建全局事务动态动态代理类
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        // 找到seata切面的位置
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    

    第一步

    1> 首先doCheckers()方法对Bean做一些检查,包括:Bean是否已经生成了代理类、Bean不允许生成代理类…

    第二步

    2> 对已经创建了动态代理的Bean的Set集合PROXYED_SET加锁做同步操作,如果PROXYED_SET中存在当前Bean的代理对象,则直接返回。

    第三步

    3> 根据@TwoPhaseBusinessAction注解判断是否是TCC模式下的动态代理(默认是AT模式,即不是TCC模式);

    第四步

    4> 获取Bean的目标Class,再通过existsAnnotation()方法查看 类或类方法是否有被@GlobalTransactional注解标注,进而决定类是否需要被动态代理;

    existsAnnotation()方法用于判断类是否需要被动态代理

    existsAnnotation()方法判断类是否需要被动态代理时:

    1. 首先判断类上是否标注了@GlobalTransactional注解,如果标注了,则直接返回true,表示类需要被动态代理;
    2. 否者,接着利用反射获取类的所有public方法,只要存在一个方法被@GlobalTransactional@GlobalLock 注解标注,则表示当前类需要被动态代理;

    PS:聪明的你肯定发现,当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,那么调用没加@GlobalTransactional注解的方法也会进入到代理对象,会不会有问题呢? 继续往后看,拦截器GlobalTransactionalInterceptor中会对其进行处理。

    当目标Class需要被动态代理时,则会初始化一个拦截器GlobalTransactionalInterceptor,用于拦截后面对目标Class的调用;

    那么GlobalTransactionalInterceptor是如何被应用于目标Class上做拦截的?

    第五步

    5> 如果针对当前Bean的代理是JDK 或 CGLIB动态代理,则根据GlobalTransactionalInterceptor创建切面,并应用到Bean上;

    在第四步时,我们对GlobalTransactionalInterceptor如何被应用于目标Class上做拦截持有疑问,在前面介绍AbstractAutoProxyCreator我们提到过:

    AbstractAutoProxyCreator的getAdvicesAndAdvisorsForBean()方法用于给子类实现,由子类决定一个Bean是否需要被代理(是否存在切面);并且它还可以返回只应用于特定Bean实例的附加拦截器;>>

    GlobalTransactionScanner覆写了getAdvicesAndAdvisorsForBean()方法,将上面初始化后的GlobalTransactionalInterceptor作为切面返回给AbstractAutoProxyCreator,供其创建动态代理类时使用;

    创建玩代理对象之后,将代理对象放入到GlobalTransactionScanner的动态代理Bean的Set集合PROXYED_SET,以快去获取Bean的代理对象 并 防止Bean代理对象的重复创建。最后将代理对象返回,创建Bean流程结束。

    至此,我们知道了所谓的@GlobalTransactional注解开启全局事务,实际就是针对类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象

    三、全局事务的执行(前戏)

    上面我们知道了所谓的@GlobalTransactional注解开启全局事务,其实就是类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。但是动态代理对象是针对类的;

    当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor会做进一步处理,保证只有加了@GlobalTransactional注解的方法才会开启全局事务。

    先看GlobalTransactionalInterceptor类的继承图:

    GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // method invocation是一次方法调用,一定是针对某个对象的方法调用;
        // methodInvocation.getThis()就是拿到当前方法所属的对象;
        // AopUtils.getTargetClass()获取到当前实例对象所对应的Class
        Class<?> targetClass =
                methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    
        // 通过反射获取到被调用目标Class的method方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    
        // 如果目标method不为空,并且方法的DeclaringClass不是Object
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            // 通过BridgeMethodResolver寻找method的桥接方法
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            // 获取目标方法的@GlobalTransactional注解
            final GlobalTransactional globalTransactionalAnnotation =
                    getAnnotation(method, targetClass, GlobalTransactional.class);
            // 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
            // 则localDisable等价于全局事务被禁用了
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
    
            // 如果全局事务没有被禁用
            if (!localDisable) {
                // 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        // 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                                globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                                globalTransactionalAnnotation.rollbackForClassName(),
                                globalTransactionalAnnotation.noRollbackFor(),
                                globalTransactionalAnnotation.noRollbackForClassName(),
                                globalTransactionalAnnotation.propagation(),
                                globalTransactionalAnnotation.lockRetryInterval(),
                                globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    // 真正处理全局事务的入口
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    // 获取事务锁
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        // 直接运行目标方法
        return methodInvocation.proceed();
    }
    

    假如我们调用TradeService类中没有标注@GlobalTransactional注解的test()方法;

    invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。

    四、总结

    所谓的@GlobalTransactional注解开启全局事务,实际就是针对类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。

    在调用相应Bean的时候,会进入到动态代理对象的拦截器GlobalTransactionalInterceptor

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务【云原生】

    发表评论