系列
开篇
梳理mybatis核心关键流程,打通datasource在mybatis的流程中的作用点。
- MapperMethod解析参数,参考mybatis 参数解析流程附相关案例。
- DefaultSqlSessionFactory#openSessionFromDataSource负责创建SqlSession对象。
- Executor负责创建StatementHandler和通过prepareStatement初始化Statement对象。
- SqlSession作为入口,内部通过Executor执行真正的SQL操作。
DefaultSqlSessionFactory
public class DefaultSqlSessionFactory implements SqlSessionFactory {
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
// 创建事务对象工厂SpringManagedTransactionFactory
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
// 创建事务对象SpringManagedTransaction
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
// 创建Executor对象
final Executor executor = configuration.newExecutor(tx, execType);
// 创建SqlSession对象
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
} finally {
}
}
}
- 通过openSessionFromDataSource返回SqlSession对象。
- SqlSession包含Executor,Executor包含tx,tx包含DataSource。
- mybatis的执行逻辑通过SqlSession调用Executor来进行执行。
Executor
public class Configuration {
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
}
- Configuration#newExecutor负责创建Executor对象。
DefaultSqlSession
public class DefaultSqlSession implements SqlSession {
private Configuration configuration;
private Executor executor;
private boolean autoCommit;
private boolean dirty;
public DefaultSqlSession(Configuration configuration, Executor executor, boolean autoCommit) {
this.configuration = configuration;
this.executor = executor;
this.dirty = false;
this.autoCommit = autoCommit;
}
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
MappedStatement ms = configuration.getMappedStatement(statement);
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
} finally {
}
}
}
- DefaultSqlSession包含Executor对象,核心组件参考mybatis组件关系图
- SqlSession的执行逻辑通过executor来执行,针对列表参数通过wrapCollection进行包装。
SimpleExecutor
public class SimpleExecutor extends BaseExecutor {
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
// 创建StatementHandler对象
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
// 初始化Statement
stmt = prepareStatement(handler, ms.getStatementLog());
// 通过StatementHandler对象进行查询
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
}
- 创建StatementHandler对象,包括boundSql、parameterHandler、resultSetHandler。
- 执行Statement的初始化操作,执行从Datasource到Connection到Statement的初始化。
- 通过StatementHandler执行SQL操作。
StatementHandler
public class Configuration {
public StatementHandler newStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
StatementHandler statementHandler = new RoutingStatementHandler(executor, mappedStatement, parameterObject, rowBounds, resultHandler, boundSql);
statementHandler = (StatementHandler) interceptorChain.pluginAll(statementHandler);
return statementHandler;
}
}
public class RoutingStatementHandler implements StatementHandler {
public RoutingStatementHandler(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
switch (ms.getStatementType()) {
case STATEMENT:
delegate = new SimpleStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
case PREPARED:
delegate = new PreparedStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
case CALLABLE:
delegate = new CallableStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
default:
throw new ExecutorException("Unknown statement type: " + ms.getStatementType());
}
}
public class PreparedStatementHandler extends BaseStatementHandler {
public PreparedStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
super(executor, mappedStatement, parameter, rowBounds, resultHandler, boundSql);
}
}
public abstract class BaseStatementHandler implements StatementHandler {
protected final Configuration configuration;
protected final ObjectFactory objectFactory;
protected final TypeHandlerRegistry typeHandlerRegistry;
protected final ResultSetHandler resultSetHandler;
protected final ParameterHandler parameterHandler;
protected final Executor executor;
protected final MappedStatement mappedStatement;
protected final RowBounds rowBounds;
protected BoundSql boundSql;
protected BaseStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
this.configuration = mappedStatement.getConfiguration();
this.executor = executor;
this.mappedStatement = mappedStatement;
this.rowBounds = rowBounds;
this.typeHandlerRegistry = configuration.getTypeHandlerRegistry();
this.objectFactory = configuration.getObjectFactory();
if (boundSql == null) { // issue #435, get the key before calculating the statement
generateKeys(parameterObject);
boundSql = mappedStatement.getBoundSql(parameterObject);
}
this.boundSql = boundSql;
this.parameterHandler = configuration.newParameterHandler(mappedStatement, parameterObject, boundSql);
this.resultSetHandler = configuration.newResultSetHandler(executor, mappedStatement, rowBounds, parameterHandler, resultHandler, boundSql);
}
}
- StatementHandler的创建过程中负责生成boundSql、parameterHandler、resultSetHandler。
Statement
public class SimpleExecutor extends BaseExecutor {
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
// 1、获取连接
Connection connection = getConnection(statementLog);
// 2、获取会话Statement对象
stmt = handler.prepare(connection);
// 3、初始化参数格式
handler.parameterize(stmt);
return stmt;
}
}
public abstract class BaseExecutor implements Executor {
protected Connection getConnection(Log statementLog) throws SQLException {
// 1、从SpringManagedTransaction获取连接
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
}
public class SpringManagedTransaction implements Transaction {
private final DataSource dataSource;
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
public SpringManagedTransaction(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
}
<br>
##PreparedStatementHandler
public abstract class DataSourceUtils {
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
}
catch (SQLException ex) {
}
}
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
conHolder.setConnection(dataSource.getConnection());
}
return conHolder.getConnection();
}
// 从dataSource获取链接
Connection con = dataSource.getConnection();
if (TransactionSynchronizationManager.isSynchronizationActive()) {
try {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
catch (RuntimeException ex) {
releaseConnection(con, dataSource);
throw ex;
}
}
return con;
}
}
- DataSourceUtils#doGetConnection从Datasource获取Connection对象。
PreparedStatementHandler
public abstract class BaseStatementHandler implements StatementHandler {
@Override
public Statement prepare(Connection connection) throws SQLException {
ErrorContext.instance().sql(boundSql.getSql());
Statement statement = null;
try {
statement = instantiateStatement(connection);
setStatementTimeout(statement);
setFetchSize(statement);
return statement;
} catch (SQLException e) {
} catch (Exception e) {
}
}
}
public class PreparedStatementHandler extends BaseStatementHandler {
@Override
protected Statement instantiateStatement(Connection connection) throws SQLException {
String sql = boundSql.getSql();
if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
String[] keyColumnNames = mappedStatement.getKeyColumns();
if (keyColumnNames == null) {
return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
} else {
return connection.prepareStatement(sql, keyColumnNames);
}
} else if (mappedStatement.getResultSetType() != null) {
return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);
} else {
// 执行connection的prepareStatement操作
return connection.prepareStatement(sql);
}
}
}
- instantiateStatement执行connection的prepareStatement操作。
结论
- 分库分布核心思想在于自定义Datasource并且重新getConnection返回动态代理的Connection对象。
- Connection对象使用动态代理生成的对象在于可以自定义其他的操作。
- 定义以StatementHandler的prepare方法作为切入点的插件,因为Mysql查询过程中的statement是在StatementHandler#prepare中生成。