一止长渊

docker配置Mysql集群(下)——Spring代码层读写分离

N 人看过
字数:3k字 | 预计阅读时长:14分钟

上节介绍了利用Docker部署MYSQL集群,下面介绍如何利用部署好的master和slave实现Spring代码层的读写分离,保证发生读操作时访问slave结点,发生写操作时访问master结点。

1. 实现基础

a.spring有关数据源路由的源码

package org.springframework.jdbc.datasource.lookup;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import javax.sql.DataSource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.AbstractDataSource;
import org.springframework.util.Assert;

public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
    private Map<Object, Object> targetDataSources;
    private Object defaultTargetDataSource;
    private boolean lenientFallback = true;
    private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
    private Map<Object, DataSource> resolvedDataSources;
    private DataSource resolvedDefaultDataSource;
    
    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = this.determineCurrentLookupKey();
        DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }

        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        } else {
            return dataSource;
        }
    }

    protected abstract Object determineCurrentLookupKey();
    
    ...
}

首先看一下在springframework.jdbc包下的源码,类名AbstractRoutingDataSource(抽象路由数据源),这个虚拟类就是spring提供的提供路由导向的数据库类

  • 通过变量Map存储不同方法的数据源
  • determineTargetDataSource方法通过调用determineCurrentLookupKey来指明使用何种数据库,然后在Map中寻找相应的datasource

我们可以通过实现上面determineCurrentLookupKey抽象方法,通过建立一个Map存储master和slave的数据源,通过SQL对应是update还是insert操作,来决定使用何种key,进而调用哪个数据源

b.Mybatis提供Interceptor

package org.apache.ibatis.plugin;

import java.util.Properties;

public interface Interceptor {
    Object intercept(Invocation var1) throws Throwable;

    Object plugin(Object var1);

    void setProperties(Properties var1);
}

Interceptor是mybatis提供一个接口,是拦截类,可以拦截mybatis中的操作

  • plugin方法:定义拦截何种类型的类,以决定是否织入代理
  • intercept方法:定义对拦截下来的类织入的逻辑

通过自定义实现接口中的两个方法,我们可以使用plugin方法仅拦截属于Mybatis底层SQL执行器的类,interceptor方法可以编写对应拦截到执行器的执行SQL方法

c.如何将Interceptor方法和determineCurrentLookupKey方法以及具体不同数据库操作的线程统筹

这里用到了一个中间人——ThreadLocal,通过ThreadLocal来隔绝不同数据库线程的要选择的key。
具体而言为:Interceptor通过拦截执行器对应的数据库操作,查看是属于读还是写操作,来向ThreadLocal中放入对应线程选择数据源的名字(master or slave),然后通过实现AbstractRoutingDataSource中的determineCurrentLookupKey方法去从ThreadLocal中取对应线程放入的key,然后根据对应的key在Map型变量targetDataSources选择相应的数据源

2.实现方式

a.DynamicDataSourceHolder类

主要定义中间人池子ThreadLocal来隔离不同线程放入的key,其中可选择的key定义为静态变量”master”和”slave”

package com.imooc.o2o.dao.split;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 池子,放入决定数据源key的池子,其中池子利用ThreadLocal设置为线程安全
 */
public class DynamicDataSourceHolder {
    private static Logger logger = LoggerFactory.getLogger(DynamicDataSourceHolder.class);
    // 池子,利用ThreadLocal保证线程安全,线程安全是访问同一个变量时容器出现问题,特别多个线程对一个变量进行写入的时候
    // ThreadLocal是除了加锁之外的另外一种保证多线程访问变量的线程安全方法,即每个线程对变量的访问都是基于线程自己的变量
    // 也就是说中央变量只有一个,但是每个线程都有一个中央变量的独立拷贝,每个线程只访问修改自己独立拷贝的变量
    private static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    public static String DB_MASTER = "master";
    public static String DB_SLAVE = "slave";

    public static String getDbType(){
        String db = contextHolder.get();
        if(db == null){
            db = DB_MASTER;
        }
        return db;
    }

    /**
     * 设置线程的dbType
     * @param str
     */
    public static void setDbType(String str){
        logger.debug("所使用的数据源为:" + str);
        contextHolder.set(str);
    }

    /**
     * 清理连接类型
     */
    public static void clearDbType(){
        contextHolder.remove();
    }
}

b.DynamicDataSourceInterceptor类

通过实现Mybatis提供的Interceptor接口来拦截Mybatis执行器Executor,intercept方法定义织入的规则,通过invocation.getArgs()来获取对应线程执行的数据库操作是读操作还是写操作,然后向DynamicDataSourceHolder中的ThreadLocal放入对应线程选择的key,是选择master还是slave

读写分离逻辑

这里首先先判断是否为事务操作

  • 如果是事务操作,会包含一系列增删改查的操作,所以使用master数据库
  • 如果不是事务操作,然后再看对应增删改查是读取还是写入操作,从而实现读写分离的逻辑
    package com.imooc.o2o.dao.split;
    

import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.Locale;
import java.util.Properties;

/**

  • 拦截器拦截mybatis传递进来的SQL信息,根据SQL信息来使用相应的读写分离的数据源

  • /
    // mybatis会把增删改的方法封装在update这个method中
    @Intercepts({@Signature(type=Executor.class, method=”update”, args={MappedStatement.class, Object.class}),
    @Signature(type=Executor.class, method=”query”, args={MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})})
    public class DynamicDataSourceInterceptor implements Interceptor {
    private static Logger logger = LoggerFactory.getLogger(DynamicDataSourceInterceptor.class);
    private static final String REGEX = “.insert\u0020.|.update\u0020.|.delete\u0020.“;

    /**

    • 织入的规则

    • @param invocation

    • @return

    • @throws Throwable

    • /
      @Override
      public Object intercept(Invocation invocation) throws Throwable {
      // 利用spring提供的,判断当前是不是事务已表明是不是数据库操作
      boolean synchronizationActive = TransactionSynchronizationManager.isActualTransactionActive();
      Object[] objects = invocation.getArgs();
      MappedStatement ms = (MappedStatement)objects[0]; // 参数第一个是解释什么数据库操作
      String lookupKey = DynamicDataSourceHolder.DB_MASTER;
      if(synchronizationActive != true){

        // 读方法,
        if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)){
            // selectKey 为自增id查询主键(SELECT LAST_INSERT_ID())方法,使用主库,通常使用自增主键插入数据库操作
            if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)){
                lookupKey = DynamicDataSourceHolder.DB_MASTER;
            }else{
                BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", "");
                if(sql.matches(REGEX)){
                    lookupKey = DynamicDataSourceHolder.DB_MASTER;
                }else{
                    lookupKey = DynamicDataSourceHolder.DB_SLAVE;
                }
            }
        }
      

      }else{

        // 因为一般事务操作可能包含增删改查混合操作,所以用master
        lookupKey = DynamicDataSourceHolder.DB_MASTER;
      

      }
      logger.debug(“设置方法[{}] use[{}] Strategy,SqlCommandType[{}]…”, ms.getId(), lookupKey, ms.getSqlCommandType().name());
      DynamicDataSourceHolder.setDbType(lookupKey);
      return invocation.proceed();
      }

      /**

    • 返回封装好的代理对象,决定返回的是本体还是编织好的代理

    • @param target

    • @return

    • /
      @Override
      public Object plugin(Object target) {
      // 这里的Executor是mybatis支持一系列增删改查操作的执行器
      if(target instanceof Executor){

        // 如果是mybatis的Executor类型,那么就把编入逻辑织入,不是返回本体
        return Plugin.wrap(target, this);
      

      }
      return target;
      }

      /**

    • 非必要

    • @param properties

    • /
      @Override
      public void setProperties(Properties properties) {

      }
      }

      
      
      
      

c.DynamicDataSource类

继承AbstractRoutingDataSource,实现determineCurrentLookupKey方法,从池子中获取对应线程应该选择的数据源名称key,然后在targetDataSource中获取对应的数据源连接给线程(targetDataSource是个Map变量,在spring-dao.xml配置文件中配置)

package com.imooc.o2o.dao.split;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

/**
 * 从池子中取key,然后从HashMap<String,DataSource>中用key找到对应的DataSource
 */
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceHolder.getDbType();
    }
}

3.装配

  1. mybatis-config中装配拦截器
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <!-- 配置全局属性 -->
    <settings>
        <!-- 使用jdbc的getGeneratedKeys获取数据库自增主键值 -->
        <setting name="useGeneratedKeys" value="true"/>

        <!-- 使用列标签(数据库列名)替换列名(查询语句名字) 默认:true -->
        <setting name="useColumnLabel" value="true"/>

        <!-- 开启驼峰命名转换:Table{create_time} -> Entity{createTime} -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
    </settings>

    <!--自定义读写分离拦截器-->
    <plugins>
        <plugin interceptor="com.imooc.o2o.dao.split.DynamicDataSourceInterceptor"/>
    </plugins>
</configuration>
  1. spring-dao.xml配置路由数据源
  • 设置abstract为true的连接池,定义为abstract是为了后边继承通用的连接池属性
  • 定义好master和slave的数据源连接配置,指明parent为上面abstract的连接池
  • 将继承实现AbstractRoutingDataSourc    e的类,配置好Map变量targetDataSource
  • 定义为懒加载,因为只在SQL语句正式执行的时候才指定出来dataSource

jdbc.properties文件:

jdbc.master.driver=com.mysql.cj.jdbc.Driver
jdbc.master.url=jdbc:mysql://localhost:10001/o2o?useUnicode=true&characterEncoding=utf8
jdbc.master.username=root
jdbc.master.password=make1234
jdbc.slave.driver=com.mysql.cj.jdbc.Driver
jdbc.slave.url=jdbc:mysql://localhost:10002/o2o?useUnicode=true&characterEncoding=utf8
jdbc.slave.username=root
jdbc.slave.password=make1234

spring-dao.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:bean="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <context:component-scan base-package="com.imooc.o2o.dao"/>

    <!--配置数据库properties的数据-->
    <context:property-placeholder location="classpath:jdbc.properties"/>

    <!--1.数据库连接池,设置为abstract,便于后面继承属性配置主从库-->
    <bean id="abstractDataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
        abstract="true" destroy-method="close">
        <!--c3p0连接池的私有属性-->
        <property name="maxPoolSize" value="30"/>
        <property name="minPoolSize" value="10"/>
        <!--关闭连接后不自动commit-->
        <property name="autoCommitOnClose" value="false"/>
        <!--获取连接的超时时间-->
        <property name="checkoutTimeout" value="10000"/>
        <!--当获取连接失败重试次数-->
        <property name="acquireRetryAttempts" value="10"/>
    </bean>

    <!--2.主库配置-->
    <bean id="master" parent="abstractDataSource">
        <!--配置连接池属性-->
        <property name="driverClass" value="${jdbc.master.driver}"/>
        <property name="jdbcUrl" value="${jdbc.master.url}"/>
        <property name="user" value="${jdbc.master.username}"/>
        <property name="password" value="${jdbc.master.password}"/>
    </bean>

    <!--3.从库配置-->
    <bean id="slave" parent="abstractDataSource">
        <!--配置连接池属性-->
        <property name="driverClass" value="${jdbc.slave.driver}"/>
        <property name="jdbcUrl" value="${jdbc.slave.url}"/>
        <property name="user" value="${jdbc.slave.username}"/>
        <property name="password" value="${jdbc.slave.password}"/>
    </bean>

    <!--4.配置动态数据源,这儿的targetDataSource就是路由数据源所对应的名称-->
    <bean id="dynamicDataSource" class="com.imooc.o2o.dao.split.DynamicDataSource">
        <!--将数据源放入到map中,这里的key要与com.imooc.o2o.dao.split.DynamicDataSourceHolder中的key相同-->
        <property name="targetDataSources">
            <map>
                <entry value-ref="master" key="master"></entry>
                <entry value-ref="slave" key="slave"></entry>
            </map>
        </property>
    </bean>

    <!--5.懒加载,需要在SQL语句正式执行的时候才指定出来dataSource-->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy">
        <property name="targetDataSource">
            <ref bean="dynamicDataSource"></ref>
        </property>
    </bean>

    <!--配置SqlSessionFactory对象-->
    <!--MyBatis全局配置文件-->
    <!--扫描entity包,将取出结果包装成entity对象-->
    <!--扫描Sql配置文件-->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"
          p:dataSource-ref="dataSource"
          p:configLocation="classpath:mybatis-config.xml"
          p:typeAliasesPackage="com.imooc.o2o.entity"
          p:mapperLocations="classpath:mapper/*.xml"/>

    <!--配置扫描Dao接口包,动态实现Dao接口,注入到Spring容器中-->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <!--注入SqlSessionFactory-->
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
        <!--给出需要扫描Dao接口包-->
        <property name="basePackage" value="com.imooc.o2o.dao"/>
    </bean>
</beans>

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。