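ShardingSphere currently offers two access modes, JDBC and Proxy (MySQL protocol); Sidecar has not been implemented yet. This article covers the JDBC mode.
JDBC is the interface specification Java uses to access databases; it defines a standard way of interacting with a database. For example, querying some rows from a MySQL database with SQL might look like this in JDBC: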
try (Connection conn = DriverManager.getConnection(
"jdbc:mysql://localhost/mydb",
"user",
"password")) {
try (PreparedStatement ps =
conn.prepareStatement("SELECT i.*, j.* FROM Omega i, Zappa j WHERE i.name = ? AND "
+ "j.num = ?")
) {
// In the SQL statement being prepared, each question mark is a placeholder
// that must be replaced with a value you provide through a "set" method invocation.
// The following two method calls replace the two placeholders; the first is
// replaced by a string value, and the second by an integer value.
ps.setString(1, "Poor Yorick");
ps.setInt(2, 8008);
// The ResultSet, rs, conveys the result of executing the SQL statement.
// Each time you call rs.next(), an internal row pointer, or cursor,
// is advanced to the next row of the result. The cursor initially is
// positioned before the first row.
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
int numColumns = rs.getMetaData().getColumnCount();
for (int i = 1; i <= numColumns; i++) {
System.out.println("COLUMN " + i + " = " + rs.getObject(i));
}
}
}
}
}
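Within ShardingSphere the JDBC sub-project has a clear purpose: it lets users get database and table sharding, read/write splitting, encryption/decryption and similar features through the plain JDBC API. In design terms it is the decorator pattern: once SQL parsing, routing and rewriting are done, the real underlying JDBC resources (DataSource, Connection, Statement, ResultSet) carry out the actual SQL execution.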
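The decorator idea described above can be shown in miniature. This is only a sketch under stated assumptions: `SqlExecutor`, `RealExecutor` and `RewritingExecutor` are hypothetical names invented for illustration, not ShardingSphere or JDBC types, and the "rewrite" is a plain string replacement standing in for the real parse, route and rewrite steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a JDBC resource interface (not a real JDBC type). */
interface SqlExecutor {
    List<String> execute(String sql);
}

/** Plays the role of the real underlying resource; it only echoes what it was asked to run. */
final class RealExecutor implements SqlExecutor {

    private final String name;

    RealExecutor(final String name) {
        this.name = name;
    }

    @Override
    public List<String> execute(final String sql) {
        List<String> rows = new ArrayList<>();
        rows.add(name + " ran: " + sql);
        return rows;
    }
}

/** Decorator: exposes the same interface, rewrites the SQL, then delegates to the real resource. */
final class RewritingExecutor implements SqlExecutor {

    private final SqlExecutor delegate;

    private final Function<String, String> rewriter;

    RewritingExecutor(final SqlExecutor delegate, final Function<String, String> rewriter) {
        this.delegate = delegate;
        this.rewriter = rewriter;
    }

    @Override
    public List<String> execute(final String sql) {
        // The caller still sees a plain SqlExecutor; the extra work happens before delegation.
        return delegate.execute(rewriter.apply(sql));
    }
}
```

The caller programs against `SqlExecutor` only, which is why the decorator can be swapped in without changing application code; the JDBC sub-project relies on the same property of the JDBC interfaces.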
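Unlike the engine articles, the JDBC article is organized by class hierarchy, which makes the overall design easier to see. The main interfaces in the JDBC specification are DataSource, Connection, Statement, PreparedStatement, ResultSet, DatabaseMetaData, ResultSetMetaData and ParameterMetaData. This article walks through the source of ShardingSphere's main implementation classes for these interfaces in that order.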
DataSource
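As the names suggest, ShardingSphere provides a DataSource implementation for each feature. Let's go through the class diagram from top to bottom.
First comes AbstractUnsupportedOperationDataSource, the base class of every ShardingSphere DataSource implementation. Although it implements the DataSource interface, it only implements getLoginTimeout and setLoginTimeout, and both simply throw SQLFeatureNotSupportedException. Both methods are declared final, so none of the feature-specific DataSource subclasses override them; ShardingSphere explicitly does not support calling these two methods.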
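JDBC is Java's standard set of interfaces for working with databases, but the implementations are supplied by database drivers, and vendors do not fully support every method of every interface. ShardingSphere is similar: it provides no implementation for methods that are rarely used or for which it cannot return an accurate value. To manage these methods in one place, its JDBC implementations usually share a common parent, an AbstractUnsupported* class, whose methods simply throw SQLFeatureNotSupportedException.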
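A minimal, self-contained sketch of this layering follows. The class names `UnsupportedOperationsSketch` and `DelegatingDataSourceSketch` are hypothetical and exist only for illustration; the sketch uses nothing beyond the standard `javax.sql.DataSource` interface.

```java
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;

/** Hypothetical base class: every operation this family refuses to support lives here. */
abstract class UnsupportedOperationsSketch implements DataSource {

    @Override
    public final int getLoginTimeout() throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
    }

    @Override
    public final void setLoginTimeout(final int seconds) throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
    }
}

/** Hypothetical concrete class: implements only what it can genuinely support. */
final class DelegatingDataSourceSketch extends UnsupportedOperationsSketch {

    private final DataSource delegate;

    private PrintWriter logWriter = new PrintWriter(System.out);

    DelegatingDataSourceSketch(final DataSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public Connection getConnection() throws SQLException {
        return delegate.getConnection();
    }

    @Override
    public Connection getConnection(final String username, final String password) throws SQLException {
        return delegate.getConnection(username, password);
    }

    @Override
    public PrintWriter getLogWriter() {
        return logWriter;
    }

    @Override
    public void setLogWriter(final PrintWriter out) {
        this.logWriter = out;
    }

    @Override
    public Logger getParentLogger() {
        return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }

    @Override
    public <T> T unwrap(final Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return iface.cast(this);
        }
        throw new SQLException("cannot be unwrapped as " + iface.getName());
    }

    @Override
    public boolean isWrapperFor(final Class<?> iface) {
        return iface.isInstance(this);
    }
}
```

Because the unsupported methods are final in the base class, a reader can find every refused operation in one file, and a subclass cannot accidentally half-implement one of them.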
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource
/**
* Unsupported {@code Datasource} methods.
*/
public abstract class AbstractUnsupportedOperationDataSource extends WrapperAdapter implements DataSource {
@Override
public final int getLoginTimeout() throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
}
@Override
public final void setLoginTimeout(final int seconds) throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
}
}
As you can see, besides implementing the DataSource interface, the class also extends WrapperAdapter.
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter
/**
* Adapter for {@code java.sql.Wrapper}.
*/
public abstract class WrapperAdapter implements Wrapper {
private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
@SuppressWarnings("unchecked")
@Override
public final <T> T unwrap(final Class<T> iface) throws SQLException {
if (isWrapperFor(iface)) {
return (T) this;
}
throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}
@Override
public final boolean isWrapperFor(final Class<?> iface) {
return iface.isInstance(this);
}
/**
* record method invocation.
*
* @param targetClass target class
* @param methodName method name
* @param argumentTypes argument types
* @param arguments arguments
*/
@SneakyThrows
public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
}
/**
* Replay methods invocation.
*
* @param target target object
*/
public final void replayMethodsInvocation(final Object target) {
for (JdbcMethodInvocation each : jdbcMethodInvocations) {
each.invoke(target);
}
}
}
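This class implements the Wrapper interface and holds a collection of JdbcMethodInvocation objects that records certain method calls made on the current JDBC resource. recordMethodInvocation records setter calls made by the application, such as setAutoCommit, setReadOnly, setFetchSize and setMaxFieldSize; it is invoked when the application calls setAutoCommit or setReadOnly on ShardingConnection, or setFetchSize or setMaxFieldSize on ShardingPreparedStatement. replayMethodsInvocation replays the recorded calls on a given target object; it is used to re-apply them to the real underlying JDBC objects (from the database driver, a connection pool, and so on) once those objects exist.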
WrapperAdapter is the root base class of ShardingSphere's JDBC implementation classes, so the DataSource, Connection, Statement and PreparedStatement implementations all have this replay capability.
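The record-and-replay mechanism can be reproduced with plain reflection. The sketch below is an illustration, not the library's code: `Settings`, `RecordingSettings` and `InvocationRecorder` are hypothetical names, and `Settings` stands in for a real JDBC interface such as Connection.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical target type standing in for a real JDBC resource. */
interface Settings {
    void setReadOnly(boolean readOnly);

    void setFetchSize(int rows);
}

/** Test double that remembers which setters were called on it. */
final class RecordingSettings implements Settings {

    final List<String> calls = new ArrayList<>();

    @Override
    public void setReadOnly(final boolean readOnly) {
        calls.add("setReadOnly(" + readOnly + ")");
    }

    @Override
    public void setFetchSize(final int rows) {
        calls.add("setFetchSize(" + rows + ")");
    }
}

/** Records setter calls as (Method, arguments) pairs and replays them on objects created later. */
final class InvocationRecorder {

    private static final class Invocation {

        private final Method method;

        private final Object[] arguments;

        Invocation(final Method method, final Object[] arguments) {
            this.method = method;
            this.arguments = arguments;
        }
    }

    private final List<Invocation> invocations = new ArrayList<>();

    void record(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
        try {
            invocations.add(new Invocation(targetClass.getMethod(methodName, argumentTypes), arguments));
        } catch (final NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    void replay(final Object target) {
        // Calls are replayed in the order in which they were recorded.
        for (Invocation each : invocations) {
            try {
                each.method.invoke(target, each.arguments);
            } catch (final ReflectiveOperationException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }
}
```

Recording the `Method` rather than a lambda keeps the recorder independent of the target type, which is what lets one base class serve connections and statements alike.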
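Wrapper is an interface provided by JDBC and also a design pattern; it is used to obtain the original object wrapped by a JDBC proxy class. The JDK source documents the interface as follows: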
Interface for JDBC classes which provide the ability to retrieve the delegate instance when the instance in question is in fact a proxy class.
The wrapper pattern is employed by many JDBC driver implementations to provide extensions beyond the traditional JDBC API that are specific to a data source. Developers may wish to gain access to these resources that are wrapped (the delegates) as proxy class instances representing the actual resources. This interface describes a standard mechanism to access these wrapped resources represented by their proxy, to permit direct access to the resource delegates.
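From the caller's side, the Wrapper contract looks like the sketch below. `VendorExtension` and `VendorResource` are hypothetical names; the unwrap logic mirrors the "return this if it is an instance" approach that WrapperAdapter takes.

```java
import java.sql.SQLException;
import java.sql.Wrapper;

/** Hypothetical vendor-specific extension that goes beyond the standard JDBC API. */
interface VendorExtension {
    String vendorName();
}

/** A resource that is reachable both through java.sql.Wrapper and through its extension interface. */
final class VendorResource implements Wrapper, VendorExtension {

    @Override
    public String vendorName() {
        return "demo-vendor";
    }

    @Override
    public <T> T unwrap(final Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return iface.cast(this);
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }

    @Override
    public boolean isWrapperFor(final Class<?> iface) {
        return iface.isInstance(this);
    }
}
```

A caller would typically check `isWrapperFor(VendorExtension.class)` first and only then call `unwrap(VendorExtension.class)`, so that an unsupported interface is handled without relying on the exception.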
Next is the AbstractDataSourceAdapter class:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter
/**
* Adapter for {@code Datasource}.
*/
@Getter
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
private final Map<String, DataSource> dataSourceMap;
private final DatabaseType databaseType;
@Setter
private PrintWriter logWriter = new PrintWriter(System.out);
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
this.dataSourceMap = dataSourceMap;
databaseType = createDatabaseType();
}
…
@Override
public final Connection getConnection(final String username, final String password) throws SQLException {
return getConnection();
}
@Override
public final void close() throws Exception {
close(dataSourceMap.keySet());
}
…
protected abstract RuntimeContext getRuntimeContext();
}
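As you can see, this abstract DataSource adapter maintains a map of real DataSources (supplied by a database driver or by a third-party connection pool) together with the database type, and declares an abstract method that returns the corresponding RuntimeContext. RuntimeContext is the context object passed between the JDBC resource objects; it exposes the rule, the properties, the database type, the executor engine and the SQL parser engine.
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext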
/**
* Runtime context.
*
* @param <T> type of rule
*/
public interface RuntimeContext<T extends BaseRule> extends AutoCloseable {
/**
* Get rule.
*
* @return rule
*/
T getRule();
/**
* Get properties.
*
* @return properties
*/
ConfigurationProperties getProperties();
/**
* Get database type.
*
* @return database type
*/
DatabaseType getDatabaseType();
/**
* Get execute engine.
*
* @return execute engine
*/
ExecutorEngine getExecutorEngine();
/**
* Get SQL parser engine.
*
* @return SQL parser engine
*/
SQLParserEngine getSqlParserEngine();
}
Each feature has its own RuntimeContext implementation. The class hierarchy is:
First, AbstractRuntimeContext:
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.AbstractRuntimeContext
/**
* Abstract runtime context.
*
* @param <T> type of rule
*/
@Getter
public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
private final T rule;
private final ConfigurationProperties properties;
private final DatabaseType databaseType;
private final ExecutorEngine executorEngine;
private final SQLParserEngine sqlParserEngine;
protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
this.rule = rule;
properties = new ConfigurationProperties(null == props ? new Properties() : props);
this.databaseType = databaseType;
executorEngine = new ExecutorEngine(properties.<Integer>getValue(ConfigurationPropertyKey.EXECUTOR_SIZE));
sqlParserEngine = SQLParserEngineFactory.getSQLParserEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
ConfigurationLogger.log(rule.getRuleConfiguration());
ConfigurationLogger.log(props);
}
protected abstract ShardingSphereMetaData getMetaData();
…
}
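The constructor assigns and initializes the executor engine, the parser engine and the other fields. More importantly, the class declares an abstract method getMetaData that returns ShardingSphereMetaData. The main logic of each feature's RuntimeContext implementation revolves around how to build that ShardingSphereMetaData object. Depending on whether there is one data source or several, two further abstract classes are defined, SingleDataSourceRuntimeContext and MultipleDataSourcesRuntimeContext. They take the DataSource map passed in by the application and build DataSourceMetas and SchemaMetaData objects, from which the ShardingSphereMetaData instance is constructed. Loading the SchemaMetaData is left to the concrete feature-specific RuntimeContext class, for example the runtime context for sharding: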
/**
* Runtime context for sharding.
*/
@Getter
public final class ShardingRuntimeContext extends MultipleDataSourcesRuntimeContext<ShardingRule> {
private final CachedDatabaseMetaData cachedDatabaseMetaData;
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props, final DatabaseType databaseType) throws SQLException {
super(dataSourceMap, shardingRule, props, databaseType);
cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);// create the cached database metadata
shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();// create the transaction manager engine
shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
}
private CachedDatabaseMetaData createCachedDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
return new CachedDatabaseMetaData(connection.getMetaData());
}
}
@Override
protected SchemaMetaData loadSchemaMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
int maxConnectionsSizePerQuery = getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
boolean isCheckingMetaData = getProperties().<Boolean>getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED);
SchemaMetaData result = new ShardingMetaDataLoader(dataSourceMap, getRule(), maxConnectionsSizePerQuery, isCheckingMetaData).load(getDatabaseType());
result = SchemaMetaDataDecorator.decorate(result, getRule(), new ShardingTableMetaDataDecorator());
if (!getRule().getEncryptRule().getEncryptTableNames().isEmpty()) {
result = SchemaMetaDataDecorator.decorate(result, getRule().getEncryptRule(), new EncryptTableMetaDataDecorator());
}
return result;
}
…
}
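ShardingSphereMetaData holds metadata about databases, tables and indexes. As part of RuntimeContext, this metadata is used by the engines later on. The metadata classes are simple, so here is a quick look: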
org.apache.shardingsphere.underlying.common.metadata.ShardingSphereMetaData
/**
* ShardingSphere meta data.
*/
@RequiredArgsConstructor
@Getter
public final class ShardingSphereMetaData {
private final DataSourceMetas dataSources;
private final SchemaMetaData schema;
}
org.apache.shardingsphere.underlying.common.metadata.datasource.DataSourceMetas
/**
* Data source metas.
*/
public final class DataSourceMetas {
private final Map<String, DataSourceMetaData> dataSourceMetaDataMap;
…
}
Data source metadata
org.apache.shardingsphere.spi.database.metadata.DataSourceMetaData
/**
* Data source meta data.
*/
public interface DataSourceMetaData {
/**
* Get host name.
*
* @return host name
*/
String getHostName();
/**
* Get port.
*
* @return port
*/
int getPort();
/**
* Get catalog.
*
* @return catalog
*/
String getCatalog();
/**
* Get schema.
*
* @return schema
*/
String getSchema();
}
Schema metadata
org.apache.shardingsphere.sql.parser.binder.metadata.schema.SchemaMetaData
/**
* Schema meta data.
*/
public final class SchemaMetaData {
private final Map<String, TableMetaData> tables;
…
}
Table metadata
org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData
/**
* Table meta data.
*/
@Getter
@EqualsAndHashCode
@ToString
public final class TableMetaData {
private final Map<String, ColumnMetaData> columns;
private final Map<String, IndexMetaData> indexes;
…
}
Column metadata
org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaData
/**
* Column meta data.
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class ColumnMetaData {
private final String name;
private final int dataType;
private final String dataTypeName;
private final boolean primaryKey;
private final boolean generated;
private final boolean caseSensitive;
}
Index metadata
org.apache.shardingsphere.sql.parser.binder.metadata.index.IndexMetaData
/**
* Index meta data.
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class IndexMetaData {
private final String name;
}
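Each of these metadata classes has a corresponding loader: SchemaMetaDataLoader, TableMetaDataLoader, ColumnMetaDataLoader and IndexMetaDataLoader. Their core logic is the same: obtain a database Connection, call getMetaData() to get a DatabaseMetaData instance, and then call methods such as getSchemas, getTables and getColumns to fetch the table and column information. Take ColumnMetaDataLoader as an example: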
org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaDataLoader
/**
* Column meta data loader.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ColumnMetaDataLoader {
private static final String COLUMN_NAME = "COLUMN_NAME";
private static final String DATA_TYPE = "DATA_TYPE";
private static final String TYPE_NAME = "TYPE_NAME";
/**
* Load column meta data list.
*
* @param connection connection
* @param table table name
* @param databaseType database type
* @return column meta data list
* @throws SQLException SQL exception
*/
public static Collection<ColumnMetaData> load(final Connection connection, final String table, final String databaseType) throws SQLException {
if (!isTableExist(connection, connection.getCatalog(), table, databaseType)) {
return Collections.emptyList();
}
Collection<ColumnMetaData> result = new LinkedList<>();
Collection<String> primaryKeys = loadPrimaryKeys(connection, table, databaseType);
List<String> columnNames = new ArrayList<>();
List<Integer> columnTypes = new ArrayList<>();
List<String> columnTypeNames = new ArrayList<>();
List<Boolean> isPrimaryKeys = new ArrayList<>();
List<Boolean> isCaseSensitives = new ArrayList<>();
try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), JdbcUtil.getSchema(connection, databaseType), table, "%")) {
while (resultSet.next()) {
String columnName = resultSet.getString(COLUMN_NAME);
columnTypes.add(resultSet.getInt(DATA_TYPE));
columnTypeNames.add(resultSet.getString(TYPE_NAME));
isPrimaryKeys.add(primaryKeys.contains(columnName));
columnNames.add(columnName);
}
}
try (ResultSet resultSet = connection.createStatement().executeQuery(generateEmptyResultSQL(table, databaseType))) {
for (String each : columnNames) {
isCaseSensitives.add(resultSet.getMetaData().isCaseSensitive(resultSet.findColumn(each)));
}
}
for (int i = 0; i < columnNames.size(); i++) {
// TODO load auto generated from database meta data
result.add(new ColumnMetaData(columnNames.get(i), columnTypes.get(i), columnTypeNames.get(i), isPrimaryKeys.get(i), false, isCaseSensitives.get(i)));
}
return result;
}
…
}
Back to the final feature-specific DataSource implementations, for example ShardingDataSource for sharding:
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource
/**
* Sharding data source.
*/
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
NewInstanceServiceLoader.register(ResultProcessEngine.class);
}
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}
private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
for (DataSource each : dataSourceMap.values()) {
Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
}
}
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
…
}
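As you can see, the static block registers the route decorators, SQL rewrite context decorators and result process engines that the feature needs, and the class implements getConnection().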
Connection
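The class hierarchy of the Connection implementations in ShardingSphere:
As with DataSource, ShardingSphere's Connection implementations start from an AbstractUnsupportedOperationConnection class; for master-slave and sharding there is additionally an abstract Connection adapter class.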
/**
* Adapter for {@code Connection}.
*/
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
@Getter
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
@Getter
private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
private boolean autoCommit = true;
private boolean readOnly;
private volatile boolean closed;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
protected AbstractConnectionAdapter() {
rootInvokeHook.start();
}
/**
* Get database connection.
*
* @param dataSourceName data source name
* @return database connection
* @throws SQLException SQL exception
*/
public final Connection getConnection(final String dataSourceName) throws SQLException {
return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
}
/**
* Get database connections.
*
* @param connectionMode connection mode
* @param dataSourceName data source name
* @param connectionSize size of connection list to be get
* @return database connections
* @throws SQLException SQL exception
*/
public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
DataSource dataSource = getDataSourceMap().get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
connections = cachedConnections.get(dataSourceName);
}
List<Connection> result;
if (connections.size() >= connectionSize) {
result = new ArrayList<>(connections).subList(0, connectionSize);
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
}
return result;
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
if (1 == connectionSize) {
Connection connection = createConnection(dataSourceName, dataSource);
replayMethodsInvocation(connection);
return Collections.singletonList(connection);
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
synchronized (dataSource) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
}
…
}
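The class holds a Multimap<String, Connection> cachedConnections. Multimap is Guava's map type whose values are collections, similar to Map<String, List<Connection>>. ShardingSphere uses it because a single data source may yield several database connections.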
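getConnections compares the connections already held in cachedConnections with the requested number; if there are fewer than requested, it creates only the missing ones.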
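The "reuse what is cached, create only the shortfall" rule can be sketched with JDK collections alone. This is an illustration under assumptions: `ConnectionCacheSketch` is a hypothetical class, a `Map<String, List<C>>` replaces Guava's Multimap, the type parameter `C` stands in for Connection, a `Supplier` stands in for createConnections, and the whole method is synchronized instead of using the finer-grained locking shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-data-source cache that hands out connections and creates only what is missing. */
final class ConnectionCacheSketch<C> {

    private final Map<String, List<C>> cached = new HashMap<>();

    synchronized List<C> get(final String dataSourceName, final int size, final Supplier<C> factory) {
        List<C> existing = cached.computeIfAbsent(dataSourceName, key -> new ArrayList<>());
        // Top up the cache until it holds at least the requested number of connections.
        while (existing.size() < size) {
            existing.add(factory.get());
        }
        // Return a copy so callers cannot modify the cache through the result.
        return new ArrayList<>(existing.subList(0, size));
    }
}
```

Asking for two connections and then for three creates three in total, not five, because the second call reuses the first two.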
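One question worth asking: does cachedConnections really need to be synchronized?
In general, the Connection and Statement objects provided by database drivers are not thread-safe; they do not support concurrent use. Following the call chain ShardingPreparedStatement.executeQuery -> PreparedStatementExecutor.init -> PreparedStatementExecutor.obtainExecuteGroups -> AbstractConnectionAdapter.getConnections shows that executing ShardingPreparedStatement.executeQuery concurrently leads to concurrent operations on cachedConnections. The synchronized blocks around cachedConnections are presumably there to keep that case safe, even though an application that shares a connection across threads in this way is not using it correctly in the first place.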
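The class also has many property setters, for example setReadOnly. In it, recordMethodInvocation records the call so that connections created later can have the property replayed onto them, while forceExecuteTemplate.execute applies the value to every connection that has already been created.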
public final void setReadOnly(final boolean readOnly) throws SQLException {
this.readOnly = readOnly;
recordMethodInvocation(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
}
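The article does not show ForceExecuteTemplate's source, so the following is only a guess at what "force execute" means here: run the callback on every target even if some of them fail, then report the failures together. `ForceExecuteSketch` and its `Callback` interface are hypothetical names; chaining the failures with `SQLException.setNextException` is one possible way to report them, not necessarily what the library does.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical helper: applies a callback to every target and defers failures until the end. */
final class ForceExecuteSketch {

    /** Callback that is allowed to throw SQLException, like most JDBC setters. */
    interface Callback<T> {
        void execute(T target) throws SQLException;
    }

    private ForceExecuteSketch() {
    }

    static <T> void execute(final Collection<T> targets, final Callback<T> callback) throws SQLException {
        List<SQLException> failures = new ArrayList<>();
        for (T each : targets) {
            try {
                callback.execute(each);
            } catch (final SQLException ex) {
                // Remember the failure but keep going so the remaining targets are still updated.
                failures.add(ex);
            }
        }
        if (!failures.isEmpty()) {
            SQLException first = failures.get(0);
            for (SQLException each : failures.subList(1, failures.size())) {
                first.setNextException(each);
            }
            throw first;
        }
    }
}
```

The point of the design is that one broken connection must not prevent a setting such as read-only from reaching the other cached connections.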
With the abstract Connection adapter covered, let's look at the concrete classes: ShardingConnection for sharding and MasterSlaveConnection for master-slave.
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection
/**
* Connection that support sharding.
*/
@Getter
public final class ShardingConnection extends AbstractConnectionAdapter {
private final Map<String, DataSource> dataSourceMap;
private final ShardingRuntimeContext runtimeContext;
private final TransactionType transactionType;
private final ShardingTransactionManager shardingTransactionManager;
public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
this.dataSourceMap = dataSourceMap;
this.runtimeContext = runtimeContext;
this.transactionType = transactionType;
shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
}
/**
* Whether hold transaction or not.
*
* @return true or false
*/
public boolean isHoldTransaction() {
return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
}
@Override
protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
}
private boolean isInShardingTransaction() {
return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
}
@Override
public DatabaseMetaData getMetaData() {
return new ShardingDatabaseMetaData(this);
}
@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingPreparedStatement(this, sql);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}
...
@Override
public Statement createStatement() {
return new ShardingStatement(this);
}
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
return new ShardingStatement(this, resultSetType, resultSetConcurrency);
}
…
}
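prepareStatement returns a ShardingPreparedStatement instance and createStatement returns a ShardingStatement.
In addition, createConnection (which AbstractConnectionAdapter calls to create a Connection) checks whether a sharding transaction is in progress; if so, the connection is obtained from the sharding transaction manager.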
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection
/**
* Connection that support master-slave.
*/
@RequiredArgsConstructor
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {
private final Map<String, DataSource> dataSourceMap;
private final MasterSlaveRuntimeContext runtimeContext;
@Override
protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
@Override
public DatabaseMetaData getMetaData() {
return new MasterSlaveDatabaseMetaData(this);
}
@Override
public Statement createStatement() {
return new MasterSlaveStatement(this);
}
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
}
…
}
Compared with ShardingConnection, MasterSlaveConnection is simpler because no sharding transaction is involved. The statements it creates are MasterSlavePreparedStatement and MasterSlaveStatement.
Statement
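Next, the Statement implementations in ShardingSphere. The class diagram:
As with the DataSource and Connection implementations, two abstract unsupported-operation classes are defined, AbstractUnsupportedOperationStatement and AbstractUnsupportedOperationPreparedStatement. They give default implementations (throwing SQLFeatureNotSupportedException directly) for the unsupported methods of Statement and PreparedStatement respectively.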
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement
/**
* Unsupported {@code Statement} METHODS.
*/
public abstract class AbstractUnsupportedOperationStatement extends WrapperAdapter implements Statement {
@Override
public final int getFetchDirection() throws SQLException {
throw new SQLFeatureNotSupportedException("getFetchDirection");
}
@Override
public final void setFetchDirection(final int direction) throws SQLException {
throw new SQLFeatureNotSupportedException("setFetchDirection");
}
@Override
public final void addBatch(final String sql) throws SQLException {
throw new SQLFeatureNotSupportedException("addBatch sql");
}
@Override
public void clearBatch() throws SQLException {
throw new SQLFeatureNotSupportedException("clearBatch");
}
…
}
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement
/**
* Unsupported {@code PreparedStatement} methods.
*/
public abstract class AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter implements PreparedStatement {
public AbstractUnsupportedOperationPreparedStatement() {
super(PreparedStatement.class);
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
throw new SQLFeatureNotSupportedException("getMetaData");
}
/**
* Get parameter meta data.
*
* @return parameter metadata
* @throws SQLException SQL exception
*/
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
throw new SQLFeatureNotSupportedException("ParameterMetaData");
}
@Override
public final void setNString(final int parameterIndex, final String x) throws SQLException {
throw new SQLFeatureNotSupportedException("setNString");
}
…
}
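As abstract parents these two classes only supply defaults; the final feature-specific classes override some of the methods according to their own logic.
Now the other abstract class on the Statement branch: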
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter
/**
* Adapter for {@code Statement}.
*/
@RequiredArgsConstructor
public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {
private final Class<? extends Statement> targetClass;
private boolean closed;
private boolean poolable;
private int fetchSize;
private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
@SuppressWarnings("unchecked")
@Override
public final void close() throws SQLException {
closed = true;
try {
forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
} finally {
getRoutedStatements().clear();
}
}
@Override
public final boolean isClosed() {
return closed;
}
…
@Override
public final int getFetchSize() {
return fetchSize;
}
@SuppressWarnings("unchecked")
@Override
public final void setFetchSize(final int rows) throws SQLException {
this.fetchSize = rows;
recordMethodInvocation(targetClass, "setFetchSize", new Class[] {int.class}, new Object[] {rows});
forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
}
…
@Override
public final int getUpdateCount() throws SQLException {
if (isAccumulate()) {
return accumulate();
} else {
Collection<? extends Statement> statements = getRoutedStatements();
if (statements.isEmpty()) {
return -1;
}
return getRoutedStatements().iterator().next().getUpdateCount();
}
}
private int accumulate() throws SQLException {
long result = 0;
boolean hasResult = false;
for (Statement each : getRoutedStatements()) {
int updateCount = each.getUpdateCount();
if (updateCount > -1) {
hasResult = true;
}
result += updateCount;
}
if (result > Integer.MAX_VALUE) {
result = Integer.MAX_VALUE;
}
return hasResult ? Long.valueOf(result).intValue() : -1;
}
…
protected abstract boolean isAccumulate();
protected abstract Collection<? extends Statement> getRoutedStatements();
}
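The code shows that the class implements close along with getters and setters for poolable, fetchSize, maxFieldSize, maxRows and queryTimeout. They all follow the same logic: apply the call to each underlying Statement in turn, again using recordMethodInvocation and forceExecuteTemplate.execute.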
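The class declares an abstract method getRoutedStatements() that returns the real underlying Statement objects after routing. Its implementation differs by feature: in ShardingPreparedStatement for sharding it returns the routed Statements, of which there may be several, whereas for master-slave, encryption and shadow tables it returns only one.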
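The other abstract method is isAccumulate(), which decides whether update counts need to be accumulated. getUpdateCount() first checks isAccumulate(); if it is true, the getUpdateCount() values of the Statements returned by getRoutedStatements() are summed. Among the subclass implementations, sharding returns true unless all the tables involved are broadcast tables, while master-slave, encryption and shadow tables always return false.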
In ShardingPreparedStatement and ShardingStatement:
@Override
public boolean isAccumulate() {
return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
}
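The accumulation rule can be isolated into a small pure function. `UpdateCountSketch` is a hypothetical class, and the list of integers stands in for the getUpdateCount() values of the routed statements. One deliberate difference from the accumulate() excerpt above: this sketch adds only counts greater than -1, whereas the excerpt adds every count to the running total.

```java
import java.util.List;

/** Hypothetical helper that merges per-shard update counts into a single JDBC update count. */
final class UpdateCountSketch {

    private UpdateCountSketch() {
    }

    static int accumulate(final List<Integer> updateCounts) {
        long total = 0;
        boolean hasResult = false;
        for (int each : updateCounts) {
            // -1 means "no update count" in JDBC, so such statements contribute nothing.
            if (each > -1) {
                hasResult = true;
                total += each;
            }
        }
        if (!hasResult) {
            return -1;
        }
        // Clamp to the int range, because Statement.getUpdateCount() returns an int.
        return (int) Math.min(total, Integer.MAX_VALUE);
    }
}
```

For broadcast tables the same statement runs on every data source, so summing would multiply the count; that is why the sharding implementation switches accumulation off when every table involved is a broadcast table.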
Next, the Statement implementation for sharding:
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement
/**
* Statement that support sharding.
*/
public final class ShardingStatement extends AbstractStatementAdapter {
@Getter
private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
private boolean returnGeneratedKeys;
private ExecutionContext executionContext;
private ResultSet currentResultSet;
public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
}
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
ResultSet result;
try {
executionContext = prepare(sql);
List<QueryResult> queryResults = statementExecutor.executeQuery();
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
executionContext = prepare(sql);
return statementExecutor.executeUpdate();
} finally {
currentResultSet = null;
}
}
…
private ExecutionContext prepare(final String sql) throws SQLException {
statementExecutor.clear();
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
BasePrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
ExecutionContext result = prepareEngine.prepare(sql, Collections.emptyList());
statementExecutor.init(result);
statementExecutor.getStatements().forEach(this::replayMethodsInvocation);
return result;
}
private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
}
…
}
在该类的SQL执行方法executeQuery、executeUpdate、execute方法中可以看到,其内部先执行了prepare方法,然后调用了StatementExecutor的executeQuery、executeUpdate、execute方法。
在prepare方法中,首先通过SimpleQueryPrepareEngine.prepare方法完成SQL的解析、路由计算、SQL改写操作(这里已经属于内核引擎部分,在总览篇中已介绍,本篇就不重复展开),然后对StatementExecutor进行初始化以及底层Statement的方法回放(例如setFetchSize、setQueryTimeout)。
The SQL is actually executed by StatementExecutor. Here is its class hierarchy, starting with the abstract base class:
org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor
/**
* Abstract statement executor.
*/
@Getter
public abstract class AbstractStatementExecutor {
private final DatabaseType databaseType;
private final int resultSetType;
private final int resultSetConcurrency;
private final int resultSetHoldability;
private final ShardingConnection connection;
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
private final SQLExecuteTemplate sqlExecuteTemplate;
private final Collection<Connection> connections = new LinkedList<>();
private final List<List<Object>> parameterSets = new LinkedList<>();
private final List<Statement> statements = new LinkedList<>();
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
private final Collection<InputGroup<StatementExecuteUnit>> inputGroups = new LinkedList<>();
@Setter
private SQLStatementContext sqlStatementContext;
public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) {
this.databaseType = shardingConnection.getRuntimeContext().getDatabaseType();
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
this.connection = shardingConnection;
int maxConnectionsSizePerQuery = connection.getRuntimeContext().getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
ExecutorEngine executorEngine = connection.getRuntimeContext().getExecutorEngine();
sqlExecutePrepareTemplate = new SQLExecutePrepareTemplate(maxConnectionsSizePerQuery);
sqlExecuteTemplate = new SQLExecuteTemplate(executorEngine, connection.isHoldTransaction());
}
…
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);// run the given callback on the input groups
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);// refresh the metadata, because DDL changes it
return result;
}
…
private void refreshMetaDataIfNeeded(final ShardingRuntimeContext runtimeContext, final SQLStatementContext sqlStatementContext) throws SQLException {
if (null == sqlStatementContext) {
return;
}
if (sqlStatementContext instanceof CreateTableStatementContext) {
refreshTableMetaData(runtimeContext, ((CreateTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof AlterTableStatementContext) {
refreshTableMetaData(runtimeContext, ((AlterTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof DropTableStatementContext) {
refreshTableMetaData(runtimeContext, ((DropTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof CreateIndexStatementContext) {
refreshTableMetaData(runtimeContext, ((CreateIndexStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof DropIndexStatementContext) {
refreshTableMetaData(runtimeContext, ((DropIndexStatementContext) sqlStatementContext).getSqlStatement());
}
}
private void refreshTableMetaData(final ShardingRuntimeContext runtimeContext, final CreateTableStatement createTableStatement) throws SQLException {
String tableName = createTableStatement.getTable().getTableName().getIdentifier().getValue();
runtimeContext.getMetaData().getSchema().put(tableName, loadTableMeta(tableName, databaseType));
}
…
}
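AbstractStatementExecutor is the abstract base class of the StatementExecutor implementations. It holds the information needed to execute a Statement, including resultSetType, resultSetConcurrency, and resultSetHoldability. It also holds the ShardingSphere Connection instance, the list of real Statements produced by routing, the parameter sets, the result sets of the execution, the execution group information, and the execution prepare template (SQLExecutePrepareTemplate) and execution template (SQLExecuteTemplate).
In executeCallback, the SQL is actually executed through SQLExecuteTemplate, a key class of the execution engine that was analyzed in the execution engine article and is not expanded here. Afterwards refreshMetaDataIfNeeded refreshes the metadata: when a DDL statement is executed, table names, column names, column types, primary keys, and indexes may change, so the metadata has to be updated to match. The metadata here is the ShardingSphereMetaData property of RuntimeContext. An instance of this class wraps the metadata ShardingSphere needs for sharding and other features, and it is passed between stages and engines to supply the basic information they need.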
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor
/**
* Prepared statement executor.
*/
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
@Getter
private final boolean returnGeneratedKeys;
public PreparedStatementExecutor(
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
* Initialize executor.
*
* @param executionContext execution context
* @throws SQLException SQL exception
*/
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));// build the execution groups
cacheStatements();
}
private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
@Override
// create the required number of database connections on the given data source
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
@Override
// create a StatementExecuteUnit from the execution unit
public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
}
});
}
@SuppressWarnings("MagicConstant")
private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
: connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}
/**
* Execute query.
*
* @return result set list
* @throws SQLException SQL exception
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
// execute the SQL on the given Statement and wrap the JDBC result set in a QueryResult (stream-based or memory-based)
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);// run through executeCallback in the parent class
}
…
}
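As shown, the concrete StatementExecutor implementations implement executeQuery, executeUpdate, and execute. Each method defines a SQLExecuteCallback instance, and executeCallback in the parent class then performs the actual SQL execution.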
The key operation in this class is init: it builds the execution groups and calls cacheStatements in the parent class to record the underlying Statements and their parameters.
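To make "execution groups" more concrete, here is a simplified sketch of how SQL units for one data source might be split across a limited number of connections. Everything in it is an assumption made for illustration: the ExecuteGrouping class and its method names are invented, SQL units are reduced to plain strings, and the partition rule (ceiling division by maxConnectionsSizePerQuery, with connection-strict mode chosen when there are more SQL units than allowed connections) is my reading of ShardingSphere 4.x and should be checked against the version you use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of execution grouping; not ShardingSphere's implementation. */
final class ExecuteGrouping {

    enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    /** Assumed rule: more SQL units than allowed connections means connections must be shared. */
    static ConnectionMode chooseMode(final int maxConnectionsSizePerQuery, final int sqlUnitCount) {
        return maxConnectionsSizePerQuery < sqlUnitCount ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
    }

    /** Split the SQL units of one data source into groups; each group would run on one connection. */
    static List<List<String>> partition(final List<String> sqlUnits, final int maxConnectionsSizePerQuery) {
        // Ceiling division gives the number of SQL units each connection has to handle.
        int groupSize = Math.max((sqlUnits.size() + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery, 1);
        List<List<String>> result = new ArrayList<>();
        for (int from = 0; from < sqlUnits.size(); from += groupSize) {
            result.add(new ArrayList<>(sqlUnits.subList(from, Math.min(from + groupSize, sqlUnits.size()))));
        }
        return result;
    }

    /** Group by data source name first, then partition the SQL units of each data source. */
    static Map<String, List<List<String>>> group(final Map<String, List<String>> sqlByDataSource, final int maxConnectionsSizePerQuery) {
        Map<String, List<List<String>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : sqlByDataSource.entrySet()) {
            result.put(entry.getKey(), partition(entry.getValue(), maxConnectionsSizePerQuery));
        }
        return result;
    }
}
```

Under these assumptions, five SQL units with at most two connections per query end up in two groups of three and two units, so no data source ever needs more than two connections at once.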
Back to the Statement implementations, here is a brief look at the one for the master-slave feature:
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement
/**
* Statement that support master-slave.
*/
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {
private final MasterSlaveConnection connection;
@Getter(AccessLevel.NONE)
private final DataNodeRouter dataNodeRouter;
private final int resultSetType;
private final int resultSetConcurrency;
private final int resultSetHoldability;
private final Collection<Statement> routedStatements = new LinkedList<>();
public MasterSlaveStatement(final MasterSlaveConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
dataNodeRouter = new DataNodeRouter(connection.getRuntimeContext().getMetaData(), connection.getRuntimeContext().getProperties(), connection.getRuntimeContext().getSqlParserEngine());
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
}
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
clearPrevious();
MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
SimpleQueryPrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
Collections.singletonList(runtimeContext.getRule()), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
ExecutionContext executionContext = prepareEngine.prepare(sql, Collections.emptyList());
ExecutionUnit executionUnit = executionContext.getExecutionUnits().iterator().next();
Preconditions.checkState(1 == executionContext.getExecutionUnits().size(), "Cannot support executeQuery for DML or DDL");
Statement statement = connection.getConnection(executionUnit.getDataSourceName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
return statement.executeQuery(executionUnit.getSqlUnit().getSql());
}
@Override
public int executeUpdate(final String sql) throws SQLException {
clearPrevious();
int result = 0;
MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
RouteContext routeContext = dataNodeRouter.route(sql, Collections.emptyList(), false);
routeContext = new MasterSlaveRouteDecorator().decorate(routeContext, runtimeContext.getMetaData(), runtimeContext.getRule(), runtimeContext.getProperties());
for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
Statement statement = connection.getConnection(each.getDataSourceMapper().getActualName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql);
}
return result;
}
…
}
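As shown, executeQuery, like ShardingStatement, runs prepare through SimpleQueryPrepareEngine. Unlike ShardingStatement, which creates Statements and executes SQL through StatementExecutor, this class calls MasterSlaveConnection.getConnection to obtain a Connection, calls createStatement on it directly to create the Statement, and then executes the SQL.
Its executeUpdate method, moreover, does not go through SimpleQueryPrepareEngine for prepare the way ShardingStatement consistently does. Instead it calls dataNodeRouter.route and MasterSlaveRouteDecorator.decorate directly. This inconsistency makes the class harder to follow than ShardingStatement.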
PreparedStatement
Next comes the PreparedStatement branch. First, its abstract class:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter
/**
* Sharding adapter for {@code PreparedStatement}.
*/
public abstract class AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement {
private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
@Getter
private final List<Object> parameters = new ArrayList<>();
@Override
public final void setNull(final int parameterIndex, final int sqlType) {
setParameter(parameterIndex, null);
}
@Override
public final void setNull(final int parameterIndex, final int sqlType, final String typeName) {
setParameter(parameterIndex, null);
}
…
private void setParameter(final int parameterIndex, final Object value) {
if (parameters.size() == parameterIndex - 1) {
parameters.add(value);
return;
}
for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
parameters.add(null);
}
parameters.set(parameterIndex - 1, value);
}
protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
setParameterMethodInvocations.clear();
addParameters(parameters);
for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
each.invoke(preparedStatement);
}
}
private void addParameters(final List<Object> parameters) {
int i = 0;
for (Object each : parameters) {
setParameters(new Class[]{int.class, Object.class}, i++ + 1, each);
}
}
@SneakyThrows
private void setParameters(final Class[] argumentTypes, final Object... arguments) {
setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod("setObject", argumentTypes), arguments, arguments[1]));
}
@Override
public final void clearParameters() {
parameters.clear();
setParameterMethodInvocations.clear();
}
}
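This class implements the parameter-setting methods of the PreparedStatement interface. Its parameters field records the parameter values that were set, and its setParameterMethodInvocations field records the parameter-setting method calls.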
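The setParameter logic is easy to misread because JDBC parameter indexes are 1-based while the backing list is 0-based. The sketch below copies that logic into a hypothetical standalone class, ParameterRecorder (the class name and the getter are mine, not ShardingSphere's), so the padding behavior can be tried in isolation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical standalone copy of the parameter-recording logic, for experimentation only. */
final class ParameterRecorder {

    private final List<Object> parameters = new ArrayList<>();

    /** Record a value at a 1-based JDBC parameter index, padding skipped positions with null. */
    void setParameter(final int parameterIndex, final Object value) {
        // Fast path: the caller sets parameters in order, so the value is simply appended.
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value);
            return;
        }
        // Out-of-order path: grow the list with nulls until the target slot exists, then overwrite it.
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null);
        }
        parameters.set(parameterIndex - 1, value);
    }

    /** Recorded values in parameter order; the value for parameter n is at list index n - 1. */
    List<Object> getParameters() {
        return parameters;
    }
}
```

Setting parameter 3 first produces a list of three entries with the first two null. Setting parameters 1 and 2 afterwards overwrites those slots in place rather than appending.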
Its subclass is org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement:
/**
* PreparedStatement that support sharding.
*/
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
private final String sql;
@Getter
private final ParameterMetaData parameterMetaData;
private final BasePrepareEngine prepareEngine;
private final PreparedStatementExecutor preparedStatementExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
private ExecutionContext executionContext;
private ResultSet currentResultSet;
public ShardingPreparedStatement(final ShardingConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
}
public ShardingPreparedStatement(
final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false);
}
private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
prepareEngine = new PreparedQueryPrepareEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
}
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
@Override
public int executeUpdate() throws SQLException {
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
return preparedStatementExecutor.executeUpdate();
} finally {
clearBatch();
}
}
…
private void initPreparedStatementExecutor() throws SQLException {
preparedStatementExecutor.init(executionContext);
setParametersForStatements();
replayMethodForStatements();
}
private void setParametersForStatements() {
for (int i = 0; i < preparedStatementExecutor.getStatements().size(); i++) {
replaySetParameter((PreparedStatement) preparedStatementExecutor.getStatements().get(i), preparedStatementExecutor.getParameterSets().get(i));
}
}
private void replayMethodForStatements() {
for (Statement each : preparedStatementExecutor.getStatements()) {
replayMethodsInvocation(each);
}
}
…
}
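This class implements executeQuery, executeUpdate, and execute. Each of them calls clearPrevious, prepare, and initPreparedStatementExecutor, and then the corresponding execute* method of PreparedStatementExecutor.
- clearPrevious clears the parameters, connections, result sets, group information, and Statement objects left over from the previous execution of this PreparedStatement;
- prepare calls PreparedQueryPrepareEngine#prepare, whose internal logic was analyzed in the overview and engine articles and is not repeated here;
- initPreparedStatementExecutor obtains the underlying real PreparedStatements and sets their parameters;
- the execute* methods of PreparedStatementExecutor were described above.
Compared with ShardingStatement, ShardingPreparedStatement supports not only parameter setting but also batch operations, namely the JDBC addBatch and executeBatch methods.
Here is how ShardingPreparedStatement implements these two batch methods.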
@Override
public void addBatch() {
try {
prepare();
batchPreparedStatementExecutor.addBatchForRouteUnits(executionContext);
} finally {
currentResultSet = null;
clearParameters();
}
}
@Override
public int[] executeBatch() throws SQLException {
try {
initBatchPreparedStatementExecutor();
return batchPreparedStatementExecutor.executeBatch();
} finally {
clearBatch();
}
}
private void initBatchPreparedStatementExecutor() throws SQLException {
batchPreparedStatementExecutor.init(executionContext.getSqlStatementContext());
setBatchParametersForStatements();
}
private void setBatchParametersForStatements() throws SQLException {
for (Statement each : batchPreparedStatementExecutor.getStatements()) {
List<List<Object>> parameterSet = batchPreparedStatementExecutor.getParameterSet(each);
for (List<Object> parameters : parameterSet) {
replaySetParameter((PreparedStatement) each, parameters);
((PreparedStatement) each).addBatch();
}
}
}
@Override
public void clearBatch() throws SQLException {
currentResultSet = null;
batchPreparedStatementExecutor.clear();
clearParameters();
}
addBatch calls prepare and then BatchPreparedStatementExecutor#addBatchForRouteUnits(executionContext), while executeBatch calls BatchPreparedStatementExecutor#init and executeBatch. setBatchParametersForStatements fetches the Statements and parameter sets held by BatchPreparedStatementExecutor and calls addBatch on each underlying PreparedStatement to add the batch parameters. Next, the BatchPreparedStatementExecutor class.
org.apache.shardingsphere.shardingjdbc.executor.batch.BatchPreparedStatementExecutor
/**
* Prepared statement executor to process add batch.
*/
public final class BatchPreparedStatementExecutor extends AbstractStatementExecutor {
private final Collection<BatchRouteUnit> routeUnits = new LinkedList<>();
@Getter
private final boolean returnGeneratedKeys;
private int batchCount;
public BatchPreparedStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
final ShardingConnection shardingConnection) {
super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
* Initialize executor.
*
* @param sqlStatementContext SQL statement context
* @throws SQLException SQL exception
*/
public void init(final SQLStatementContext sqlStatementContext) throws SQLException {
setSqlStatementContext(sqlStatementContext);
getInputGroups().addAll(obtainExecuteGroups(routeUnits));
}
…
/**
* Add batch for route units.
*
* @param executionContext execution context
*/
public void addBatchForRouteUnits(final ExecutionContext executionContext) {
handleOldBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
handleNewBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
batchCount++;
}
private Collection<BatchRouteUnit> createBatchRouteUnits(final Collection<ExecutionUnit> executionUnits) {
Collection<BatchRouteUnit> result = new LinkedList<>();
for (ExecutionUnit each : executionUnits) {
result.add(new BatchRouteUnit(each));
}
return result;
}
private void handleOldBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
for (BatchRouteUnit each : newRouteUnits) {
for (BatchRouteUnit unit : routeUnits) {
if (unit.equals(each)) {
reviseBatchRouteUnit(unit, each);
}
}
}
}
private void reviseBatchRouteUnit(final BatchRouteUnit oldBatchRouteUnit, final BatchRouteUnit newBatchRouteUnit) {
oldBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters().addAll(newBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters());
oldBatchRouteUnit.mapAddBatchCount(batchCount);
}
private void handleNewBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
newRouteUnits.removeAll(routeUnits);
for (BatchRouteUnit each : newRouteUnits) {
each.mapAddBatchCount(batchCount);
}
routeUnits.addAll(newRouteUnits);
}
…
/**
* Execute batch.
*
* @return execute results
* @throws SQLException SQL exception
*/
public int[] executeBatch() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), isExceptionThrown) {
@Override
protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return statement.executeBatch();
}
};
List<int[]> results = executeCallback(callback);
if (isAccumulate()) {
return accumulate(results);
} else {
return results.get(0);
}
}
…
}
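In addBatchForRouteUnits, handleOldBatchRouteUnits and handleNewBatchRouteUnits merge the batch parameters by route unit. The init method builds the execution groups, and executeBatch still goes through executeCallback in the parent class, which calls executeBatch on the underlying PreparedStatements group by group to execute the SQL.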
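Merging by route unit can be sketched as follows. This is a deliberately simplified model, not the real implementation: the BatchMerger class is hypothetical, a route unit is reduced to a "dataSource|sql" string key, and each addBatch row is kept as its own list, whereas the code above appends parameters to the existing unit and tracks positions with mapAddBatchCount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of merging batch rows per route unit. */
final class BatchMerger {

    /** Parameter rows collected per route unit, keyed by "dataSource|sql" (made-up key format). */
    private final Map<String, List<List<Object>>> rowsByRouteUnit = new LinkedHashMap<>();

    /** One logical addBatch() call: the row joins the route unit it was routed to. */
    void addBatch(final String dataSourceName, final String sql, final Object... parameters) {
        rowsByRouteUnit.computeIfAbsent(dataSourceName + "|" + sql, key -> new ArrayList<>())
            .add(new ArrayList<>(Arrays.asList(parameters)));
    }

    /** Number of distinct route units, i.e. how many underlying statements a batch would need. */
    int routeUnitCount() {
        return rowsByRouteUnit.size();
    }

    /** Rows accumulated for one route unit, in the order they were added. */
    List<List<Object>> rowsFor(final String dataSourceName, final String sql) {
        return rowsByRouteUnit.getOrDefault(dataSourceName + "|" + sql, new ArrayList<>());
    }
}
```

With three logical addBatch calls where the first and third route to the same actual table, only two route units exist. The first would receive two addBatch calls on its underlying PreparedStatement before a single executeBatch.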
ResultSet
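The ResultSet interface represents a database result set, usually obtained after a Statement executes a query. It holds table-like data and maintains an internal cursor. After reading the columns of one row, you call next() to move the cursor to the next row, until the last row has been read.
Likewise, the ResultSet implementations define several abstract Unsupported classes. AbstractUnsupportedUpdateOperationResultSet implements the update methods of ResultSet, since ShardingSphere currently does not support the update* methods of the ResultSet interface. AbstractUnsupportedOperationResultSet implements the other operation methods. AbstractUnsupportedDatabaseMetaDataResultSet defines the methods not supported for DatabaseMetaData result sets (the return values of methods such as getSchemas and getColumns in ShardingDatabaseMetaData). AbstractUnsupportedGeneratedKeysResultSet defines the methods not supported for generated-keys result sets (the return value of Statement#getGeneratedKeys()).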
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
/**
* Adapter for {@code ResultSet}.
*/
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
@Getter
private final List<ResultSet> resultSets;
@Getter
private final Statement statement;
private boolean closed;
private final ForceExecuteTemplate<ResultSet> forceExecuteTemplate = new ForceExecuteTemplate<>();
@Getter
private final ExecutionContext executionContext;
public AbstractResultSetAdapter(final List<ResultSet> resultSets, final Statement statement, final ExecutionContext executionContext) {
Preconditions.checkArgument(!resultSets.isEmpty());
this.resultSets = resultSets;
this.statement = statement;
this.executionContext = executionContext;
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
}
private ShardingRule getShardingRule() {
ShardingConnection connection = statement instanceof ShardingPreparedStatement ? ((ShardingPreparedStatement) statement).getConnection() : ((ShardingStatement) statement).getConnection();
return connection.getRuntimeContext().getRule();
}
@Override
public final int findColumn(final String columnLabel) throws SQLException {
return resultSets.get(0).findColumn(columnLabel);
}
…
}
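Most of the methods get their value by calling the corresponding method on the first underlying ResultSet, while the setters simply apply the corresponding setter to each underlying ResultSet in turn.
Now the ShardingResultSet class, which serves data sharding: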
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet
/**
* Result that support sharding.
*/
public final class ShardingResultSet extends AbstractResultSetAdapter {
private final MergedResult mergeResultSet;
private final Map<String, Integer> columnLabelAndIndexMap;
public ShardingResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final ExecutionContext executionContext) throws SQLException {
super(resultSets, statement, executionContext);
this.mergeResultSet = mergeResultSet;
columnLabelAndIndexMap = createColumnLabelAndIndexMap(resultSets.get(0).getMetaData());
}
private Map<String, Integer> createColumnLabelAndIndexMap(final ResultSetMetaData resultSetMetaData) throws SQLException {
Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (int columnIndex = resultSetMetaData.getColumnCount(); columnIndex > 0; columnIndex--) {
result.put(resultSetMetaData.getColumnLabel(columnIndex), columnIndex);
}
return result;
}
@Override
public boolean next() throws SQLException {
return mergeResultSet.next();
}
@Override
public boolean wasNull() throws SQLException {
return mergeResultSet.wasNull();
}
@Override
public boolean getBoolean(final int columnIndex) throws SQLException {
return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
}
@Override
public boolean getBoolean(final String columnLabel) throws SQLException {
int columnIndex = columnLabelAndIndexMap.get(columnLabel);
return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
}
…
}
The getter methods obtain the value of the requested column by calling MergedResult#getValue.
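One detail of createColumnLabelAndIndexMap is worth trying out: the map is a TreeMap ordered by String.CASE_INSENSITIVE_ORDER, and the loop walks the columns from last to first. The sketch below reproduces that loop over a plain list of labels. The ColumnLabelIndex class is hypothetical and takes a List instead of a ResultSetMetaData so that it runs without a database.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical standalone version of the label-to-index map used for lookups by column label. */
final class ColumnLabelIndex {

    /** Build a case-insensitive label-to-index map; indexes are 1-based, as in JDBC. */
    static Map<String, Integer> build(final List<String> columnLabels) {
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        // Walk from the last column to the first: when two labels collide,
        // the lower index is written last and therefore wins.
        for (int columnIndex = columnLabels.size(); columnIndex > 0; columnIndex--) {
            result.put(columnLabels.get(columnIndex - 1), columnIndex);
        }
        return result;
    }
}
```

For the labels order_id, user_id, ORDER_ID, a lookup of any casing of "order_id" resolves to column 1, which matches the usual JDBC convention that the first matching column is returned. A label that is not in the map yields null, so code that unboxes the result straight into an int would throw a NullPointerException for unknown labels.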
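ResultSet is a general-purpose JDBC interface. Besides representing the result of a SQL query, it is also used wherever multiple records have to be represented, for example in DatabaseMetaData and for the generated keys of a Statement.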
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.DatabaseMetaDataResultSet
/**
* Database meta data result set.
*/
public final class DatabaseMetaDataResultSet<T extends BaseRule> extends AbstractUnsupportedDatabaseMetaDataResultSet {
private static final String TABLE_NAME = "TABLE_NAME";
private static final String INDEX_NAME = "INDEX_NAME";
private final int type;
private final int concurrency;
private final T rule;
private final ResultSetMetaData resultSetMetaData;
private final Map<String, Integer> columnLabelIndexMap;
private final Iterator<DatabaseMetaDataObject> databaseMetaDataObjectIterator;
private volatile boolean closed;
private DatabaseMetaDataObject currentDatabaseMetaDataObject;
public DatabaseMetaDataResultSet(final ResultSet resultSet, final T rule) throws SQLException {
this.type = resultSet.getType();
this.concurrency = resultSet.getConcurrency();
this.rule = rule;
this.resultSetMetaData = resultSet.getMetaData();
this.columnLabelIndexMap = initIndexMap();
this.databaseMetaDataObjectIterator = initIterator(resultSet);
}
private Map<String, Integer> initIndexMap() throws SQLException {
Map<String, Integer> result = new HashMap<>(resultSetMetaData.getColumnCount());
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
result.put(resultSetMetaData.getColumnLabel(i), i);
}
return result;
}
private Iterator<DatabaseMetaDataObject> initIterator(final ResultSet resultSet) throws SQLException {
LinkedList<DatabaseMetaDataObject> result = new LinkedList<>();
Set<DatabaseMetaDataObject> removeDuplicationSet = new HashSet<>();
int tableNameColumnIndex = columnLabelIndexMap.getOrDefault(TABLE_NAME, -1);
int indexNameColumnIndex = columnLabelIndexMap.getOrDefault(INDEX_NAME, -1);
while (resultSet.next()) {
DatabaseMetaDataObject databaseMetaDataObject = generateDatabaseMetaDataObject(tableNameColumnIndex, indexNameColumnIndex, resultSet);
if (!removeDuplicationSet.contains(databaseMetaDataObject)) {
result.add(databaseMetaDataObject);
removeDuplicationSet.add(databaseMetaDataObject);
}
}
return result.iterator();
}
private DatabaseMetaDataObject generateDatabaseMetaDataObject(final int tableNameColumnIndex, final int indexNameColumnIndex, final ResultSet resultSet) throws SQLException {
DatabaseMetaDataObject result = new DatabaseMetaDataObject(resultSetMetaData.getColumnCount());
for (int i = 1; i <= columnLabelIndexMap.size(); i++) {
if (tableNameColumnIndex == i) {
String tableName = resultSet.getString(i);
Collection<String> logicTableNames = rule instanceof ShardingRule ? ((ShardingRule) rule).getLogicTableNames(tableName) : Collections.emptyList();
result.addObject(logicTableNames.isEmpty() ? tableName : logicTableNames.iterator().next());
} else if (indexNameColumnIndex == i) {
String tableName = resultSet.getString(tableNameColumnIndex);
String indexName = resultSet.getString(i);
result.addObject(null != indexName && indexName.endsWith(tableName) ? indexName.substring(0, indexName.indexOf(tableName) - 1) : indexName);
} else {
result.addObject(resultSet.getObject(i));
}
}
return result;
}
@Override
public boolean next() throws SQLException {
checkClosed();
if (databaseMetaDataObjectIterator.hasNext()) {
currentDatabaseMetaDataObject = databaseMetaDataObjectIterator.next();
return true;
}
return false;
}
@Override
public void close() throws SQLException {
checkClosed();
closed = true;
}
@Override
public boolean wasNull() throws SQLException {
checkClosed();
return false;
}
@Override
public String getString(final int columnIndex) throws SQLException {
checkClosed();
checkColumnIndex(columnIndex);
return (String) ResultSetUtil.convertValue(currentDatabaseMetaDataObject.getObject(columnIndex), String.class);
}
…
}
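The constructors of these ResultSet implementations all take an underlying ResultSet or an Iterator. next() moves the cursor by calling next() on the internal ResultSet or Iterator, and the get methods read the value directly from the internal ResultSet or from the current element of the Iterator.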
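The row rewriting in generateDatabaseMetaDataObject and the de-duplication in initIterator can be tried out with a small sketch. The MetaDataRowRewriter class and the actual-to-logic name map are hypothetical stand-ins (the real code asks ShardingRule for the logic table names). The index name expression is copied from the listing above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that mirrors how metadata rows are mapped back to logic names. */
final class MetaDataRowRewriter {

    /** Strip the "_<actualTable>" suffix from an index name; other names pass through unchanged. */
    static String logicIndexName(final String indexName, final String actualTableName) {
        return null != indexName && indexName.endsWith(actualTableName)
            ? indexName.substring(0, indexName.indexOf(actualTableName) - 1) : indexName;
    }

    /** Map actual table names to logic names and drop duplicates, keeping first-seen order. */
    static List<String> logicTableNames(final List<String> actualTableNames, final Map<String, String> actualToLogic) {
        Set<String> result = new LinkedHashSet<>();
        for (String each : actualTableNames) {
            // Tables without a sharding rule keep their own name.
            result.add(actualToLogic.getOrDefault(each, each));
        }
        return new ArrayList<>(result);
    }
}
```

With t_order_0 and t_order_1 both mapped to t_order, a getTables-style listing collapses to a single t_order row, and an index named idx_user_t_order_0 is reported as idx_user. Note that the copied expression assumes the index name is longer than the table name: if the two were identical, substring(0, -1) would throw.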
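MetaData interfaces
JDBC has several metadata interfaces: DatabaseMetaData, ResultSetMetaData, and ParameterMetaData. They represent database and driver metadata (Catalog, Schema, Table, Column, Index, and so on, obtained through Connection#getMetaData()), result set metadata (the types and properties of columns, obtained through ResultSet#getMetaData()), and parameter metadata (the number, types, and properties of parameters, obtained through PreparedStatement#getParameterMetaData()).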
DatabaseMetaData
Here are the classes involved:
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.AdaptedDatabaseMetaData
/**
* Adapted database meta data.
*/
@RequiredArgsConstructor
public abstract class AdaptedDatabaseMetaData extends WrapperAdapter implements DatabaseMetaData {
private final CachedDatabaseMetaData cachedDatabaseMetaData;
@Override
public final String getURL() {
return cachedDatabaseMetaData.getUrl();
}
@Override
public final String getUserName() {
return cachedDatabaseMetaData.getUserName();
}
@Override
public final String getDatabaseProductName() {
return cachedDatabaseMetaData.getDatabaseProductName();
}
…
}
This class implements DatabaseMetaData methods by returning the result of the corresponding method on the CachedDatabaseMetaData instance.
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.MultipleDatabaseMetaData
/**
* Multiple database meta data.
*/
@Getter
public abstract class MultipleDatabaseMetaData<C extends AbstractConnectionAdapter> extends AdaptedDatabaseMetaData {
private final C connection;
private final Collection<String> datasourceNames;
private final ShardingSphereMetaData shardingSphereMetaData;
private String currentDataSourceName;
private DatabaseMetaData currentDatabaseMetaData;
public MultipleDatabaseMetaData(final C connection, final Collection<String> datasourceNames,
final CachedDatabaseMetaData cachedDatabaseMetaData, final ShardingSphereMetaData shardingSphereMetaData) {
super(cachedDatabaseMetaData);
this.connection = connection;
this.datasourceNames = datasourceNames;
this.shardingSphereMetaData = shardingSphereMetaData;
}
@Override
public final Connection getConnection() throws SQLException {
return connection.getConnection(getDataSourceName());
}
@Override
public final ResultSet getSuperTypes(final String catalog, final String schemaPattern, final String typeNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTypes(getActualCatalog(catalog), getActualSchema(schemaPattern), typeNamePattern));
}
@Override
public final ResultSet getSuperTables(final String catalog, final String schemaPattern, final String tableNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTables(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern)));
}
@Override
public final ResultSet getColumns(final String catalog, final String schemaPattern, final String tableNamePattern, final String columnNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(
getDatabaseMetaData().getColumns(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern), columnNamePattern));
}
…
protected abstract String getActualTableNamePattern(String tableNamePattern);
protected abstract String getActualTable(String table);
protected abstract ResultSet createDatabaseMetaDataResultSet(ResultSet resultSet) throws SQLException;
…
}
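As shown, MultipleDatabaseMetaData implements most of the DatabaseMetaData methods that AdaptedDatabaseMetaData leaves unimplemented (the getters in AdaptedDatabaseMetaData are final, so they cannot be overridden). Each of these methods calls the underlying DatabaseMetaData and wraps the returned ResultSet through createDatabaseMetaDataResultSet, which the subclass implements. The subclass also implements getActualTableNamePattern and getActualTable, which supply the actual physical table names.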
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.ShardingDatabaseMetaData
/**
* Sharding database meta data.
*/
public final class ShardingDatabaseMetaData extends MultipleDatabaseMetaData<ShardingConnection> {
private ShardingRule shardingRule;
public ShardingDatabaseMetaData(final ShardingConnection connection) {
super(connection, connection.getDataSourceMap().keySet(), connection.getRuntimeContext().getCachedDatabaseMetaData(), connection.getRuntimeContext().getMetaData());
shardingRule = connection.getRuntimeContext().getRule();
}
@Override
public String getActualTableNamePattern(final String tableNamePattern) {
if (null == tableNamePattern) {
return null;
}
return shardingRule.findTableRule(tableNamePattern).isPresent() ? "%" + tableNamePattern + "%" : tableNamePattern;
}
@Override
public String getActualTable(final String table) {
if (null == table) {
return null;
}
String result = table;
if (shardingRule.findTableRule(table).isPresent()) {
DataNode dataNode = shardingRule.getDataNode(table);
result = dataNode.getTableName();
}
return result;
}
@Override
protected ResultSet createDatabaseMetaDataResultSet(final ResultSet resultSet) throws SQLException {
return new DatabaseMetaDataResultSet<>(resultSet, shardingRule);
}
}
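As shown, in ShardingDatabaseMetaData the getActualTable and getActualTableNamePattern methods use the ShardingRule to resolve the actual table name and table name pattern, and createDatabaseMetaDataResultSet creates a DatabaseMetaDataResultSet instance.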
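The split between the two classes above is a template method arrangement: the base class fixes the call sequence, and the subclass supplies the table name translation. The sketch below illustrates that structure under loud assumptions: TableNameTranslatingMetaData and MapBackedShardingMetaData are hypothetical names, a plain Map stands in for ShardingRule, and describeColumnsQuery returns a string instead of querying a real database.

```java
import java.util.Map;

// Hypothetical base class: the lookup sequence is fixed here,
// while the name translation is deferred to subclasses.
abstract class TableNameTranslatingMetaData {

    // Template method: translate the pattern first, then describe the call
    // that would be sent to the underlying DatabaseMetaData.
    final String describeColumnsQuery(final String tableNamePattern) {
        return "getColumns(tableNamePattern=" + getActualTableNamePattern(tableNamePattern) + ")";
    }

    protected abstract String getActualTableNamePattern(String tableNamePattern);

    protected abstract String getActualTable(String table);
}

// Hypothetical subclass: a plain map stands in for ShardingRule.
final class MapBackedShardingMetaData extends TableNameTranslatingMetaData {

    // Maps a logic table name to one of its actual table names.
    private final Map<String, String> actualTableByLogicTable;

    MapBackedShardingMetaData(final Map<String, String> actualTableByLogicTable) {
        this.actualTableByLogicTable = actualTableByLogicTable;
    }

    // A sharded logic table is widened to a LIKE-style pattern so that
    // names such as t_order_0 and t_order_1 also match.
    @Override
    protected String getActualTableNamePattern(final String tableNamePattern) {
        if (null == tableNamePattern) {
            return null;
        }
        return actualTableByLogicTable.containsKey(tableNamePattern) ? "%" + tableNamePattern + "%" : tableNamePattern;
    }

    // A sharded logic table resolves to an actual table; any other name is returned unchanged.
    @Override
    protected String getActualTable(final String table) {
        if (null == table) {
            return null;
        }
        return actualTableByLogicTable.getOrDefault(table, table);
    }
}
```

With a map containing t_order mapped to t_order_0, the pattern for t_order becomes %t_order%, while a table with no sharding entry passes through untouched.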
ResultSetMetaData
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSetMetaData
/**
* Sharding result set meta data.
*/
@RequiredArgsConstructor
public final class ShardingResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
private final ResultSetMetaData resultSetMetaData;
private final ShardingRule shardingRule;
private final SQLStatementContext sqlStatementContext;
@Override
public int getColumnCount() {
return sqlStatementContext instanceof SelectStatementContext ? ((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().size() : 0;
}
@Override
public boolean isAutoIncrement(final int column) throws SQLException {
return resultSetMetaData.isAutoIncrement(column);
}
…
@Override
public String getTableName(final int column) throws SQLException {
String actualTableName = resultSetMetaData.getTableName(column);
return shardingRule.getLogicTableNames(actualTableName).isEmpty() ? actualTableName : shardingRule.getLogicTableNames(actualTableName).iterator().next();
}
@Override
public String getCatalogName(final int column) {
return DefaultSchema.LOGIC_NAME;
}
…
}
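This class is fairly simple. Most methods just call the corresponding method on the ResultSetMetaData passed to the constructor; a few do extra work, such as converting the actual table name to the logic table name or computing the column count from the expanded projections. The ResultSetMetaData passed in comes from the first underlying ResultSet held by AbstractResultSetAdapter.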
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
}
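The conversion from actual table name to logic table name done in getTableName above can be sketched like this. LogicTableNameResolver is a hypothetical helper, and the map of logic tables to actual tables is an assumed stand-in for the lookup that ShardingRule performs; the fallback to the actual name mirrors the excerpt.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: a map stands in for the lookup that ShardingRule performs.
final class LogicTableNameResolver {

    // Maps each logic table name to the actual tables it is split into.
    private final Map<String, List<String>> actualTablesByLogicTable;

    LogicTableNameResolver(final Map<String, List<String>> actualTablesByLogicTable) {
        this.actualTablesByLogicTable = actualTablesByLogicTable;
    }

    // Returns the first logic table that owns the actual table,
    // or the actual name itself when no logic table matches.
    String toLogicTableName(final String actualTableName) {
        for (Map.Entry<String, List<String>> entry : actualTablesByLogicTable.entrySet()) {
            if (entry.getValue().contains(actualTableName)) {
                return entry.getKey();
            }
        }
        return actualTableName;
    }
}
```

The caller therefore sees t_order for a column that physically came from t_order_1, which keeps the sharding layout hidden behind the logic table name.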
ParameterMetaData
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.metadata.ShardingParameterMetaData
/**
* Sharding parameter meta data.
*/
@RequiredArgsConstructor
public final class ShardingParameterMetaData extends AbstractUnsupportedOperationParameterMetaData {
private final SQLParserEngine sqlParserEngine;
private final String sql;
@Override
public int getParameterCount() {
return sqlParserEngine.parse(sql, true).getParameterCount();
}
}
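As shown, this class implements only getParameterCount from the ParameterMetaData interface, using the SQL parser engine to obtain the number of parameters. An instance is created in the ShardingPreparedStatement constructor and returned by getParameterMetaData().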
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement
/**
* PreparedStatement that support sharding.
*/
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
private final String sql;
@Getter
private final ParameterMetaData parameterMetaData;
…
private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
…
}
…
}
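To make concrete what getParameterCount() has to compute, here is a deliberately naive sketch. It swaps the real technique (a full SQL parse through SQLParserEngine) for a character scan, and PlaceholderCounter is a hypothetical class. It counts ? placeholders outside single-quoted string literals and ignores comments, quoted identifiers, and backslash escapes, which is exactly the kind of detail a real parser handles.

```java
// Hypothetical helper: a naive character scan, NOT how ShardingSphere counts parameters.
final class PlaceholderCounter {

    private PlaceholderCounter() {
    }

    // Counts '?' characters that appear outside single-quoted string literals.
    // A doubled quote ('') inside a literal toggles the flag twice, so the
    // scanner ends up still inside the literal, as it should.
    static int countPlaceholders(final String sql) {
        int count = 0;
        boolean inStringLiteral = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if ('\'' == c) {
                inStringLiteral = !inStringLiteral;
            } else if ('?' == c && !inStringLiteral) {
                count++;
            }
        }
        return count;
    }
}
```

For the query used at the start of this article, which has two placeholders, countPlaceholders returns 2; a ? inside a string literal such as '?' is not counted.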
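Changes in 5.x
In 5.x, the JDBC implementation classes have been further merged and simplified. Instead of defining a separate set of JDBC classes for each feature, they are unified into ShardingSphere* classes. For example, ShardingDataSource, MasterSlaveDataSource, ShadowDataSource, and EncryptDataSource from 4.1.1 are unified into ShardingSphereDataSource;
ShardingConnection, MasterSlaveConnection, ShadowConnection, and EncryptConnection are merged into ShardingSphereConnection;
ShardingPreparedStatement, MasterSlavePreparedStatement, ShadowPreparedStatement, and so on are likewise merged into ShardingSpherePreparedStatement.
The feature-specific logic is now spread, decorator style, across the unified processing stages (parsing, routing, rewriting, execution, merging) and is triggered by the kernel engine. This design is clearly more elegant and reduces the mental burden on developers.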
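DataSource class hierarchy in 5.x
Connection class hierarchy in 5.x
Statement class hierarchy in 5.x
Since 5.x has not yet had an official release and the code may still change significantly, a detailed source analysis of 5.x will wait until the release version is published.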