ShardingSphere Design from the Source Code: The JDBC Module

ShardingSphere currently offers two access modes: JDBC and Proxy (MySQL protocol); Sidecar is not yet implemented. This article covers the JDBC mode.

JDBC is Java's standard interface specification for database access; it defines a uniform way to interact with databases. For example, to query some data from MySQL via SQL, JDBC-style code might look like this:

try (Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost/mydb", "user", "password")) {
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT i.*, j.* FROM Omega i, Zappa j WHERE i.name = ? AND j.num = ?")) {
        // In the SQL statement being prepared, each question mark is a placeholder
        // that must be replaced with a value you provide through a "set" method invocation.
        // The following two method calls replace the two placeholders; the first is
        // replaced by a string value, and the second by an integer value.
        ps.setString(1, "Poor Yorick");
        ps.setInt(2, 8008);

        // The ResultSet, rs, conveys the result of executing the SQL statement.
        // Each time you call rs.next(), an internal row pointer, or cursor,
        // is advanced to the next row of the result.  The cursor initially is
        // positioned before the first row.
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                int numColumns = rs.getMetaData().getColumnCount();
                for (int i = 1; i <= numColumns; i++) {
                    System.out.println("COLUMN " + i + " = " + rs.getObject(i));
                }
            }
        }
    }
}

For ShardingSphere, the JDBC subproject has a clear mission: let users obtain sharding, read/write splitting, encryption/decryption, and other features through the standard JDBC API. The design is the decorator pattern: after SQL parsing, routing, and rewriting are done, the real underlying JDBC resources (DataSource, Connection, Statement, ResultSet) ultimately execute the SQL.
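The decorator idea can be reproduced in a few lines. The following is a minimal, hypothetical sketch (none of these classes exist in ShardingSphere): a wrapper implements the same interface as the real resource, does its extra work (here, a vastly simplified "rewrite"), and then delegates to the wrapped object.

```java
import java.util.ArrayList;
import java.util.List;

// A minimal, hypothetical sketch of the decorator idea used by ShardingSphere-JDBC:
// the wrapper exposes the same interface as the real resource, performs its own
// work (here: "rewriting" the SQL), then delegates to the wrapped object.
public class DecoratorSketch {

    interface MiniStatement {
        List<String> executeQuery(String sql);
    }

    // Stands in for the driver's real Statement.
    static class RealStatement implements MiniStatement {
        @Override
        public List<String> executeQuery(String sql) {
            List<String> rows = new ArrayList<>();
            rows.add("executed: " + sql);
            return rows;
        }
    }

    // Stands in for ShardingStatement: rewrites the logical table name to a
    // physical one before delegating to the underlying statement.
    static class ShardingStatementSketch implements MiniStatement {
        private final MiniStatement delegate;

        ShardingStatementSketch(MiniStatement delegate) {
            this.delegate = delegate;
        }

        @Override
        public List<String> executeQuery(String sql) {
            String rewritten = sql.replace("t_order", "t_order_0"); // routing + rewriting, vastly simplified
            return delegate.executeQuery(rewritten);
        }
    }

    public static List<String> run(String sql) {
        MiniStatement stmt = new ShardingStatementSketch(new RealStatement());
        return stmt.executeQuery(sql);
    }

    public static void main(String[] args) {
        System.out.println(run("SELECT * FROM t_order"));
    }
}
```

The application only ever sees the outer `MiniStatement`; the rewrite happens transparently on the way down, which is exactly the shape of the real ShardingStatement over the driver's Statement.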

Unlike the engine articles, this JDBC article proceeds along the class hierarchy, which makes the overall design easier to grasp. The main interfaces in the JDBC specification are DataSource, Connection, Statement, PreparedStatement, ResultSet, DatabaseMetaData, ResultSetMetaData, and ParameterMetaData. This article reads through ShardingSphere's main implementation classes of these interfaces in that order.

DataSource

Class diagram of the DataSource interface implementations in ShardingSphere

As the names suggest, ShardingSphere provides a DataSource implementation for each feature. Let's go through the class diagram from top to bottom.

First is AbstractUnsupportedOperationDataSource, the base class of ShardingSphere's DataSource implementations. It implements the DataSource interface, but the only methods it implements are getLoginTimeout and setLoginTimeout, both of which directly throw SQLFeatureNotSupportedException. No feature-specific DataSource subclass overrides these two methods, so ShardingSphere explicitly does not support calling them.

JDBC is Java's standard set of interfaces for database access, but the implementations live in the database drivers, and vendors rarely support every method of every interface completely. ShardingSphere is similar: it provides no implementation for methods that are rarely used or whose values it cannot compute accurately. To manage unsupported methods in one place, each JDBC interface implementation typically gets a common parent, an AbstractUnsupported* class, whose methods directly throw SQLFeatureNotSupportedException.

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource

/**
 * Unsupported {@code Datasource} methods.
 */
public abstract class AbstractUnsupportedOperationDataSource extends WrapperAdapter implements DataSource {
    
    @Override
    public final int getLoginTimeout() throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
    }
    
    @Override
    public final void setLoginTimeout(final int seconds) throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
    }
}

Notice that besides implementing the DataSource interface, the class also extends WrapperAdapter:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter

/**
 * Adapter for {@code java.sql.Wrapper}.
 */
public abstract class WrapperAdapter implements Wrapper {
    
    private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
    
    @SuppressWarnings("unchecked")
    @Override
    public final <T> T unwrap(final Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return (T) this;
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }
    
    @Override
    public final boolean isWrapperFor(final Class<?> iface) {
        return iface.isInstance(this);
    }
    
    /**
     * record method invocation.
     * 
     * @param targetClass target class
     * @param methodName method name
     * @param argumentTypes argument types
     * @param arguments arguments
     */
    @SneakyThrows
    public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
        jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
    }
    
    /**
     * Replay methods invocation.
     * 
     * @param target target object
     */
    public final void replayMethodsInvocation(final Object target) {
        for (JdbcMethodInvocation each : jdbcMethodInvocations) {
            each.invoke(target);
        }
    }
}

This class implements the Wrapper interface and holds a collection of JdbcMethodInvocation objects that record method calls made on the current JDBC resource. recordMethodInvocation records setter-style calls from the application, such as setAutoCommit, setReadOnly, setFetchSize, and setMaxFieldSize; it is invoked when the application calls setAutoCommit/setReadOnly on ShardingConnection or setFetchSize/setMaxFieldSize on ShardingPreparedStatement. replayMethodsInvocation replays those recorded calls on a given target object, namely the real underlying JDBC objects (from the DB driver or a connection pool) once they are created.

WrapperAdapter is the common base class of ShardingSphere's JDBC implementation classes, so DataSource, Connection, Statement, and PreparedStatement all have this replay capability.
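The record/replay mechanism is easy to reproduce in isolation. Below is a simplified, hypothetical sketch (not the real WrapperAdapter, and FakeConnection is an invented stand-in): setter calls are captured as (Method, args) pairs and invoked later on each real object as it is created.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// A simplified, hypothetical re-creation of WrapperAdapter's record/replay idea:
// setter calls are captured as (Method, args) pairs and replayed on real targets
// that are created later.
public class RecordReplaySketch {

    static final class Invocation {
        final Method method;
        final Object[] args;

        Invocation(Method method, Object[] args) {
            this.method = method;
            this.args = args;
        }
    }

    private final List<Invocation> invocations = new ArrayList<>();

    // Equivalent of recordMethodInvocation.
    public void record(Class<?> targetClass, String methodName, Class<?>[] argTypes, Object[] args) throws Exception {
        invocations.add(new Invocation(targetClass.getMethod(methodName, argTypes), args));
    }

    // Equivalent of replayMethodsInvocation.
    public void replay(Object target) throws Exception {
        for (Invocation each : invocations) {
            each.method.invoke(target, each.args);
        }
    }

    // Hypothetical stand-in for a real Connection with a setter worth replaying.
    public static class FakeConnection {
        private boolean readOnly;

        public void setReadOnly(boolean readOnly) {
            this.readOnly = readOnly;
        }

        public boolean isReadOnly() {
            return readOnly;
        }
    }

    public static boolean demo() throws Exception {
        RecordReplaySketch adapter = new RecordReplaySketch();
        // The application sets readOnly before any real connection exists...
        adapter.record(FakeConnection.class, "setReadOnly", new Class<?>[] {boolean.class}, new Object[] {true});
        // ...and the setting is replayed on the connection created later.
        FakeConnection lazilyCreated = new FakeConnection();
        adapter.replay(lazilyCreated);
        return lazilyCreated.isReadOnly();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

This is why a connection created lazily during routing still ends up with the same autoCommit/readOnly settings the application applied to the logical connection.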

Wrapper is an interface provided by JDBC and also a design pattern, used to retrieve the original instance wrapped by a JDBC proxy class. The JDK source carries a complete description of this interface:
Interface for JDBC classes which provide the ability to retrieve the delegate instance when the instance in question is in fact a proxy class. The wrapper pattern is employed by many JDBC driver implementations to provide extensions beyond the traditional JDBC API that are specific to a data source. Developers may wish to gain access to these resources that are wrapped (the delegates) as proxy class instances representing the actual resources. This interface describes a standard mechanism to access these wrapped resources represented by their proxy, to permit direct access to the resource delegates.
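The contract, as WrapperAdapter implements it, is a tiny amount of code. A hypothetical, self-contained illustration (VendorExtension and ProxyResource are invented names): isWrapperFor is an instanceof check, and unwrap casts `this` to the requested interface or fails with SQLException.

```java
import java.sql.SQLException;

// A tiny, hypothetical illustration of the Wrapper contract as WrapperAdapter
// implements it: isWrapperFor is an instanceof check, and unwrap casts "this"
// to the requested interface or fails with SQLException.
public class WrapperSketch {

    interface VendorExtension {
        String vendorInfo();
    }

    static class ProxyResource implements VendorExtension {
        public boolean isWrapperFor(Class<?> iface) {
            return iface.isInstance(this);
        }

        @SuppressWarnings("unchecked")
        public <T> T unwrap(Class<T> iface) throws SQLException {
            if (isWrapperFor(iface)) {
                return (T) this;
            }
            throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
        }

        @Override
        public String vendorInfo() {
            return "vendor-specific API";
        }
    }

    public static String demo() throws SQLException {
        ProxyResource resource = new ProxyResource();
        // The caller only needs the extension interface; unwrap hands back the delegate.
        VendorExtension ext = resource.unwrap(VendorExtension.class);
        return ext.vendorInfo();
    }

    public static void main(String[] args) throws SQLException {
        System.out.println(demo());
    }
}
```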

Next, the AbstractDataSourceAdapter class:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter

/**
 * Adapter for {@code Datasource}.
 */
@Getter
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final DatabaseType databaseType;
    
    @Setter
    private PrintWriter logWriter = new PrintWriter(System.out);
    
    public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
        this.dataSourceMap = dataSourceMap;
        databaseType = createDatabaseType();
    }
…
    @Override
    public final Connection getConnection(final String username, final String password) throws SQLException {
        return getConnection();
    }
    
    @Override
    public final void close() throws Exception {
        close(dataSourceMap.keySet());
    }
    …
    protected abstract RuntimeContext getRuntimeContext();
}

As the code shows, this abstract DataSource adapter holds a map of the real DataSources (which may come from a database driver or a third-party connection pool) and the database type, and declares an abstract method that produces the matching RuntimeContext. RuntimeContext is the context object handed between the JDBC resource objects; it defines the rule, properties, database type, executor engine, and SQL parser engine.
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext

/**
 * Runtime context.
 *
 * @param <T> type of rule
 */
public interface RuntimeContext<T extends BaseRule> extends AutoCloseable {
    
    /**
     * Get rule.
     * 
     * @return rule
     */
    T getRule();
    
    /**
     * Get properties.
     *
     * @return properties
     */
    ConfigurationProperties getProperties();
    
    /**
     * Get database type.
     * 
     * @return database type
     */
    DatabaseType getDatabaseType();
    
    /**
     * Get execute engine.
     * 
     * @return execute engine
     */
    ExecutorEngine getExecutorEngine();
    
    /**
     * Get SQL parser engine.
     * 
     * @return SQL parser engine
     */
    SQLParserEngine getSqlParserEngine();
}

Each feature has a corresponding RuntimeContext implementation class; the class hierarchy is:


First, AbstractRuntimeContext:
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.AbstractRuntimeContext

/**
 * Abstract runtime context.
 *
 * @param <T> type of rule
 */
@Getter
public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
    
    private final T rule;
    
    private final ConfigurationProperties properties;
    
    private final DatabaseType databaseType;
    
    private final ExecutorEngine executorEngine;
    
    private final SQLParserEngine sqlParserEngine;
    
    protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
        this.rule = rule;
        properties = new ConfigurationProperties(null == props ? new Properties() : props);
        this.databaseType = databaseType;
        executorEngine = new ExecutorEngine(properties.<Integer>getValue(ConfigurationPropertyKey.EXECUTOR_SIZE));
        sqlParserEngine = SQLParserEngineFactory.getSQLParserEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
        ConfigurationLogger.log(rule.getRuleConfiguration());
        ConfigurationLogger.log(props);
    }
    
    protected abstract ShardingSphereMetaData getMetaData();
…
}

The constructor assigns and initializes the executor engine, SQL parser engine, and the other fields. The key piece is the abstract getMetaData method returning a ShardingSphereMetaData: the main logic of each feature's RuntimeContext implementation revolves around how to build that object. Depending on whether one or several data sources are involved, two further abstract classes are defined, SingleDataSourceRuntimeContext and MultipleDataSourcesRuntimeContext. From the application's DataSource map they build the DataSourceMetas and SchemaMetaData objects that make up the ShardingSphereMetaData instance; loading the SchemaMetaData part is left to the concrete feature RuntimeContext class, for example in the sharding runtime context:

/**
 * Runtime context for sharding.
 */
@Getter
public final class ShardingRuntimeContext extends MultipleDataSourcesRuntimeContext<ShardingRule> {
    
    private final CachedDatabaseMetaData cachedDatabaseMetaData;
    
    private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
    
    public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props, final DatabaseType databaseType) throws SQLException {
        super(dataSourceMap, shardingRule, props, databaseType);
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); // create cached database metadata
        shardingTransactionManagerEngine = new ShardingTransactionManagerEngine(); // create the sharding transaction manager engine
        shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
    }

    private CachedDatabaseMetaData createCachedDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
        try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
            return new CachedDatabaseMetaData(connection.getMetaData());
        }
    }
    
    @Override
    protected SchemaMetaData loadSchemaMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
        int maxConnectionsSizePerQuery = getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
        boolean isCheckingMetaData = getProperties().<Boolean>getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED);
        SchemaMetaData result = new ShardingMetaDataLoader(dataSourceMap, getRule(), maxConnectionsSizePerQuery, isCheckingMetaData).load(getDatabaseType());
        result = SchemaMetaDataDecorator.decorate(result, getRule(), new ShardingTableMetaDataDecorator());
        if (!getRule().getEncryptRule().getEncryptTableNames().isEmpty()) {
            result = SchemaMetaDataDecorator.decorate(result, getRule().getEncryptRule(), new EncryptTableMetaDataDecorator());
        }
        return result;
    }
…
}

ShardingSphereMetaData defines the metadata for databases, tables, and indexes. As part of the RuntimeContext, this metadata is used by all the downstream engines. The metadata classes are not complex; a quick scan:
org.apache.shardingsphere.underlying.common.metadata.ShardingSphereMetaData

/**
 * ShardingSphere meta data.
 */
@RequiredArgsConstructor
@Getter
public final class ShardingSphereMetaData {
    
    private final DataSourceMetas dataSources;
    
    private final SchemaMetaData schema;
}

org.apache.shardingsphere.underlying.common.metadata.datasource.DataSourceMetas

/**
 * Data source metas.
 */
public final class DataSourceMetas {
    
    private final Map<String, DataSourceMetaData> dataSourceMetaDataMap;
…
}

Data source metadata:
org.apache.shardingsphere.spi.database.metadata.DataSourceMetaData


/**
 * Data source meta data.
 */
public interface DataSourceMetaData {
    
    /**
     * Get host name.
     * 
     * @return host name
     */
    String getHostName();
    
    /**
     * Get port.
     * 
     * @return port
     */
    int getPort();
    
    /**
     * Get catalog.
     *
     * @return catalog
     */
    String getCatalog();
    
    /**
     * Get schema.
     * 
     * @return schema
     */
    String getSchema();
}

Schema metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.schema.SchemaMetaData

/**
 * Schema meta data.
 */
public final class SchemaMetaData {
    
    private final Map<String, TableMetaData> tables;
…
}

Table metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData

/**
 * Table meta data.
 */
@Getter
@EqualsAndHashCode
@ToString
public final class TableMetaData {
    
    private final Map<String, ColumnMetaData> columns;
    
    private final Map<String, IndexMetaData> indexes;
…
}

Column metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaData

/**
 * Column meta data.
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class ColumnMetaData {
    
    private final String name;
    
    private final int dataType;
    
    private final String dataTypeName;
    
    private final boolean primaryKey;
    
    private final boolean generated;
    
    private final boolean caseSensitive;
}

Index metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.index.IndexMetaData

/**
 * Index meta data.
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class IndexMetaData {
    
    private final String name;
}

Each of these metadata classes has a corresponding loader: SchemaMetaDataLoader, TableMetaDataLoader, ColumnMetaDataLoader, and IndexMetaDataLoader. Their core logic is the same: obtain a Connection, call its getMetaData() to get a DatabaseMetaData instance, then call getSchemas, getTables, getColumns, and so on to retrieve the table and column information. ColumnMetaDataLoader, for example:

org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaDataLoader

/**
 * Column meta data loader.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ColumnMetaDataLoader {
    
    private static final String COLUMN_NAME = "COLUMN_NAME";
    
    private static final String DATA_TYPE = "DATA_TYPE";
    
    private static final String TYPE_NAME = "TYPE_NAME";
    
    /**
     * Load column meta data list.
     * 
     * @param connection connection
     * @param table table name
     * @param databaseType database type
     * @return column meta data list
     * @throws SQLException SQL exception
     */
    public static Collection<ColumnMetaData> load(final Connection connection, final String table, final String databaseType) throws SQLException {
        if (!isTableExist(connection, connection.getCatalog(), table, databaseType)) {
            return Collections.emptyList();
        }
        Collection<ColumnMetaData> result = new LinkedList<>();
        Collection<String> primaryKeys = loadPrimaryKeys(connection, table, databaseType);
        List<String> columnNames = new ArrayList<>();
        List<Integer> columnTypes = new ArrayList<>();
        List<String> columnTypeNames = new ArrayList<>();
        List<Boolean> isPrimaryKeys = new ArrayList<>();
        List<Boolean> isCaseSensitives = new ArrayList<>();
        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), JdbcUtil.getSchema(connection, databaseType), table, "%")) {
            while (resultSet.next()) {
                String columnName = resultSet.getString(COLUMN_NAME);
                columnTypes.add(resultSet.getInt(DATA_TYPE));
                columnTypeNames.add(resultSet.getString(TYPE_NAME));
                isPrimaryKeys.add(primaryKeys.contains(columnName));
                columnNames.add(columnName);
            }
        }
        try (ResultSet resultSet = connection.createStatement().executeQuery(generateEmptyResultSQL(table, databaseType))) {
            for (String each : columnNames) {
                isCaseSensitives.add(resultSet.getMetaData().isCaseSensitive(resultSet.findColumn(each)));
            }
        }
        for (int i = 0; i < columnNames.size(); i++) {
            // TODO load auto generated from database meta data
            result.add(new ColumnMetaData(columnNames.get(i), columnTypes.get(i), columnTypeNames.get(i), isPrimaryKeys.get(i), false, isCaseSensitives.get(i)));
        }
        return result;
    }
…
}
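Stripped of the JDBC plumbing, the heart of that loader is a join: the column rows reported by DatabaseMetaData.getColumns are combined with the primary-key column set. A hypothetical, self-contained sketch of just that join (the Column record and method names here are invented, not ShardingSphere's):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// A hypothetical sketch of ColumnMetaDataLoader's core logic with the JDBC
// calls removed: given the column names reported for a table and the set of
// primary-key columns, mark each column as primary key or not.
public class ColumnLoadSketch {

    static final class Column {
        final String name;
        final boolean primaryKey;

        Column(String name, boolean primaryKey) {
            this.name = name;
            this.primaryKey = primaryKey;
        }
    }

    public static List<Column> load(List<String> columnNames, Set<String> primaryKeys) {
        List<Column> result = new ArrayList<>();
        for (String each : columnNames) {
            // Mirrors "isPrimaryKeys.add(primaryKeys.contains(columnName))" in the real loader.
            result.add(new Column(each, primaryKeys.contains(each)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Column> columns = load(Arrays.asList("order_id", "user_id", "status"),
                new HashSet<>(Arrays.asList("order_id")));
        for (Column each : columns) {
            System.out.println(each.name + " pk=" + each.primaryKey);
        }
    }
}
```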

Back to the feature-specific DataSource implementations, for example the sharding ShardingDataSource class:

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource

/**
 * Sharding data source.
 */
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
    
    private final ShardingRuntimeContext runtimeContext;
    
    static {
        NewInstanceServiceLoader.register(RouteDecorator.class);
        NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
        NewInstanceServiceLoader.register(ResultProcessEngine.class);
    }
    
    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
        super(dataSourceMap);
        checkDataSourceType(dataSourceMap);
        runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
    }
    
    private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
        for (DataSource each : dataSourceMap.values()) {
            Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
        }
    }
    
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
    }
}

As shown, the static block registers the route decorators, SQL rewrite context decorators, and result process engines that this feature needs, and the class implements the getConnection() method.
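NewInstanceServiceLoader is ShardingSphere's thin layer over Java's SPI mechanism. Its behavior can be approximated with the hypothetical sketch below: register caches the implementation classes per service interface, and newServiceInstances creates fresh instances on every call. (The real class discovers implementations through java.util.ServiceLoader and META-INF/services files; here they are registered by hand so the sketch stays self-contained, and RouteDecorator/ShardingRouteDecorator are just stand-in names.)

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// A hypothetical approximation of NewInstanceServiceLoader: implementation
// classes are registered once per service interface, and fresh instances are
// created on each lookup.
public class ServiceRegistrySketch {

    private static final Map<Class<?>, Collection<Class<?>>> SERVICES = new HashMap<>();

    public static <T> void register(Class<T> service, Class<? extends T> impl) {
        SERVICES.computeIfAbsent(service, key -> new LinkedList<>()).add(impl);
    }

    public static <T> Collection<T> newServiceInstances(Class<T> service) throws Exception {
        Collection<T> result = new ArrayList<>();
        for (Class<?> each : SERVICES.getOrDefault(service, new LinkedList<>())) {
            // A new instance per call, hence the "NewInstance" in the real class's name.
            result.add(service.cast(each.getDeclaredConstructor().newInstance()));
        }
        return result;
    }

    // Stand-ins for a decorator SPI and one implementation of it.
    public interface RouteDecorator { }

    public static class ShardingRouteDecorator implements RouteDecorator { }

    public static void main(String[] args) throws Exception {
        register(RouteDecorator.class, ShardingRouteDecorator.class);
        System.out.println(newServiceInstances(RouteDecorator.class).size());
    }
}
```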

Connection

The Connection implementation hierarchy in ShardingSphere:


Class diagram of the Connection interface implementations in ShardingSphere

As with DataSource, ShardingSphere's Connection implementations start from an AbstractUnsupportedOperationConnection class, and for master-slave and sharding an abstract Connection adapter class is defined:

/**
 * Adapter for {@code Connection}.
 */
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
    
    @Getter
    private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
    
    @Getter
    private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
    
    private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
    
    private boolean autoCommit = true;
    
    private boolean readOnly;
    
    private volatile boolean closed;
    
    private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
    
    protected AbstractConnectionAdapter() {
        rootInvokeHook.start();
    }
    
    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
    }
    
    /**
     * Get database connections.
     *
     * @param connectionMode connection mode
     * @param dataSourceName data source name
     * @param connectionSize size of connection list to be get
     * @return database connections
     * @throws SQLException SQL exception
     */
    public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
        DataSource dataSource = getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection<Connection> connections;
        synchronized (cachedConnections) {
            connections = cachedConnections.get(dataSourceName);
        }
        List<Connection> result;
        if (connections.size() >= connectionSize) {
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
            result.addAll(newConnections);
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, newConnections);
            }
        } else {
            result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, result);
            }
        }
        return result;
    }
    
    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
        if (1 == connectionSize) {
            Connection connection = createConnection(dataSourceName, dataSource);
            replayMethodsInvocation(connection);
            return Collections.singletonList(connection);
        }
        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
        synchronized (dataSource) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
    }
…
}

The class holds a Multimap<String, Connection> cachedConnections. Multimap is a Guava map whose values are collections, similar to Map<String, List<Connection>>; ShardingSphere chose this type because the same data source may produce multiple connections.
In getConnections, the cached connections are compared against the requested connection count, and if there are fewer than requested, the difference is created.
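That top-up logic reduces to simple list arithmetic. A hypothetical sketch, with Integer tokens standing in for java.sql.Connection and a single coarse lock instead of the original's finer-grained synchronization: reuse what is cached, create only the shortfall, and remember the new ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A hypothetical sketch of AbstractConnectionAdapter.getConnections' caching
// logic, with Integer tokens standing in for java.sql.Connection: reuse the
// cached connections first and create only the missing difference.
public class ConnectionCacheSketch {

    private final Map<String, List<Integer>> cachedConnections = new HashMap<>();

    private int sequence;

    private Integer createConnection() {
        return ++sequence; // stands in for dataSource.getConnection()
    }

    public synchronized List<Integer> getConnections(String dataSourceName, int connectionSize) {
        List<Integer> connections = cachedConnections.computeIfAbsent(dataSourceName, key -> new ArrayList<>());
        if (connections.size() >= connectionSize) {
            // Enough cached: hand back a prefix of the cache.
            return new ArrayList<>(connections.subList(0, connectionSize));
        }
        // Not enough: reuse the cached ones and create only the difference.
        List<Integer> result = new ArrayList<>(connections);
        while (result.size() < connectionSize) {
            Integer created = createConnection();
            result.add(created);
            connections.add(created); // cache for the next call
        }
        return result;
    }

    public static void main(String[] args) {
        ConnectionCacheSketch cache = new ConnectionCacheSketch();
        System.out.println(cache.getConnections("ds_0", 2)); // creates two connections
        System.out.println(cache.getConnections("ds_0", 3)); // reuses two, creates one more
    }
}
```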

A question worth pausing on: does cachedConnections really need the synchronized blocks?

In general, the Connection and Statement objects provided by database drivers are not thread-safe, i.e. they do not support concurrent use. Following the call chain ShardingPreparedStatement.executeQuery -> PreparedStatementExecutor.init -> PreparedStatementExecutor.obtainExecuteGroups -> AbstractConnectionAdapter.getConnection, concurrent calls to ShardingPreparedStatement.executeQuery do operate on cachedConnections, so the synchronized blocks on it are presumably there to preserve thread safety, even though such concurrent use by the application is itself incorrect.

The class also carries many property setters, for example setReadOnly: recordMethodInvocation records the call so that connections created later can replay the setting, while forceExecuteTemplate.execute applies the value to every connection already created.

  public final void setReadOnly(final boolean readOnly) throws SQLException {
        this.readOnly = readOnly;
        recordMethodInvocation(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
    }

After the abstract Connection adapter, here are its feature implementations: ShardingConnection for sharding and MasterSlaveConnection for master-slave.
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection

/**
 * Connection that support sharding.
 */
@Getter
public final class ShardingConnection extends AbstractConnectionAdapter {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final ShardingRuntimeContext runtimeContext;
    
    private final TransactionType transactionType;
    
    private final ShardingTransactionManager shardingTransactionManager;
    
    public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
        this.dataSourceMap = dataSourceMap;
        this.runtimeContext = runtimeContext;
        this.transactionType = transactionType;
        shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
    }
    
    /**
     * Whether hold transaction or not.
     *
     * @return true or false
     */
    public boolean isHoldTransaction() {
        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
    }
    
    @Override
    protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
    }
    
    private boolean isInShardingTransaction() {
        return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
    }
    
    @Override
    public DatabaseMetaData getMetaData() {
        return new ShardingDatabaseMetaData(this);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingPreparedStatement(this, sql);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}
...
@Override
    public Statement createStatement() {
        return new ShardingStatement(this);
    }
    
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new ShardingStatement(this, resultSetType, resultSetConcurrency);
    }
…
}

As shown, prepareStatement returns a ShardingPreparedStatement instance and createStatement returns a ShardingStatement.
In addition, createConnection (which AbstractConnectionAdapter calls to create a Connection) checks whether a sharding transaction is in progress; if so, the connection is obtained from the sharding transaction manager.
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection

/**
 * Connection that support master-slave.
 */
@RequiredArgsConstructor
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final MasterSlaveRuntimeContext runtimeContext;
    
    @Override
    protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        return dataSource.getConnection();
    }
    
    @Override
    public DatabaseMetaData getMetaData() {
        return new MasterSlaveDatabaseMetaData(this);
    }
    
    @Override
    public Statement createStatement() {
        return new MasterSlaveStatement(this);
    }
    
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
    }
…
}

Compared with ShardingConnection, MasterSlaveConnection is simpler because no sharding transaction is involved; the statements it creates are MasterSlavePreparedStatement and MasterSlaveStatement.

Statement

Next up are the Statement implementation classes in ShardingSphere; the class diagram:


Class diagram of the Statement interface implementations in ShardingSphere

As with the DataSource and Connection implementations, two abstract "unsupported" classes are defined, AbstractUnsupportedOperationStatement and AbstractUnsupportedOperationPreparedStatement, which provide default implementations (directly throwing SQLFeatureNotSupportedException) for the unsupported methods of Statement and PreparedStatement respectively.

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement

/**
 * Unsupported {@code Statement} methods.
 */
public abstract class AbstractUnsupportedOperationStatement extends WrapperAdapter implements Statement {
    
    @Override
    public final int getFetchDirection() throws SQLException {
        throw new SQLFeatureNotSupportedException("getFetchDirection");
    }
    
    @Override
    public final void setFetchDirection(final int direction) throws SQLException {
        throw new SQLFeatureNotSupportedException("setFetchDirection");
    }
    
    @Override
    public final void addBatch(final String sql) throws SQLException {
        throw new SQLFeatureNotSupportedException("addBatch sql");
    }
    
    @Override
    public void clearBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("clearBatch");
    }
…
}

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement

/**
 * Unsupported {@code PreparedStatement} methods.
 */
public abstract class AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter implements PreparedStatement {
    
    public AbstractUnsupportedOperationPreparedStatement() {
        super(PreparedStatement.class);
    }
    
    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        throw new SQLFeatureNotSupportedException("getMetaData");
    }
    
    /**
     * Get parameter meta data.
     *
     * @return parameter metadata
     * @throws SQLException SQL exception
     */
    @Override
    public ParameterMetaData getParameterMetaData() throws SQLException {
        throw new SQLFeatureNotSupportedException("ParameterMetaData");
    }
    
    @Override
    public final void setNString(final int parameterIndex, final String x) throws SQLException {
        throw new SQLFeatureNotSupportedException("setNString");
    }
…
}

These two abstract parents only provide the default "unsupported" behavior; the final feature implementation classes override part of the methods according to their own logic.

Now the other abstract class on the Statement branch:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter

/**
 * Adapter for {@code Statement}.
 */
@RequiredArgsConstructor
public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {
    
    private final Class<? extends Statement> targetClass;
    
    private boolean closed;
    
    private boolean poolable;
    
    private int fetchSize;
    
    private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    @SuppressWarnings("unchecked")
    @Override
    public final void close() throws SQLException {
        closed = true;
        try {
            forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
        } finally {
            getRoutedStatements().clear();
        }
    }
    
    @Override
    public final boolean isClosed() {
        return closed;
    }
…
@Override
    public final int getFetchSize() {
        return fetchSize;
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public final void setFetchSize(final int rows) throws SQLException {
        this.fetchSize = rows;
        recordMethodInvocation(targetClass, "setFetchSize", new Class[] {int.class}, new Object[] {rows});
        forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
    }
…
@Override
    public final int getUpdateCount() throws SQLException {
        if (isAccumulate()) {
            return accumulate();
        } else {
            Collection<? extends Statement> statements = getRoutedStatements();
            if (statements.isEmpty()) {
                return -1;
            }
            return getRoutedStatements().iterator().next().getUpdateCount();
        }
    }
    
    private int accumulate() throws SQLException {
        long result = 0;
        boolean hasResult = false;
        for (Statement each : getRoutedStatements()) {
            int updateCount = each.getUpdateCount();
            if (updateCount > -1) {
                hasResult = true;
            }
            result += updateCount;
        }
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE;
        }
        return hasResult ? Long.valueOf(result).intValue() : -1;
    }
…
    protected abstract boolean isAccumulate();
    
    protected abstract Collection<? extends Statement> getRoutedStatements();
}

The code shows that this class implements close as well as the getters and setters for poolable, fetchSize, maxFieldSize, maxRows, and queryTimeout. The logic is uniform: invoke the corresponding method on each of the underlying Statements, again via recordMethodInvocation and forceExecuteTemplate.execute.

The class declares an abstract method getRoutedStatements() that returns the real underlying Statements after routing. Its implementation differs per feature: in sharding's ShardingPreparedStatement there may be several routed statements, while in master-slave, encryption, and shadow-table there is only one.

另一个抽象方法是isAccumulate(),用于判断更新计数是否需要累加:getUpdateCount()方法会先判断isAccumulate()的返回值,如果为true,则将getRoutedStatements()返回的各Statement的getUpdateCount()值累加后返回。在子类实现中,数据分片场景下只要并非全部是广播表,该方法即返回true;主从、加密、影子表中则都返回false。

例如ShardingPreparedStatement和ShardingStatement类中的实现:

    @Override
    public boolean isAccumulate() {
        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
    }
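accumulate的求和与截断逻辑可以抽成如下可独立运行的示例(AccumulateDemo为演示用类名,逻辑与上面源码一致:-1也会参与求和,用long累加后截断到Integer.MAX_VALUE以防int溢出):

```java
import java.util.List;

// AbstractStatementAdapter#accumulate 逻辑的独立演示:
// 与源码一致,-1(无更新计数)也会参与求和,用long累加后截断到Integer.MAX_VALUE
public final class AccumulateDemo {

    public static int accumulate(final List<Integer> updateCounts) {
        long result = 0;
        boolean hasResult = false;
        for (int each : updateCounts) {
            if (each > -1) {
                hasResult = true; // 只要有一个Statement返回了有效计数,最终就返回求和值
            }
            result += each;
        }
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE; // 防止多个大计数相加后int溢出
        }
        return hasResult ? (int) result : -1;
    }
}
```

例如两个分片分别更新3行和5行时返回8;所有Statement都返回-1(无更新计数)时,整体也返回-1。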

接下来我们看下数据分片对应的Statement实现类
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement

/**
 * Statement that support sharding.
 */
public final class ShardingStatement extends AbstractStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final StatementExecutor statementExecutor;
    
    private boolean returnGeneratedKeys;
    
    private ExecutionContext executionContext;
    
    private ResultSet currentResultSet;
    
    public ShardingStatement(final ShardingConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        ResultSet result;
        try {
            executionContext = prepare(sql);
            List<QueryResult> queryResults = statementExecutor.executeQuery();
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
    }
    
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        try {
            executionContext = prepare(sql);
            return statementExecutor.executeUpdate();
        } finally {
            currentResultSet = null;
        }
    }
…
    private ExecutionContext prepare(final String sql) throws SQLException {
        statementExecutor.clear();
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        BasePrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
                runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        ExecutionContext result = prepareEngine.prepare(sql, Collections.emptyList());
        statementExecutor.init(result);
        statementExecutor.getStatements().forEach(this::replayMethodsInvocation);
        return result;
    }
    
    private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
        return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
    }
…
}

在该类的executeQuery、executeUpdate、execute等SQL执行方法中可以看到,其内部先执行prepare方法,然后调用StatementExecutor对应的executeQuery、executeUpdate、execute方法。
在prepare方法中,首先通过SimpleQueryPrepareEngine.prepare方法完成SQL的解析、路由计算、SQL改写操作(这里已经属于内核引擎部分,在总览篇中已介绍,本篇就不重复展开),然后对StatementExecutor进行初始化以及底层Statement的方法回放(例如setFetchSize、setQueryTimeout)。
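executeQuery的"prepare、执行、归并"三段骨架可以用下面的简化示例体会(QueryPipelineDemo及两个函数式参数均为演示假设,这里用字符串拼接代替真正的归并引擎):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// ShardingStatement#executeQuery 的骨架演示:
// prepare得到多条改写后的真实SQL,逐一执行,再把多份结果归并成一份
public final class QueryPipelineDemo {

    public static String executeQuery(final String logicSql,
                                      final Function<String, List<String>> prepareEngine,
                                      final Function<String, String> executor) {
        List<String> actualSqls = prepareEngine.apply(logicSql); // 解析、路由、改写
        List<String> queryResults = new ArrayList<>();
        for (String each : actualSqls) {
            queryResults.add(executor.apply(each)); // 在各真实库上执行
        }
        return String.join("|", queryResults); // 归并为一份结果(演示用拼接)
    }
}
```

真实实现中prepare由SimpleQueryPrepareEngine完成、执行由StatementExecutor完成、归并由MergeEngine完成,这里只保留了调用顺序。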

真正的SQL执行其实是由StatementExecutor完成的,看下StatementExecutor的类层次图:

org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor

/**
 * Abstract statement executor.
 */
@Getter
public abstract class AbstractStatementExecutor {
    
    private final DatabaseType databaseType;
    
    private final int resultSetType;
    
    private final int resultSetConcurrency;
    
    private final int resultSetHoldability;
    
    private final ShardingConnection connection;
    
    private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
    
    private final SQLExecuteTemplate sqlExecuteTemplate;
    
    private final Collection<Connection> connections = new LinkedList<>();
    
    private final List<List<Object>> parameterSets = new LinkedList<>();
    
    private final List<Statement> statements = new LinkedList<>();
    
    private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
    
    private final Collection<InputGroup<StatementExecuteUnit>> inputGroups = new LinkedList<>();
    
    @Setter
    private SQLStatementContext sqlStatementContext;
    
    public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) {
        this.databaseType = shardingConnection.getRuntimeContext().getDatabaseType();
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
        this.connection = shardingConnection;
        int maxConnectionsSizePerQuery = connection.getRuntimeContext().getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
        ExecutorEngine executorEngine = connection.getRuntimeContext().getExecutorEngine();
        sqlExecutePrepareTemplate = new SQLExecutePrepareTemplate(maxConnectionsSizePerQuery);
        sqlExecuteTemplate = new SQLExecuteTemplate(executorEngine, connection.isHoldTransaction());
    }
    
…
    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);// 对输入的分组对象,执行指定的回调操作
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);// 刷新元数据, 因为DDL会修改元数据信息
        return result;
    }
…
    private void refreshMetaDataIfNeeded(final ShardingRuntimeContext runtimeContext, final SQLStatementContext sqlStatementContext) throws SQLException {
        if (null == sqlStatementContext) {
            return;
        }
        if (sqlStatementContext instanceof CreateTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((CreateTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof AlterTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((AlterTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof DropTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((DropTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof CreateIndexStatementContext) {
            refreshTableMetaData(runtimeContext, ((CreateIndexStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof DropIndexStatementContext) {
            refreshTableMetaData(runtimeContext, ((DropIndexStatementContext) sqlStatementContext).getSqlStatement());
        }
    }
    
    private void refreshTableMetaData(final ShardingRuntimeContext runtimeContext, final CreateTableStatement createTableStatement) throws SQLException {
        String tableName = createTableStatement.getTable().getTableName().getIdentifier().getValue();
        runtimeContext.getMetaData().getSchema().put(tableName, loadTableMeta(tableName, databaseType));
    }
…
}

AbstractStatementExecutor是各类StatementExecutor的抽象类,它持有Statement执行所需的各类信息:resultSetType、resultSetConcurrency、resultSetHoldability,以及ShardingSphere的Connection实例、路由后的真实Statement列表、参数集合、执行后的结果集、执行分组信息,还有执行准备模板(SQLExecutePrepareTemplate)与执行模板(SQLExecuteTemplate)。

在executeCallback方法中,可以看到其内部是通过SQLExecuteTemplate来真正完成SQL的执行,SQLExecuteTemplate是执行引擎的关键类,已在执行引擎篇进行了分析,这里不展开;之后调用refreshMetaDataIfNeeded方法对元数据进行刷新,因为执行DDL语句时,表名、列名、字段类型、主键以及索引等信息可能发生变化,需要同步修改元数据。这里的元数据指的是RuntimeContext中的ShardingSphereMetaData属性,该类实例封装了ShardingSphere分库分表等所需的各类元数据,在各环节、引擎之间传递,提供所需的基础信息。
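refreshMetaDataIfNeeded按语句类型分发刷新的思路,可以用下面的简化示例说明(MetaDataRefreshDemo及其内部接口、类均为演示假设,schema也简化为表名到描述串的映射):

```java
import java.util.HashMap;
import java.util.Map;

// "DDL执行后按语句类型刷新元数据"分发思路的简化演示
public final class MetaDataRefreshDemo {

    public interface SqlStatementContext {
        String tableName();
    }

    public static final class CreateTableContext implements SqlStatementContext {
        private final String table;

        public CreateTableContext(final String table) {
            this.table = table;
        }

        @Override
        public String tableName() {
            return table;
        }
    }

    public static final class DropTableContext implements SqlStatementContext {
        private final String table;

        public DropTableContext(final String table) {
            this.table = table;
        }

        @Override
        public String tableName() {
            return table;
        }
    }

    // 简化的schema元数据:表名 -> 表元数据描述
    private final Map<String, String> schema = new HashMap<>();

    public void refreshIfNeeded(final SqlStatementContext context) {
        if (null == context) {
            return; // 非DDL语句不触发刷新
        }
        if (context instanceof CreateTableContext) {
            schema.put(context.tableName(), "loaded-meta"); // 对应refreshTableMetaData + loadTableMeta
        } else if (context instanceof DropTableContext) {
            schema.remove(context.tableName());
        }
    }

    public boolean containsTable(final String tableName) {
        return schema.containsKey(tableName);
    }
}
```

源码中还包括AlterTable、CreateIndex、DropIndex等分支,分发方式与此相同。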

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

/**
 * Prepared statement executor.
 */
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    
    @Getter
    private final boolean returnGeneratedKeys;
    
    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }
    
    /**
     * Initialize executor.
     *
     * @param executionContext execution context
     * @throws SQLException SQL exception
     */
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));// 生成执行分组
        cacheStatements();
    }

    private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {

            @Override
            // 在指定数据源上创建要求数量的数据库连接
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            //根据执行单元信息 创建Statement执行单元对象
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }
    
    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            // 在指定的Statement上执行SQL,将JDBC结果集包装成查询QueryResult对象(基于流模式、基于内存模式两类)
            protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);// 通过executeCallback操作
    }
…
}

可以看到,在具体的StatementExecutor实现类中实现了executeQuery、executeUpdate、execute方法,这些方法中定义了SQLExecuteCallback实例,然后由父类的executeCallback方法完成最终的SQL执行。

该类中的关键操作是init方法:它生成执行分组信息,并调用父类的cacheStatements方法记录底层Statement和参数。

回到Statement实现类,简单看下主从功能对应的
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement

/**
 * Statement that support master-slave.
 */
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {
    
    private final MasterSlaveConnection connection;
    
    @Getter(AccessLevel.NONE)
    private final DataNodeRouter dataNodeRouter;
    
    private final int resultSetType;
    
    private final int resultSetConcurrency;
    
    private final int resultSetHoldability;
    
    private final Collection<Statement> routedStatements = new LinkedList<>();
    
    public MasterSlaveStatement(final MasterSlaveConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        dataNodeRouter = new DataNodeRouter(connection.getRuntimeContext().getMetaData(), connection.getRuntimeContext().getProperties(), connection.getRuntimeContext().getSqlParserEngine());
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        clearPrevious();
        MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
        SimpleQueryPrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
                Collections.singletonList(runtimeContext.getRule()), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        ExecutionContext executionContext = prepareEngine.prepare(sql, Collections.emptyList());
        ExecutionUnit executionUnit = executionContext.getExecutionUnits().iterator().next();
        Preconditions.checkState(1 == executionContext.getExecutionUnits().size(), "Cannot support executeQuery for DML or DDL");
        Statement statement = connection.getConnection(executionUnit.getDataSourceName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        return statement.executeQuery(executionUnit.getSqlUnit().getSql());
    }
    
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        clearPrevious();
        int result = 0;
        MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
        RouteContext routeContext = dataNodeRouter.route(sql, Collections.emptyList(), false);
        routeContext = new MasterSlaveRouteDecorator().decorate(routeContext, runtimeContext.getMetaData(), runtimeContext.getRule(), runtimeContext.getProperties());
        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
            Statement statement = connection.getConnection(each.getDataSourceMapper().getActualName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql);
        }
        return result;
    }
…
}

可以看到,与ShardingStatement类似,executeQuery方法中也是通过SimpleQueryPrepareEngine进行prepare操作;不过与ShardingStatement通过StatementExecutor创建Statement、执行SQL不同,这里是直接调用MasterSlaveConnection.getConnection拿到Connection,再调用其createStatement方法创建Statement,进而执行SQL。

另外,其executeUpdate方法也不像ShardingStatement那样统一使用SimpleQueryPrepareEngine进行prepare操作,而是直接调用dataNodeRouter.route方法和MasterSlaveRouteDecorator.decorate方法完成路由,这种不一致显然不如ShardingStatement容易让人理解。

PreparedStatement

接下来看下PreparedStatement分支,首先看下其抽象类

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter

/**
 * Sharding adapter for {@code PreparedStatement}.
 */
public abstract class AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement {
    
    private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
    
    @Getter
    private final List<Object> parameters = new ArrayList<>();
    
    @Override
    public final void setNull(final int parameterIndex, final int sqlType) {
        setParameter(parameterIndex, null);
    }
    
    @Override
    public final void setNull(final int parameterIndex, final int sqlType, final String typeName) {
        setParameter(parameterIndex, null);
    }
…
    private void setParameter(final int parameterIndex, final Object value) {
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value);
            return;
        }
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null);
        }
        parameters.set(parameterIndex - 1, value);
    }
    
    protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
        setParameterMethodInvocations.clear();
        addParameters(parameters);
        for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
            each.invoke(preparedStatement);
        }
    }
    
    private void addParameters(final List<Object> parameters) {
        int i = 0;
        for (Object each : parameters) {
            setParameters(new Class[]{int.class, Object.class}, i++ + 1, each);
        }
    }
    
    @SneakyThrows
    private void setParameters(final Class[] argumentTypes, final Object... arguments) {
        setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod("setObject", argumentTypes), arguments, arguments[1]));
    }
    
    @Override
    public final void clearParameters() {
        parameters.clear();
        setParameterMethodInvocations.clear();
    }
}

可以看到,该类实现了PreparedStatement接口的各种参数设置方法:parameters属性记录了已设置的各参数值,setParameterMethodInvocations属性则记录了参数设置方法的调用,供replaySetParameter在底层PreparedStatement上回放。
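其中setParameter的占位填充逻辑(JDBC参数下标从1开始,且允许乱序设置)可以抽成如下独立示例(ParameterPadDemo为演示用类名,填充逻辑与源码一致):

```java
import java.util.ArrayList;
import java.util.List;

// setParameter 占位填充逻辑的独立演示:JDBC参数下标从1开始,允许乱序设置
public final class ParameterPadDemo {

    private final List<Object> parameters = new ArrayList<>();

    public void setParameter(final int parameterIndex, final Object value) {
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value); // 顺序设置:直接追加
            return;
        }
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null); // 乱序设置:先用null占位补齐
        }
        parameters.set(parameterIndex - 1, value);
    }

    public List<Object> getParameters() {
        return parameters;
    }
}
```

例如先设置第3个参数再设置第1个参数时,列表会先补齐到3个元素,再在对应位置写入值。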

而在其子类中,org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

/**
 * PreparedStatement that support sharding.
 */
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final String sql;
    
    @Getter
    private final ParameterMetaData parameterMetaData;
    
    private final BasePrepareEngine prepareEngine;
    
    private final PreparedStatementExecutor preparedStatementExecutor;
    
    private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
    
    private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
    
    private ExecutionContext executionContext;
    
    private ResultSet currentResultSet;
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
    }
    
    public ShardingPreparedStatement(
        final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false);
    }
    
    private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
                                      final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        this.sql = sql;
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
        prepareEngine = new PreparedQueryPrepareEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
    
    @Override
    public int executeUpdate() throws SQLException {
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            return preparedStatementExecutor.executeUpdate();
        } finally {
            clearBatch();
        }
    }
…
private void initPreparedStatementExecutor() throws SQLException {
        preparedStatementExecutor.init(executionContext);
        setParametersForStatements();
        replayMethodForStatements();
    }
    
    private void setParametersForStatements() {
        for (int i = 0; i < preparedStatementExecutor.getStatements().size(); i++) {
            replaySetParameter((PreparedStatement) preparedStatementExecutor.getStatements().get(i), preparedStatementExecutor.getParameterSets().get(i));
        }
    }
    
    private void replayMethodForStatements() {
        for (Statement each : preparedStatementExecutor.getStatements()) {
            replayMethodsInvocation(each);
        }
}
…
}

可以看到,该类中实现了executeQuery、executeUpdate、execute方法,这几个方法的内部又分别调用clearPrevious、prepare、initPreparedStatementExecutor,以及PreparedStatementExecutor中对应的execute*方法。

  • clearPrevious方法即清空该PreparedStatement之前执行的参数、连接、结果集、分组信息以及Statement对象;
  • prepare方法通过调用PreparedQueryPrepareEngine#prepare完成SQL的解析、路由和改写,其内部实现逻辑在总览篇和引擎篇已进行了分析,这里不重复展开;
  • initPreparedStatementExecutor方法则获取底层真实的PreparedStatement,然后设置对应的参数。
  • PreparedStatementExecutor的execute*方法,前面已介绍。

除了参数设置,与ShardingStatement相比,ShardingPreparedStatement还支持批量操作,即JDBC中的addBatch、executeBatch方法。

看下ShardingPreparedStatement中这两个批量操作方法的实现。

    
    @Override
    public void addBatch() {
        try {
            prepare();
            batchPreparedStatementExecutor.addBatchForRouteUnits(executionContext);
        } finally {
            currentResultSet = null;
            clearParameters();
        }
    }
    
    @Override
    public int[] executeBatch() throws SQLException {
        try {
            initBatchPreparedStatementExecutor();
            return batchPreparedStatementExecutor.executeBatch();
        } finally {
            clearBatch();
        }
    }
    
    private void initBatchPreparedStatementExecutor() throws SQLException {
        batchPreparedStatementExecutor.init(executionContext.getSqlStatementContext());
        setBatchParametersForStatements();
    }
    
    private void setBatchParametersForStatements() throws SQLException {
        for (Statement each : batchPreparedStatementExecutor.getStatements()) {
            List<List<Object>> parameterSet = batchPreparedStatementExecutor.getParameterSet(each);
            for (List<Object> parameters : parameterSet) {
                replaySetParameter((PreparedStatement) each, parameters);
                ((PreparedStatement) each).addBatch();
            }
        }
    }
    
    @Override
    public void clearBatch() throws SQLException {
        currentResultSet = null;
        batchPreparedStatementExecutor.clear();
        clearParameters();
    }
    

可以看到,addBatch方法在调用prepare方法后,又调用了BatchPreparedStatementExecutor#addBatchForRouteUnits(executionContext);executeBatch方法则先后调用BatchPreparedStatementExecutor的init和executeBatch方法;setBatchParametersForStatements方法在拿到BatchPreparedStatementExecutor内部的Statement和参数集合后,调用底层PreparedStatement#addBatch方法添加批量参数。接下来看下BatchPreparedStatementExecutor类。

org.apache.shardingsphere.shardingjdbc.executor.batch.BatchPreparedStatementExecutor

/**
 * Prepared statement executor to process add batch.
 */
public final class BatchPreparedStatementExecutor extends AbstractStatementExecutor {
    
    private final Collection<BatchRouteUnit> routeUnits = new LinkedList<>();
    
    @Getter
    private final boolean returnGeneratedKeys;
    
    private int batchCount;
    
    public BatchPreparedStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
                                          final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }
    /**
     * Initialize executor.
     *
     * @param sqlStatementContext SQL statement context
     * @throws SQLException SQL exception
     */
    public void init(final SQLStatementContext sqlStatementContext) throws SQLException {
        setSqlStatementContext(sqlStatementContext);
        getInputGroups().addAll(obtainExecuteGroups(routeUnits));
    }
…
    /**
     * Add batch for route units.
     *
     * @param executionContext execution context
     */
    public void addBatchForRouteUnits(final ExecutionContext executionContext) {
        handleOldBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
        handleNewBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
        batchCount++;
    }
    
    private Collection<BatchRouteUnit> createBatchRouteUnits(final Collection<ExecutionUnit> executionUnits) {
        Collection<BatchRouteUnit> result = new LinkedList<>();
        for (ExecutionUnit each : executionUnits) {
            result.add(new BatchRouteUnit(each));
        }
        return result;
    }
    
    private void handleOldBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
        for (BatchRouteUnit each : newRouteUnits) {
            for (BatchRouteUnit unit : routeUnits) {
                if (unit.equals(each)) {
                    reviseBatchRouteUnit(unit, each);
                }
            }
        }
    }
    
    private void reviseBatchRouteUnit(final BatchRouteUnit oldBatchRouteUnit, final BatchRouteUnit newBatchRouteUnit) {
        oldBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters().addAll(newBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters());
        oldBatchRouteUnit.mapAddBatchCount(batchCount);
    }
    
    private void handleNewBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
        newRouteUnits.removeAll(routeUnits);
        for (BatchRouteUnit each : newRouteUnits) {
            each.mapAddBatchCount(batchCount);
        }
        routeUnits.addAll(newRouteUnits);
    }
…
/**
     * Execute batch.
     * 
     * @return execute results
     * @throws SQLException SQL exception
     */
    public int[] executeBatch() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return statement.executeBatch();
            }
        };
        List<int[]> results = executeCallback(callback);
        if (isAccumulate()) {
            return accumulate(results);
        } else {
            return results.get(0);
        }
    }
…
}

可以看到,addBatchForRouteUnits方法中的handleOldBatchRouteUnits和handleNewBatchRouteUnits,实际是按照路由单元对批量执行的参数进行合并:已存在的路由单元追加本批参数,新出现的路由单元登记批次后加入;init方法则生成执行分组信息;executeBatch依然通过父类的executeCallback方法按分组依次调用底层PreparedStatement的executeBatch完成SQL执行。
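"按路由单元合并批量参数"的效果可以用下面的简化示例体会(BatchMergeDemo为演示假设,这里用"数据源名+SQL"字符串作为路由单元的key,源码中则是BatchRouteUnit的equals语义):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// 按路由单元合并批量参数的简化演示:
// 同一(数据源, 改写后SQL)的多次addBatch合并到同一个路由单元
public final class BatchMergeDemo {

    // key: 数据源名 + 改写后SQL;value: 每次addBatch对应的一组参数
    private final Map<String, List<List<Object>>> routeUnits = new LinkedHashMap<>();

    public void addBatch(final String dataSource, final String sql, final List<Object> parameters) {
        routeUnits.computeIfAbsent(dataSource + "::" + sql, key -> new ArrayList<>()).add(parameters);
    }

    public int unitCount() {
        return routeUnits.size();
    }

    public List<List<Object>> parametersOf(final String dataSource, final String sql) {
        return routeUnits.get(dataSource + "::" + sql);
    }
}
```

这样多次addBatch后,真正执行时每个路由单元只需要一个底层PreparedStatement,把积累的参数组依次addBatch进去即可。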

ResultSet

ResultSet接口表示数据库结果集(通常由Statement执行完查询SQL后得到),它包含了一个类似表格的数据,内部维护了一个游标,访问完一条记录的各列后,可以通过next()方法将游标指向下一行数据,直到访问到最后一条数据为止。

ShardingSphere中ResultSet接口的实现类图

同样,ResultSet的实现类中也定义了几个Unsupported抽象类:AbstractUnsupportedUpdateOperationResultSet实现了ResultSet中的更新类方法(ShardingSphere目前不支持ResultSet接口中的update*方法);AbstractUnsupportedOperationResultSet实现了其它不支持的操作方法;AbstractUnsupportedDatabaseMetaDataResultSet定义了DatabaseMetaData相关结果集不支持的方法(即ShardingDatabaseMetaData中getSchemas、getColumns等方法的返回值);AbstractUnsupportedGeneratedKeysResultSet定义了GeneratedKeys结果集不支持的方法(即Statement中getGeneratedKeys()的返回值)。

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter

/**
 * Adapter for {@code ResultSet}.
 */
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
    
    @Getter
    private final List<ResultSet> resultSets;
    
    @Getter
    private final Statement statement;
    
    private boolean closed;
    
    private final ForceExecuteTemplate<ResultSet> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    @Getter
    private final ExecutionContext executionContext;
    
    public AbstractResultSetAdapter(final List<ResultSet> resultSets, final Statement statement, final ExecutionContext executionContext) {
        Preconditions.checkArgument(!resultSets.isEmpty());
        this.resultSets = resultSets;
        this.statement = statement;
        this.executionContext = executionContext;
    }
    
    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
    }
    
    private ShardingRule getShardingRule() {
        ShardingConnection connection = statement instanceof ShardingPreparedStatement ? ((ShardingPreparedStatement) statement).getConnection() : ((ShardingStatement) statement).getConnection();
        return connection.getRuntimeContext().getRule();
    }
    
    @Override
    public final int findColumn(final String columnLabel) throws SQLException {
        return resultSets.get(0).findColumn(columnLabel);
    }
…
}

As the code shows, most read methods are implemented by delegating to the first underlying ResultSet, while property setters are applied to each underlying ResultSet in turn.
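The pattern can be sketched independently of ShardingSphere. The DelegatingCursor class below is a hypothetical illustration (not ShardingSphere code): reads are answered by the first delegate, while configuration is broadcast to every delegate, just as AbstractResultSetAdapter does with its list of underlying ResultSets.

```java
// Hypothetical sketch of the adapter pattern used by AbstractResultSetAdapter:
// reads go to the first underlying object, configuration is broadcast to all.
final class DelegatingCursor {

    interface Cursor {
        int findColumn(String label);
        void setFetchSize(int rows);
    }

    private final java.util.List<Cursor> delegates;

    DelegatingCursor(final java.util.List<Cursor> delegates) {
        if (delegates.isEmpty()) {
            throw new IllegalArgumentException("at least one delegate required");
        }
        this.delegates = delegates;
    }

    // Read operations: ask the first underlying cursor only.
    int findColumn(final String label) {
        return delegates.get(0).findColumn(label);
    }

    // Property setters: apply to every underlying cursor.
    void setFetchSize(final int rows) {
        for (Cursor each : delegates) {
            each.setFetchSize(rows);
        }
    }
}
```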

Next, look at ShardingResultSet, the implementation for data sharding.

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet

/**
 * Result that support sharding.
 */
public final class ShardingResultSet extends AbstractResultSetAdapter {
    
    private final MergedResult mergeResultSet;
    
    private final Map<String, Integer> columnLabelAndIndexMap;

    public ShardingResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final ExecutionContext executionContext) throws SQLException {
        super(resultSets, statement, executionContext);
        this.mergeResultSet = mergeResultSet;
        columnLabelAndIndexMap = createColumnLabelAndIndexMap(resultSets.get(0).getMetaData());
    }
    
    private Map<String, Integer> createColumnLabelAndIndexMap(final ResultSetMetaData resultSetMetaData) throws SQLException {
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int columnIndex = resultSetMetaData.getColumnCount(); columnIndex > 0; columnIndex--) {
            result.put(resultSetMetaData.getColumnLabel(columnIndex), columnIndex);
        }
        return result;
    }
    
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    
    @Override
    public boolean wasNull() throws SQLException {
        return mergeResultSet.wasNull();
    }
    
    @Override
    public boolean getBoolean(final int columnIndex) throws SQLException {
        return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
    }
    
    @Override
    public boolean getBoolean(final String columnLabel) throws SQLException {
        int columnIndex = columnLabelAndIndexMap.get(columnLabel);
        return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
    }
…

}

As shown, the getter methods actually call MergedResult#getValue to obtain the value of the specified column.
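The label-to-index map built in createColumnLabelAndIndexMap has two notable properties: lookups are case-insensitive (a TreeMap ordered by String.CASE_INSENSITIVE_ORDER), and iterating column indexes in descending order means that when two columns share a label, the smaller index wins. A minimal standalone demonstration of the same strategy (the column labels here are made up):

```java
// Demonstrates the map-building strategy of createColumnLabelAndIndexMap:
// descending iteration lets earlier (smaller) column indexes overwrite later
// ones, and CASE_INSENSITIVE_ORDER makes label lookup case-insensitive.
final class ColumnLabelIndex {

    static java.util.Map<String, Integer> build(final String[] labels) {
        java.util.Map<String, Integer> result = new java.util.TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        // JDBC column indexes are 1-based; walk from the last column down to 1.
        for (int columnIndex = labels.length; columnIndex > 0; columnIndex--) {
            result.put(labels[columnIndex - 1], columnIndex);
        }
        return result;
    }
}
```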

ResultSet is a general-purpose JDBC interface: besides representing the result of a SQL query, it is used wherever multiple rows need to be represented, for example for the results of DatabaseMetaData methods and for the auto-generated keys of a Statement.

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.DatabaseMetaDataResultSet

/**
 * Database meta data result set.
 */
public final class DatabaseMetaDataResultSet<T extends BaseRule> extends AbstractUnsupportedDatabaseMetaDataResultSet {
    
    private static final String TABLE_NAME = "TABLE_NAME";
    
    private static final String INDEX_NAME = "INDEX_NAME";
    
    private final int type;
    
    private final int concurrency;
    
    private final T rule;
    
    private final ResultSetMetaData resultSetMetaData;
    
    private final Map<String, Integer> columnLabelIndexMap;
    
    private final Iterator<DatabaseMetaDataObject> databaseMetaDataObjectIterator;
    
    private volatile boolean closed;
    
    private DatabaseMetaDataObject currentDatabaseMetaDataObject;
    
    public DatabaseMetaDataResultSet(final ResultSet resultSet, final T rule) throws SQLException {
        this.type = resultSet.getType();
        this.concurrency = resultSet.getConcurrency();
        this.rule = rule;
        this.resultSetMetaData = resultSet.getMetaData();
        this.columnLabelIndexMap = initIndexMap();
        this.databaseMetaDataObjectIterator = initIterator(resultSet);
    }
    
    private Map<String, Integer> initIndexMap() throws SQLException {
        Map<String, Integer> result = new HashMap<>(resultSetMetaData.getColumnCount());
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            result.put(resultSetMetaData.getColumnLabel(i), i);
        }
        return result;
    }
    
    private Iterator<DatabaseMetaDataObject> initIterator(final ResultSet resultSet) throws SQLException {
        LinkedList<DatabaseMetaDataObject> result = new LinkedList<>();
        Set<DatabaseMetaDataObject> removeDuplicationSet = new HashSet<>();
        int tableNameColumnIndex = columnLabelIndexMap.getOrDefault(TABLE_NAME, -1);
        int indexNameColumnIndex = columnLabelIndexMap.getOrDefault(INDEX_NAME, -1);
        while (resultSet.next()) {
            DatabaseMetaDataObject databaseMetaDataObject = generateDatabaseMetaDataObject(tableNameColumnIndex, indexNameColumnIndex, resultSet);
            if (!removeDuplicationSet.contains(databaseMetaDataObject)) {
                result.add(databaseMetaDataObject);
                removeDuplicationSet.add(databaseMetaDataObject);
            }
        }
        return result.iterator();
    }
    
    private DatabaseMetaDataObject generateDatabaseMetaDataObject(final int tableNameColumnIndex, final int indexNameColumnIndex, final ResultSet resultSet) throws SQLException {
        DatabaseMetaDataObject result = new DatabaseMetaDataObject(resultSetMetaData.getColumnCount());
        for (int i = 1; i <= columnLabelIndexMap.size(); i++) {
            if (tableNameColumnIndex == i) {
                String tableName = resultSet.getString(i);
                Collection<String> logicTableNames = rule instanceof ShardingRule ? ((ShardingRule) rule).getLogicTableNames(tableName) : Collections.emptyList();
                result.addObject(logicTableNames.isEmpty() ? tableName : logicTableNames.iterator().next());
            } else if (indexNameColumnIndex == i) {
                String tableName = resultSet.getString(tableNameColumnIndex);
                String indexName = resultSet.getString(i);
                result.addObject(null != indexName && indexName.endsWith(tableName) ? indexName.substring(0, indexName.indexOf(tableName) - 1) : indexName);
            } else {
                result.addObject(resultSet.getObject(i));
            }
        }
        return result;
    }
    
    @Override
    public boolean next() throws SQLException {
        checkClosed();
        if (databaseMetaDataObjectIterator.hasNext()) {
            currentDatabaseMetaDataObject = databaseMetaDataObjectIterator.next();
            return true;
        }
        return false;
    }
    
    @Override
    public void close() throws SQLException {
        checkClosed();
        closed = true;
    }
    
    @Override
    public boolean wasNull() throws SQLException {
        checkClosed();
        return false;
    }
    
    @Override
    public String getString(final int columnIndex) throws SQLException {
        checkClosed();
        checkColumnIndex(columnIndex);
        return (String) ResultSetUtil.convertValue(currentDatabaseMetaDataObject.getObject(columnIndex), String.class);
    }
…
}

These ResultSet implementations all receive an underlying ResultSet or an Iterator in their constructors. Their next() method moves the cursor by calling next() on the internal ResultSet or Iterator, and the get methods read values directly from the underlying ResultSet or from the current element of the Iterator.
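The cursor-over-iterator idea can be reduced to a few lines. This is an illustrative sketch (not ShardingSphere code) in the style of DatabaseMetaDataResultSet, with a row modeled as a plain Object array:

```java
// Minimal sketch of an Iterator-backed cursor: next() advances the iterator,
// the getter reads a column of the current row (1-based, as in JDBC).
final class RowCursor {

    private final java.util.Iterator<Object[]> rows;

    private Object[] current;

    RowCursor(final java.util.List<Object[]> rows) {
        this.rows = rows.iterator();
    }

    // Mirrors ResultSet#next(): move to the next row if there is one.
    boolean next() {
        if (rows.hasNext()) {
            current = rows.next();
            return true;
        }
        return false;
    }

    // Mirrors the 1-based getters: read a column of the current row.
    Object getObject(final int columnIndex) {
        return current[columnIndex - 1];
    }
}
```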

MetaData interfaces

JDBC defines several metadata interfaces: DatabaseMetaData, ResultSetMetaData and ParameterMetaData.
They describe, respectively, database and driver metadata (Catalog, Schema, Table, Column, Index and so on, obtained via Connection#getMetaData()), result-set metadata (column types and attributes, obtained via ResultSet#getMetaData()), and parameter metadata (parameter count, types and attributes, obtained via PreparedStatement#getParameterMetaData()).

DatabaseMetaData

Class diagram of the DatabaseMetaData implementations in ShardingSphere

Let's look at the classes involved.

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.AdaptedDatabaseMetaData

/**
 * Adapted database meta data.
 */
@RequiredArgsConstructor
public abstract class AdaptedDatabaseMetaData extends WrapperAdapter implements DatabaseMetaData {
    
    private final CachedDatabaseMetaData cachedDatabaseMetaData;
    
    @Override
    public final String getURL() {
        return cachedDatabaseMetaData.getUrl();
    }
    
    @Override
    public final String getUserName() {
        return cachedDatabaseMetaData.getUserName();
    }
    
    @Override
    public final String getDatabaseProductName() {
        return cachedDatabaseMetaData.getDatabaseProductName();
    }

This class implements the DatabaseMetaData methods by delegating each call directly to the corresponding method of the CachedDatabaseMetaData instance.

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.MultipleDatabaseMetaData

/**
 * Multiple database meta data.
 */
@Getter
public abstract class MultipleDatabaseMetaData<C extends AbstractConnectionAdapter> extends AdaptedDatabaseMetaData {
    
    private final C connection;
    
    private final Collection<String> datasourceNames;
    
    private final ShardingSphereMetaData shardingSphereMetaData;
    
    private String currentDataSourceName;
    
    private DatabaseMetaData currentDatabaseMetaData;
    
    public MultipleDatabaseMetaData(final C connection, final Collection<String> datasourceNames,
                                    final CachedDatabaseMetaData cachedDatabaseMetaData, final ShardingSphereMetaData shardingSphereMetaData) {
        super(cachedDatabaseMetaData);
        this.connection = connection;
        this.datasourceNames = datasourceNames;
        this.shardingSphereMetaData = shardingSphereMetaData;
    }
    
    @Override
    public final Connection getConnection() throws SQLException {
        return connection.getConnection(getDataSourceName());
    }
    
    @Override
    public final ResultSet getSuperTypes(final String catalog, final String schemaPattern, final String typeNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTypes(getActualCatalog(catalog), getActualSchema(schemaPattern), typeNamePattern));
    }
    
    @Override
    public final ResultSet getSuperTables(final String catalog, final String schemaPattern, final String tableNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTables(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern)));
    }
    
    @Override
    public final ResultSet getColumns(final String catalog, final String schemaPattern, final String tableNamePattern, final String columnNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(
            getDatabaseMetaData().getColumns(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern), columnNamePattern));
    }
    
…
    protected abstract String getActualTableNamePattern(String tableNamePattern);
    
    protected abstract String getActualTable(String table);
    
    protected abstract ResultSet createDatabaseMetaDataResultSet(ResultSet resultSet) throws SQLException;
…
}

As shown, MultipleDatabaseMetaData overrides most of the methods of AdaptedDatabaseMetaData: it obtains the real DatabaseMetaDataResultSet through the subclass-implemented createDatabaseMetaDataResultSet method, and resolves the actual physical table names through the subclass-implemented getActualTableNamePattern and getActualTable methods.

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.ShardingDatabaseMetaData

/**
 * Sharding database meta data.
 */
public final class ShardingDatabaseMetaData extends MultipleDatabaseMetaData<ShardingConnection> {
    
    private ShardingRule shardingRule;
    
    public ShardingDatabaseMetaData(final ShardingConnection connection) {
        super(connection, connection.getDataSourceMap().keySet(), connection.getRuntimeContext().getCachedDatabaseMetaData(), connection.getRuntimeContext().getMetaData());
        shardingRule = connection.getRuntimeContext().getRule();
    }
    
    @Override
    public String getActualTableNamePattern(final String tableNamePattern) {
        if (null == tableNamePattern) {
            return null;
        }
        return shardingRule.findTableRule(tableNamePattern).isPresent() ? "%" + tableNamePattern + "%" : tableNamePattern;
    }
    
    @Override
    public String getActualTable(final String table) {
        if (null == table) {
            return null;
        }
        String result = table;
        if (shardingRule.findTableRule(table).isPresent()) {
            DataNode dataNode = shardingRule.getDataNode(table);
            result = dataNode.getTableName();
        }
        return result;
    }
    
    @Override
    protected ResultSet createDatabaseMetaDataResultSet(final ResultSet resultSet) throws SQLException {
        return new DatabaseMetaDataResultSet<>(resultSet, shardingRule);
    }
}

In ShardingDatabaseMetaData, the key overrides getActualTable and getActualTableNamePattern look up the real table name and table-name pattern according to the ShardingRule, and createDatabaseMetaDataResultSet creates the DatabaseMetaDataResultSet instance.
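The effect of getActualTableNamePattern is easy to show in isolation: for a logic table governed by a sharding rule, the pattern is widened with wildcards so that it matches the physical shards (e.g. t_order_0, t_order_1). Below is a simplified stand-in, with the sharding rule reduced to a plain set of logic table names — an illustration, not the real ShardingRule API:

```java
// Simplified stand-in for ShardingDatabaseMetaData#getActualTableNamePattern:
// the sharding rule is modeled as a set of logic table names.
final class TableNamePattern {

    static String actualPattern(final String tableNamePattern, final java.util.Set<String> logicTables) {
        if (tableNamePattern == null) {
            return null;
        }
        // A sharded logic table such as t_order maps to physical tables like
        // t_order_0, t_order_1, so the pattern is widened with wildcards.
        return logicTables.contains(tableNamePattern) ? "%" + tableNamePattern + "%" : tableNamePattern;
    }
}
```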

ResultSetMetaData

Class diagram of the ResultSetMetaData implementations in ShardingSphere

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSetMetaData

/**
 * Sharding result set meta data.
 */
@RequiredArgsConstructor
public final class ShardingResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
    
    private final ResultSetMetaData resultSetMetaData;
    
    private final ShardingRule shardingRule;
    
    private final SQLStatementContext sqlStatementContext;
    
    @Override
    public int getColumnCount() {
        return sqlStatementContext instanceof SelectStatementContext ? ((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().size() : 0;
    }
    
    @Override
    public boolean isAutoIncrement(final int column) throws SQLException {
        return resultSetMetaData.isAutoIncrement(column);
    }
…
    @Override
    public String getTableName(final int column) throws SQLException {
        String actualTableName = resultSetMetaData.getTableName(column);
        return shardingRule.getLogicTableNames(actualTableName).isEmpty() ? actualTableName : shardingRule.getLogicTableNames(actualTableName).iterator().next();
    }
    
    @Override
    public String getCatalogName(final int column) {
        return DefaultSchema.LOGIC_NAME;
    }
…
}

The implementation here is straightforward: most methods simply call the corresponding method of the ResultSetMetaData passed into the constructor, while the remaining ones convert physical table names to logical ones or check the expanded projections. The wrapped ResultSetMetaData is taken from the first underlying ResultSet, in AbstractResultSetAdapter:

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter

    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
    }

ParameterMetaData

Class diagram of the ParameterMetaData implementations in ShardingSphere

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.metadata.ShardingParameterMetaData

/**
 * Sharding parameter meta data.
 */
@RequiredArgsConstructor
public final class ShardingParameterMetaData extends AbstractUnsupportedOperationParameterMetaData {
    
    private final SQLParserEngine sqlParserEngine;
    
    private final String sql;
    
    @Override
    public int getParameterCount() {
        return sqlParserEngine.parse(sql, true).getParameterCount();
    }
}

As shown, this class implements only getParameterCount() of the ParameterMetaData interface, obtaining the parameter count from the SQL parser engine. An instance is created in the constructor of ShardingPreparedStatement, to be returned by its getParameterMetaData() method.

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

/**
 * PreparedStatement that support sharding.
 */
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final String sql;
    
    @Getter
    private final ParameterMetaData parameterMetaData;
…

    private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
                                      final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        this.sql = sql;
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
…
    }
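Why delegate parameter counting to the SQL parser rather than scanning the text? Because a '?' inside a string literal (or a comment) is not a parameter. The naive counter below, shown only to illustrate the pitfall, skips single-quoted literals but handles nothing else — ShardingSphere's real implementation relies on its full SQL parser instead:

```java
// Illustrative-only placeholder counter. It skips single-quoted string
// literals (with no escape handling) to avoid counting a '?' inside quotes;
// a real implementation needs a proper SQL parser.
final class PlaceholderCounter {

    static int count(final String sql) {
        int result = 0;
        boolean inString = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                inString = !inString;
            } else if (c == '?' && !inString) {
                result++;
            }
        }
        return result;
    }
}
```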

Changes in 5.x

In 5.x, the JDBC implementation classes were further merged and simplified: instead of defining one implementation class per feature, everything is unified into ShardingSphere* classes. For example, the 4.1.1 classes ShardingDataSource, MasterSlaveDataSource, ShadowDataSource and EncryptDataSource became a single ShardingSphereDataSource; ShardingConnection, MasterSlaveConnection, ShadowConnection and EncryptConnection were merged into ShardingSphereConnection; and likewise ShardingPreparedStatement, MasterSlavePreparedStatement, ShadowPreparedStatement and the others became ShardingSpherePreparedStatement.

The logic of the individual features is now applied as decorators, distributed across the unified processing stages (parse, route, rewrite, execute, merge) and triggered by the kernel engine. This design is clearly more elegant and reduces the cognitive load on developers.

The DataSource class hierarchy in 5.x


Class diagram of the DataSource implementations in ShardingSphere 5.x

The Connection class hierarchy in 5.x


Class diagram of the Connection implementations in ShardingSphere 5.x

The Statement class hierarchy in 5.x

Class diagram of the Statement implementations in ShardingSphere 5.x

Since 5.x has not yet had a release version and the code may still change substantially, a detailed source-code walkthrough of 5.x will follow once a release version is officially published.
