序
本文主要介绍一下mysql jdbc statement的queryTimeout及resultSet的next方法
executeQuery()
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/PreparedStatement.java
/**
* A Prepared SQL query is executed and its ResultSet is returned
*
* @return a ResultSet that contains the data produced by the query - never
* null
*
* @exception SQLException
* if a database access error occurs
*/
public java.sql.ResultSet executeQuery() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
MySQLConnection locallyScopedConn = this.connection;
checkForDml(this.originalSql, this.firstCharOfStmt);
this.batchedGeneratedKeys = null;
resetCancelledState();
implicitlyCloseAllOpenResults();
clearWarnings();
if (this.doPingInstead) {
doPingInstead();
return this.results;
}
setupStreamingTimeout(locallyScopedConn);
Buffer sendPacket = fillSendPacket();
String oldCatalog = null;
if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
oldCatalog = locallyScopedConn.getCatalog();
locallyScopedConn.setCatalog(this.currentCatalog);
}
//
// Check if we have cached metadata for this query...
//
CachedResultSetMetaData cachedMetadata = null;
if (locallyScopedConn.getCacheResultSetMetadata()) {
cachedMetadata = locallyScopedConn.getCachedMetaData(this.originalSql);
}
Field[] metadataFromCache = null;
if (cachedMetadata != null) {
metadataFromCache = cachedMetadata.fields;
}
locallyScopedConn.setSessionMaxRows(this.maxRows);
this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, metadataFromCache, false);
if (oldCatalog != null) {
locallyScopedConn.setCatalog(oldCatalog);
}
if (cachedMetadata != null) {
locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, cachedMetadata, this.results);
} else {
if (locallyScopedConn.getCacheResultSetMetadata()) {
locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, null /* will be created */, this.results);
}
}
this.lastInsertId = this.results.getUpdateID();
return this.results;
}
}
results为JDBC42ResultSet,这里通过executeInternal来执行
executeInternal
/**
* Actually execute the prepared statement. This is here so server-side
* PreparedStatements can re-use most of the code from this class.
*
* @param maxRowsToRetrieve
* the max number of rows to return
* @param sendPacket
* the packet to send
* @param createStreamingResultSet
* should a 'streaming' result set be created?
* @param queryIsSelectOnly
* is this query doing a SELECT?
* @param unpackFields
*
* @return the results as a ResultSet
*
* @throws SQLException
* if an error occurs.
*/
protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly,
Field[] metadataFromCache, boolean isBatch) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
try {
MySQLConnection locallyScopedConnection = this.connection;
this.numberOfExecutions++;
ResultSetInternalMethods rs;
CancelTask timeoutTask = null;
try {
if (locallyScopedConnection.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConnection.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConnection.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
}
if (!isBatch) {
statementBegins();
}
rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency,
createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch);
if (timeoutTask != null) {
timeoutTask.cancel();
locallyScopedConnection.getCancelTimer().purge();
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;
}
timeoutTask = null;
}
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;
if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}
resetCancelledState();
throw cause;
}
}
} finally {
if (!isBatch) {
this.statementExecuting.set(false);
}
if (timeoutTask != null) {
timeoutTask.cancel();
locallyScopedConnection.getCancelTimer().purge();
}
}
return rs;
} catch (NullPointerException npe) {
checkClosed(); // we can't synchronize ourselves against async connection-close due to deadlock issues, so this is the next best thing for
// this particular corner case.
throw npe;
}
}
}
- 执行前创建timer并调度
timeoutTask = new CancelTask(this);
locallyScopedConnection.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
- 执行之后或异常
timeoutTask.cancel();
locallyScopedConnection.getCancelTimer().purge();
最后是通过MysqlIO的getResultSet方法来获取
MysqlIO.getResultSet
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/MysqlIO.java
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
Buffer packet; // The packet from the server
Field[] fields = null;
// Read in the column information
if (metadataFromCache == null /* we want the metadata from the server */) {
fields = new Field[(int) columnCount];
for (int i = 0; i < columnCount; i++) {
Buffer fieldPacket = null;
fieldPacket = readPacket();
fields[i] = unpackField(fieldPacket, false);
}
} else {
for (int i = 0; i < columnCount; i++) {
skipPacket();
}
}
// There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set
if (!isEOFDeprecated() ||
// if we asked to use cursor then there should be an OK packet here
(this.connection.versionMeetsMinimum(5, 0, 2) && callingStatement != null && isBinaryEncoded && callingStatement.isCursorRequired())) {
packet = reuseAndReadPacket(this.reusablePacket);
readServerStatusForResultSets(packet);
}
//
// Handle cursor-based fetch first
//
if (this.connection.versionMeetsMinimum(5, 0, 2) && this.connection.getUseCursorFetch() && isBinaryEncoded && callingStatement != null
&& callingStatement.getFetchSize() != 0 && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;
boolean usingCursor = true;
//
// Server versions 5.0.5 or newer will only open a cursor and set this flag if they can, otherwise they punt and go back to mysql_store_results()
// behavior
//
if (this.connection.versionMeetsMinimum(5, 0, 5)) {
usingCursor = (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0;
}
if (usingCursor) {
RowData rows = new RowDataCursor(this, prepStmt, fields);
ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, resultSetType, resultSetConcurrency, isBinaryEncoded);
if (usingCursor) {
rs.setFetchSize(callingStatement.getFetchSize());
}
return rs;
}
}
RowData rowData = null;
if (!streamResults) {
rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
} else {
rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
this.streamingData = rowData;
}
ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,
resultSetConcurrency, isBinaryEncoded);
return rs;
}
这里要注意this.connection.getUseCursorFetch(),必须这个为true,usingCursor才会是true,然后rows对象才会是RowDataCursor,否则是一口气拉取所有数据
CancelTask
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/StatementImpl.java
/**
* Thread used to implement query timeouts...Eventually we could be more
* efficient and have one thread with timers, but this is a straightforward
* and simple way to implement a feature that isn't used all that often.
*/
class CancelTask extends TimerTask {
SQLException caughtWhileCancelling = null;
StatementImpl toCancel;
Properties origConnProps = null;
String origConnURL = "";
long origConnId = 0;
CancelTask(StatementImpl cancellee) throws SQLException {
this.toCancel = cancellee;
this.origConnProps = new Properties();
Properties props = StatementImpl.this.connection.getProperties();
Enumeration<?> keys = props.propertyNames();
while (keys.hasMoreElements()) {
String key = keys.nextElement().toString();
this.origConnProps.setProperty(key, props.getProperty(key));
}
this.origConnURL = StatementImpl.this.connection.getURL();
this.origConnId = StatementImpl.this.connection.getId();
}
@Override
public void run() {
Thread cancelThread = new Thread() {
@Override
public void run() {
Connection cancelConn = null;
java.sql.Statement cancelStmt = null;
try {
MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
if (physicalConn != null) {
if (physicalConn.getQueryTimeoutKillsConnection()) {
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
physicalConn.realClose(false, false, true,
new MySQLStatementCancelledException(Messages.getString("Statement.ConnectionKilledDueToTimeout")));
} else {
synchronized (StatementImpl.this.cancelTimeoutMutex) {
if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
// All's fine
cancelConn = physicalConn.duplicate();
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + physicalConn.getId());
} else {
try {
cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
} catch (NullPointerException npe) {
// Log this? "Failed to connect to " + origConnURL + " and KILL query"
}
}
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
}
}
}
} catch (SQLException sqlEx) {
CancelTask.this.caughtWhileCancelling = sqlEx;
} catch (NullPointerException npe) {
// Case when connection closed while starting to cancel.
// We can't easily synchronize this, because then one thread can't cancel() a running query.
// Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
} finally {
if (cancelStmt != null) {
try {
cancelStmt.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
if (cancelConn != null) {
try {
cancelConn.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
CancelTask.this.toCancel = null;
CancelTask.this.origConnProps = null;
CancelTask.this.origConnURL = null;
}
}
};
cancelThread.start();
}
}
如果queryTimeoutKillsConnection则kill连接,否则发送kill query命令,同时标记状态
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
executeInternal里头会判断,然后抛出异常
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;
if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}
resetCancelledState();
throw cause;
}
}
ResultSetImpl
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/ResultSetImpl.java
/**
* A ResultSet is initially positioned before its first row, the first call
* to next makes the first row the current row; the second call makes the
* second row the current row, etc.
*
* <p>
* If an input stream from the previous row is open, it is implicitly closed. The ResultSet's warning chain is cleared when a new row is read
* </p>
*
* @return true if the new current is valid; false if there are no more rows
*
* @exception SQLException
* if a database access error occurs
*/
public boolean next() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
if (this.onInsertRow) {
this.onInsertRow = false;
}
if (this.doingUpdates) {
this.doingUpdates = false;
}
boolean b;
if (!reallyResult()) {
throw SQLError.createSQLException(Messages.getString("ResultSet.ResultSet_is_from_UPDATE._No_Data_115"), SQLError.SQL_STATE_GENERAL_ERROR,
getExceptionInterceptor());
}
if (this.thisRow != null) {
this.thisRow.closeOpenStreams();
}
if (this.rowData.size() == 0) {
b = false;
} else {
this.thisRow = this.rowData.next();
if (this.thisRow == null) {
b = false;
} else {
clearWarnings();
b = true;
}
}
setRowPositionValidity();
return b;
}
}
这里调用了this.rowData.next()来获取数据,如果开启cursorFetch模式,则这个rowDate是RowDataCursor对象
RowDataCursor.next()
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/RowDataCursor.java
/**
* Returns the next row.
*
* @return the next row value
* @throws SQLException
* if a database error occurs
*/
public ResultSetRow next() throws SQLException {
if (this.fetchedRows == null && this.currentPositionInEntireResult != BEFORE_START_OF_ROWS) {
throw SQLError.createSQLException(Messages.getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"),
SQLError.SQL_STATE_GENERAL_ERROR, this.mysql.getExceptionInterceptor());
}
if (!hasNext()) {
return null;
}
this.currentPositionInEntireResult++;
this.currentPositionInFetchedRows++;
// Catch the forced scroll-passed-end
if (this.fetchedRows != null && this.fetchedRows.size() == 0) {
return null;
}
if ((this.fetchedRows == null) || (this.currentPositionInFetchedRows > (this.fetchedRows.size() - 1))) {
fetchMoreRows();
this.currentPositionInFetchedRows = 0;
}
ResultSetRow row = this.fetchedRows.get(this.currentPositionInFetchedRows);
row.setMetadata(this.metadata);
return row;
}
注意hasNext()方法
/**
* Returns true if another row exists.
*
* @return true if more rows
* @throws SQLException
* if a database error occurs
*/
public boolean hasNext() throws SQLException {
if (this.fetchedRows != null && this.fetchedRows.size() == 0) {
return false;
}
if (this.owner != null && this.owner.owningStatement != null) {
int maxRows = this.owner.owningStatement.maxRows;
if (maxRows != -1 && this.currentPositionInEntireResult + 1 > maxRows) {
return false;
}
}
if (this.currentPositionInEntireResult != BEFORE_START_OF_ROWS) {
// Case, we've fetched some rows, but are not at end of fetched block
if (this.currentPositionInFetchedRows < (this.fetchedRows.size() - 1)) {
return true;
} else if (this.currentPositionInFetchedRows == this.fetchedRows.size() && this.lastRowFetched) {
return false;
} else {
// need to fetch to determine
fetchMoreRows();
return (this.fetchedRows.size() > 0);
}
}
// Okay, no rows _yet_, so fetch 'em
fetchMoreRows();
return this.fetchedRows.size() > 0;
}
注意fetchMoreRows()方法
private void fetchMoreRows() throws SQLException {
if (this.lastRowFetched) {
this.fetchedRows = new ArrayList<ResultSetRow>(0);
return;
}
synchronized (this.owner.connection.getConnectionMutex()) {
boolean oldFirstFetchCompleted = this.firstFetchCompleted;
if (!this.firstFetchCompleted) {
this.firstFetchCompleted = true;
}
int numRowsToFetch = this.owner.getFetchSize();
if (numRowsToFetch == 0) {
numRowsToFetch = this.prepStmt.getFetchSize();
}
if (numRowsToFetch == Integer.MIN_VALUE) {
// Handle the case where the user used 'old' streaming result sets
numRowsToFetch = 1;
}
this.fetchedRows = this.mysql.fetchRowsViaCursor(this.fetchedRows, this.statementIdOnServer, this.metadata, numRowsToFetch,
this.useBufferRowExplicit);
this.currentPositionInFetchedRows = BEFORE_START_OF_ROWS;
if ((this.mysql.getServerStatus() & SERVER_STATUS_LAST_ROW_SENT) != 0) {
this.lastRowFetched = true;
if (!oldFirstFetchCompleted && this.fetchedRows.size() == 0) {
this.wasEmpty = true;
}
}
}
}
这里会先把firstFetchCompleted标记为true,然后通过this.mysql.fetchRowsViaCursor去拉取下一批数据
MysqlIO.fetchRowsViaCursor
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/MysqlIO.java
protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, long statementId, Field[] columnTypes, int fetchSize,
boolean useBufferRowExplicit) throws SQLException {
if (fetchedRows == null) {
fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
} else {
fetchedRows.clear();
}
this.sharedSendPacket.clear();
this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
this.sharedSendPacket.writeLong(statementId);
this.sharedSendPacket.writeLong(fetchSize);
sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);
ResultSetRow row = null;
while ((row = nextRow(columnTypes, columnTypes.length, true, ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null)) != null) {
fetchedRows.add(row);
}
return fetchedRows;
}
这里注意sharedSendPacket指定了fetchSize,然后调用sendCommand方法把这批数据拉到reusablePacket,之后nextRow方法是从reusablePacket读取数据。
- sendCommand()
/**
* Send a command to the MySQL server If data is to be sent with command,
* it should be put in extraData.
*
* Raw packets can be sent by setting queryPacket to something other
* than null.
*
* @param command
* the MySQL protocol 'command' from MysqlDefs
* @param extraData
* any 'string' data for the command
* @param queryPacket
* a packet pre-loaded with data for the protocol (i.e.
* from a client-side prepared statement).
* @param skipCheck
* do not call checkErrorPacket() if true
* @param extraDataCharEncoding
* the character encoding of the extraData
* parameter.
*
* @return the response packet from the server
*
* @throws SQLException
* if an I/O error or SQL error occurs
*/
final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis)
throws SQLException {
this.commandCount++;
//
// We cache these locally, per-command, as the checks for them are in very 'hot' sections of the I/O code and we save 10-15% in overall performance by
// doing this...
//
this.enablePacketDebug = this.connection.getEnablePacketDebug();
this.readPacketSequence = 0;
int oldTimeout = 0;
if (timeoutMillis != 0) {
try {
oldTimeout = this.mysqlConnection.getSoTimeout();
this.mysqlConnection.setSoTimeout(timeoutMillis);
} catch (SocketException e) {
throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e,
getExceptionInterceptor());
}
}
try {
checkForOutstandingStreamingData();
// Clear serverStatus...this value is guarded by an external mutex, as you can only ever be processing one command at a time
this.oldServerStatus = this.serverStatus;
this.serverStatus = 0;
this.hadWarnings = false;
this.warningCount = 0;
this.queryNoIndexUsed = false;
this.queryBadIndexUsed = false;
this.serverQueryWasSlow = false;
//
// Compressed input stream needs cleared at beginning of each command execution...
//
if (this.useCompression) {
int bytesLeft = this.mysqlInput.available();
if (bytesLeft > 0) {
this.mysqlInput.skip(bytesLeft);
}
}
try {
clearInputStream();
//
// PreparedStatements construct their own packets, for efficiency's sake.
//
// If this is a generic query, we need to re-use the sending packet.
//
if (queryPacket == null) {
int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 + ((extraData != null) ? extraData.length() : 0) + 2;
if (this.sendPacket == null) {
this.sendPacket = new Buffer(packLength);
}
this.packetSequence = -1;
this.compressedPacketSequence = -1;
this.readPacketSequence = 0;
this.checkPacketSequence = true;
this.sendPacket.clear();
this.sendPacket.writeByte((byte) command);
if ((command == MysqlDefs.INIT_DB) || (command == MysqlDefs.QUERY) || (command == MysqlDefs.COM_PREPARE)) {
if (extraDataCharEncoding == null) {
this.sendPacket.writeStringNoNull(extraData);
} else {
this.sendPacket.writeStringNoNull(extraData, extraDataCharEncoding, this.connection.getServerCharset(),
this.connection.parserKnowsUnicode(), this.connection);
}
}
send(this.sendPacket, this.sendPacket.getPosition());
} else {
this.packetSequence = -1;
this.compressedPacketSequence = -1;
send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement
}
} catch (SQLException sqlEx) {
// don't wrap SQLExceptions
throw sqlEx;
} catch (Exception ex) {
throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex,
getExceptionInterceptor());
}
Buffer returnPacket = null;
if (!skipCheck) {
if ((command == MysqlDefs.COM_EXECUTE) || (command == MysqlDefs.COM_RESET_STMT)) {
this.readPacketSequence = 0;
this.packetSequenceReset = true;
}
returnPacket = checkErrorPacket(command);
}
return returnPacket;
} catch (IOException ioEx) {
preserveOldTransactionState();
throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx,
getExceptionInterceptor());
} catch (SQLException e) {
preserveOldTransactionState();
throw e;
} finally {
if (timeoutMillis != 0) {
try {
this.mysqlConnection.setSoTimeout(oldTimeout);
} catch (SocketException e) {
throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e,
getExceptionInterceptor());
}
}
}
}
注意这里的mysqlConnection如果timeoutMillis不为0则会setSoTimeout;但是fetchRowsViaCursors方法设置的为0,即不改变原来的设置。那么就是使用连接参数中设定的值。
小结
对于mysql jdbc来说:
- url设置useCursorFetch=true,且设置了statement的fetchSize,这样才真正的批量fetch,否则是全量拉取数据
- executeQuery方法在fetch模式下不会去拉取第一批数据,而是在resultSet的next方法中判断,根据需要拉取
- resultSet的next方法不受queryTimeout参数的影响,应该是受最底层的socketTimeout影响
- queryTimeout会新建一个cancelTask并使用Timer调度,如果超时了则执行cancel动作
如果queryTimeoutKillsConnection则kill连接,否则发送kill query命令,同时标记状态,然后抛出MySQLTimeoutException异常.
- executeQuery没有超时,则会cancel掉这个timerTask,然后调用purge从队列中移除