Spring + Mybatis 读写分离实现(基于SqlSession实现)

在某一次断点中发现SqlSession中有区分sql是属于写操作还是读操作,因此我打算通过自定义SqlSession来实现读写分离。

简单介绍

  1. 需要实现SqlSessionFactoryBuilder,可以通过这个工厂构建器写入对应的数据源.。
  2. 实现SqlSessionFactory,用于创建自定义SqlSession
  3. 实现Executor,用于按需打开对应的数据库连接
  4. 实现SqlSession,用于切换读写数据源

写数据源对应方法:
insert、delete、update

读数据源对应方法:
selectOne、selectList、selectMap、selectCursor、select

使用SqlSession读写分离配置

@Bean(name = "writeDruidDataSource", destroyMethod = "close")
public DruidDataSource writeDruidDataSource() throws SQLException {
    // 写数据源配置
    
    return dataSource;
}

@Bean(name = "readDruidDataSource", destroyMethod = "close")
public DruidDataSource readDruidDataSource() throws SQLException {
    // 读数据源配置

    return dataSource;
}

@Bean("sqlSessionFactoryBuilder")
public VirSqlSessionFactoryBuilder sqlSessionFactoryBuilder() throws SQLException {
    VirSqlSessionFactoryBuilder sessionFactoryBuilder = new VirSqlSessionFactoryBuilder();
    sessionFactoryBuilder.setWriteDataSource(writeDruidDataSource());
    sessionFactoryBuilder.setReadDataSource(readDruidDataSource());

    return sessionFactoryBuilder;
}

/**
 * MyBatis配置 :配置sqlSessionFactory
 * 
 * @param  sqlSessionFactoryBuilder
 * @return
 * @throws Exception
 */
@Bean("sqlSessionFactoryBean")
public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("writeDruidDataSource") DruidDataSource druidDataSource) throws Exception {
    // 分页拦截器
    PageInterceptor interceptor = new PageInterceptor();
    Properties properties = new Properties();
    properties.put("helperDialect", "mysql");

    interceptor.setProperties(properties);

    // 创建SqlSession工厂
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setMapperLocations(ResourceUtil.getResources("classpath:mysql/**/*.xml"));
    sqlSessionFactoryBean.setSqlSessionFactoryBuilder(sqlSessionFactoryBuilder());
    sqlSessionFactoryBean.setPlugins(new Interceptor[] { interceptor });
    sqlSessionFactoryBean.setDataSource(druidDataSource);

    return sqlSessionFactoryBean;
}

Executor自定义的实现

package cn.virens.web.components.mybatis;

import java.sql.SQLException;
import java.util.List;

import javax.sql.DataSource;

import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.apache.ibatis.transaction.TransactionFactory;

/**
 * Executor 的实现类
 * 
 * @author virens
 */
public class VirExecutor implements Executor {
    private final TransactionFactory transactionFactory;
    private final TransactionIsolationLevel level;
    private final Configuration configuration;
    private final ExecutorType executorType;
    private final DataSource dataSource;

    private final boolean autoCommit;

    private Transaction transaction;
    private Executor executor;

    public VirExecutor(TransactionFactory transactionFactory, Configuration configuration, DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit, ExecutorType executorType) {
        this.transactionFactory = transactionFactory;
        this.configuration = configuration;
        this.executorType = executorType;
        this.autoCommit = autoCommit;
        this.dataSource = dataSource;
        this.level = level;
    }

    @Override
    public int update(MappedStatement ms, Object parameter) throws SQLException {
        return executor().update(ms, parameter);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException {
        return executor().query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
        return executor().query(ms, parameter, rowBounds, resultHandler);
    }

    @Override
    public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
        return executor().queryCursor(ms, parameter, rowBounds);
    }

    @Override
    public List<BatchResult> flushStatements() throws SQLException {
        return executor().flushStatements();
    }

    @Override
    public void commit(boolean required) throws SQLException {
        executor().commit(required);
    }

    @Override
    public void rollback(boolean required) throws SQLException {
        executor().rollback(required);
    }

    @Override
    public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
        return executor().createCacheKey(ms, parameterObject, rowBounds, boundSql);
    }

    @Override
    public boolean isCached(MappedStatement ms, CacheKey key) {
        return executor().isCached(ms, key);
    }

    @Override
    public void clearLocalCache() {
        executor().clearLocalCache();
    }

    @Override
    public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
        executor().deferLoad(ms, resultObject, property, key, targetType);
    }

    @Override
    public Transaction getTransaction() {
        return executor().getTransaction();
    }

    @Override
    public void close(boolean forceRollback) {
        executor().close(forceRollback);
    }

    @Override
    public boolean isClosed() {
        return executor().isClosed();
    }

    @Override
    public void setExecutorWrapper(Executor executor) {
        executor().setExecutorWrapper(executor);
    }

    public void closeTransaction() {
        if (transaction == null) return;

        try {
            transaction.close();
        } catch (SQLException ignore) {
            // Intentionally ignore. Prefer previous error.
        }
    }

    private Executor executor() {
        if (transaction == null || executor == null) {
            this.transaction = transactionFactory.newTransaction(dataSource, level, autoCommit);
            this.executor = configuration.newExecutor(transaction, executorType);
        }

        return executor;
    }
}

SqlSession自定义的实现

package cn.virens.web.components.mybatis;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.defaults.DefaultSqlSession;

import cn.virens.exception.APIException;

/**
 * SqlSession 的实现类
 * 
 * @author virens
 */
public class VirSqlSession implements SqlSession {
    private final DefaultSqlSession writeSqlSession;
    private final DefaultSqlSession readSqlSession;

    public VirSqlSession(Configuration configuration, Executor readExecutor, Executor writeExecutor, boolean autoCommit) {
        this.writeSqlSession = new DefaultSqlSession(configuration, writeExecutor, autoCommit);
        this.readSqlSession = new DefaultSqlSession(configuration, readExecutor, autoCommit);
    }

    @Override
    public int insert(String statement) {
        return writeSqlSession.insert(statement);
    }

    @Override
    public int insert(String statement, Object parameter) {
        return writeSqlSession.insert(statement, parameter);
    }

    @Override
    public int delete(String statement) {
        return writeSqlSession.delete(statement);
    }

    @Override
    public int delete(String statement, Object parameter) {
        return writeSqlSession.delete(statement, parameter);
    }

    @Override
    public int update(String statement) {
        return writeSqlSession.update(statement);
    }

    @Override
    public int update(String statement, Object parameter) {
        return writeSqlSession.update(statement, parameter);
    }

    @Override
    public <T> T selectOne(String statement) {
        return readSqlSession.selectOne(statement);
    }

    @Override
    public <T> T selectOne(String statement, Object parameter) {
        return readSqlSession.selectOne(statement, parameter);
    }

    @Override
    public <E> List<E> selectList(String statement) {
        return readSqlSession.selectList(statement);
    }

    @Override
    public <E> List<E> selectList(String statement, Object parameter) {
        return readSqlSession.selectList(statement, parameter);
    }

    @Override
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return readSqlSession.selectList(statement, parameter, rowBounds);
    }

    @Override
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return readSqlSession.selectMap(statement, mapKey);
    }

    @Override
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return readSqlSession.selectMap(statement, parameter, mapKey);
    }

    @Override
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return readSqlSession.selectMap(statement, parameter, mapKey, rowBounds);
    }

    @Override
    public <T> Cursor<T> selectCursor(String statement) {
        return readSqlSession.selectCursor(statement);
    }

    @Override
    public <T> Cursor<T> selectCursor(String statement, Object parameter) {
        return readSqlSession.selectCursor(statement, parameter);
    }

    @Override
    public <T> Cursor<T> selectCursor(String statement, Object parameter, RowBounds rowBounds) {
        return readSqlSession.selectCursor(statement, parameter, rowBounds);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void select(String statement, Object parameter, ResultHandler handler) {
        readSqlSession.select(statement, parameter, handler);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void select(String statement, ResultHandler handler) {
        readSqlSession.select(statement, handler);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        readSqlSession.select(statement, parameter, rowBounds, handler);
    }

    @Override
    public void close() {
        writeSqlSession.close();
        readSqlSession.close();
    }

    @Override
    public void commit() {
        this.commit(false);
    }

    @Override
    public void commit(boolean force) {
        writeSqlSession.commit(force);
        readSqlSession.commit(force);
    }

    @Override
    public void rollback() {
        this.rollback(false);
    }

    @Override
    public void rollback(boolean force) {
        writeSqlSession.rollback(force);
        readSqlSession.rollback(force);
    }

    @Override
    public void clearCache() {
        writeSqlSession.clearCache();
        readSqlSession.clearCache();
    }

    @Override
    public List<BatchResult> flushStatements() {
        List<BatchResult> answer = new ArrayList<>();
        answer.addAll(writeSqlSession.flushStatements());
        answer.addAll(readSqlSession.flushStatements());

        return answer;
    }

    @Override
    public Configuration getConfiguration() {
        return readSqlSession.getConfiguration();
    }

    @Override
    public <T> T getMapper(Class<T> type) {
        return readSqlSession.getMapper(type);
    }

    @Override
    public Connection getConnection() {
        throw new APIException("ERROR", "多数据源,禁止获取连接");
    }

}

SqlSessionFactory自定义的实现

package cn.virens.web.components.mybatis;

import java.sql.Connection;

import org.apache.ibatis.exceptions.ExceptionFactory;
import org.apache.ibatis.executor.ErrorContext;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.managed.ManagedTransactionFactory;

import cn.virens.exception.APIException;
    
/**
 * SqlSessionFactory 的实现类
 * 
 * @author virens
 */
public class VirSqlSessionFactory implements SqlSessionFactory {
    private final VirSqlSessionFactoryBuilder sessionFactory;
    private final Configuration configuration;

    public VirSqlSessionFactory(Configuration configuration, VirSqlSessionFactoryBuilder sessionFactory) {
        this.sessionFactory = sessionFactory;
        this.configuration = configuration;
    }

    @Override
    public SqlSession openSession() {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
    }

    @Override
    public SqlSession openSession(boolean autoCommit) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, autoCommit);
    }

    @Override
    public SqlSession openSession(ExecutorType execType) {
        return openSessionFromDataSource(execType, null, false);
    }

    @Override
    public SqlSession openSession(TransactionIsolationLevel level) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), level, false);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
        return openSessionFromDataSource(execType, level, false);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
        return openSessionFromDataSource(execType, null, autoCommit);
    }

    @Override
    public SqlSession openSession(Connection connection) {
        return openSessionFromConnection(configuration.getDefaultExecutorType(), connection);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, Connection connection) {
        return openSessionFromConnection(execType, connection);
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        VirExecutor readExecutor = null;
        VirExecutor writeExecutor = null;

        try {
            final Environment environment = configuration.getEnvironment();
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);

            readExecutor = new VirExecutor(transactionFactory, configuration, sessionFactory.getReadDataSource(), level, autoCommit, execType);
            writeExecutor = new VirExecutor(transactionFactory, configuration, sessionFactory.getWriteDataSource(), level, autoCommit, execType);

            return new VirSqlSession(configuration, readExecutor, writeExecutor, autoCommit);
        } catch (Exception e) {
            writeExecutor.closeTransaction();
            readExecutor.closeTransaction();

            throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }

    private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
        throw new APIException("ERROR", "读写分离,无法判断类型,不支持根据连接获取");
    }

    private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
        if (environment != null && environment.getTransactionFactory() != null) {
            return environment.getTransactionFactory();
        } else {
            return new ManagedTransactionFactory();
        }
    }
}

SqlSessionFactoryBuilder自定义的实现

package cn.virens.web.components.mybatis;

import javax.sql.DataSource;

import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

/**
 * SqlSessionFactoryBuilder 的实现类
 * 
 * @author virens
 */
public class VirSqlSessionFactoryBuilder extends SqlSessionFactoryBuilder {
    private DataSource writeDataSource;
    private DataSource readDataSource;
    
    public DataSource getWriteDataSource() {
        return writeDataSource;
    }
    
    public void setWriteDataSource(DataSource writeDataSource) {
        this.writeDataSource = writeDataSource;
    }

    public DataSource getReadDataSource() {
        return readDataSource;
    }

    public void setReadDataSource(DataSource readdataSource) {
        this.readDataSource = readdataSource;
    }

    @Override
    public SqlSessionFactory build(Configuration config) {
        return new VirSqlSessionFactory(config, this);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容