JDBC操作MySQL(3)—查询(普通、流式、游标)

问题

通过JDBC对MySQL进行数据查询时,有个很容易踩的坑,以下面代码为例:

    public static void selectNormal() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
        PreparedStatement statement = connection.prepareStatement("select * from test",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
        //statement.setFetchSize(100);
        ResultSet resultSet = statement.executeQuery();
        
        while(resultSet.next()){
            System.out.println(resultSet.getString(1));
        }
        resultSet.close();
        statement.close();
        connection.close();
    }

这段代码在查询结果数据条数较大时则会出现内存溢出OOM问题:

为了更容易模拟错误,可将jvm内存设置较小,增加jvm参数 -Xms16m -Xmx16m

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
   at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2213)
   at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1992)
   at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3413)
   at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:471)
   at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3115)
   at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2344)
   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2739)
   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
   at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
   at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
   at com.cmbc.dap.dao.test.MysqlBatchTest.selectNormal(MysqlBatchTest.java:46)
   at com.cmbc.dap.dao.test.MysqlBatchTest.main(MysqlBatchTest.java:13)

你可能会说设置fetchSize即可,但不幸的是,将上述代码设置fetchSize代码注释打开,依然会报出同样错误,fetchSize并没有生效,MySQL仍然一股脑将所有数据加载到内存,直到撑爆。
对于大数据量下查询,如果才能保证应用程序正确运行呢?寻根溯源,我们还是通过查看MySQL驱动源码来找答案。

MySQL驱动 查询实现原理

com.mysql.jdbc.PreparedStatement

public java.sql.ResultSet executeQuery() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            MySQLConnection locallyScopedConn = this.connection;

            checkForDml(this.originalSql, this.firstCharOfStmt);

            this.batchedGeneratedKeys = null;

            resetCancelledState();

            implicitlyCloseAllOpenResults();

            clearWarnings();

            if (this.doPingInstead) {
                doPingInstead();

                return this.results;
            }

            setupStreamingTimeout(locallyScopedConn);

            Buffer sendPacket = fillSendPacket();

            String oldCatalog = null;

            if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
                oldCatalog = locallyScopedConn.getCatalog();
                locallyScopedConn.setCatalog(this.currentCatalog);
            }

            //
            // Check if we have cached metadata for this query...
            //
            CachedResultSetMetaData cachedMetadata = null;
            if (locallyScopedConn.getCacheResultSetMetadata()) {
                cachedMetadata = locallyScopedConn.getCachedMetaData(this.originalSql);
            }

            Field[] metadataFromCache = null;

            if (cachedMetadata != null) {
                metadataFromCache = cachedMetadata.fields;
            }

            locallyScopedConn.setSessionMaxRows(this.maxRows);

            this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, metadataFromCache, false);

            if (oldCatalog != null) {
                locallyScopedConn.setCatalog(oldCatalog);
            }

            if (cachedMetadata != null) {
                locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, cachedMetadata, this.results);
            } else {
                if (locallyScopedConn.getCacheResultSetMetadata()) {
                    locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, null /* will be created */, this.results);
                }
            }
            this.lastInsertId = this.results.getUpdateID();

            return this.results;
        }
    }

上面代码中我们特别注意createStreamingResultSet方法,此方法返回是否创建流式结果集,即采用流式查询。流式查询与普通查询不同之处在于并不是一次性将所有数据加载到内存,在调用next()方法时,MySQL驱动只从网络数据流获取到1条数据,然后返回应用,这样就避免了内存溢出问题。我们看下该方法的实现:

 /**
     * We only stream result sets when they are forward-only, read-only, and the
     * fetch size has been set to Integer.MIN_VALUE
     * 
     * @return true if this result set should be streamed row at-a-time, rather
     *         than read all at once.
     */
    protected boolean createStreamingResultSet() {
        return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
                && (this.fetchSize == Integer.MIN_VALUE));
    }

可以看到满足这三个条件即会采用流式查询,前面两个其实就是MySQL创建Statement的默认的游标类型,在PreparedStatement类我们可以看到

    private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;
    private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;
    public java.sql.PreparedStatement prepareStatement(String sql)
            throws SQLException {
        return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE,
                DEFAULT_RESULT_SET_CONCURRENCY);
    }

因此创建statement,不指定后面两个参数默认也是满足流式查询的条件的。

PreparedStatement statement = connection.prepareStatement("select * from test");

而第三个条件却很奇怪,fetchSize必须为Integer.MIN_VALUE即-2147483648,而这样一个负数是MySQL自定义的的特殊含义值,在JDBC接口规范并无此说明。至此我们就知道了如何使用流式查询了,修改代码如下:

public static void selectStream() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
        PreparedStatement statement = connection.prepareStatement("select * from test",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(Integer.MIN_VALUE);
        
        long begin = System.currentTimeMillis();
        ResultSet resultSet = statement.executeQuery();
        
        while(resultSet.next()){
            //System.out.println(resultSet.getString(1));
        }
        long end = System.currentTimeMillis();
        System.out.println("selectStream span time="+(end-begin) + "ms");
        
        resultSet.close();
        statement.close();
        connection.close();
    }

运行,果然解决了OOM问题,无论数据量多大,都可以正常查询了。

在StatementImpl中有enableStreamingResults()方法,该方法其实就是设置这三个条件的,网上很多文章介绍此种方式开启流式查询,但笔者不太推荐这种方式,因为需要强制转换为MySQL驱动中的StatementImpl类,这其实已经并非JDBC的标准接口。

     public void enableStreamingResults() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {
            this.originalResultSetType = this.resultSetType;
            this.originalFetchSize = this.fetchSize;

            setFetchSize(Integer.MIN_VALUE);
            setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
        }
    }

至此,我们已经知道如何使用流式查询解决大数据查询时的OOM问题,但流式查询的实现原理我们还不清楚,因此我们继续看源代码一探究竟,为了更方便展示方法调用层次,我画了一个调用序列图:



我们直接看com.mysql.jdbc.MysqlIO中的getResultSet方法:

/**
     * Build a result set. Delegates to buildResultSetWithRows() to build a
     * JDBC-version-specific ResultSet, given rows as byte data, and field
     * information.
     *
     * @param callingStatement DOCUMENT ME!
     * @param columnCount the number of columns in the result set
     * @param maxRows the maximum number of rows to read (-1 means all rows)
     * @param resultSetType (TYPE_FORWARD_ONLY, TYPE_SCROLL_????)
     * @param resultSetConcurrency the type of result set (CONCUR_UPDATABLE or
     *        READ_ONLY)
     * @param streamResults should the result set be read all at once, or
     *        streamed?
     * @param catalog the database name in use when the result set was created
     * @param isBinaryEncoded is this result set in native encoding?
     * @param unpackFieldInfo should we read MYSQL_FIELD info (if available)?
     *
     * @return a result set
     *
     * @throws SQLException if a database access error occurs
     */
    protected ResultSetImpl getResultSet(StatementImpl callingStatement,
        long columnCount, int maxRows, int resultSetType,
        int resultSetConcurrency, boolean streamResults, String catalog,
        boolean isBinaryEncoded, Field[] metadataFromCache)
        throws SQLException {
        Buffer packet; // The packet from the server
        Field[] fields = null;

        // Read in the column information

        if (metadataFromCache == null /* we want the metadata from the server */) {
            fields = new Field[(int) columnCount];

            for (int i = 0; i < columnCount; i++) {
                Buffer fieldPacket = null;

                fieldPacket = readPacket();
                fields[i] = unpackField(fieldPacket, false);
            }
        } else {
            for (int i = 0; i < columnCount; i++) {
                skipPacket();
            }
        }

        packet = reuseAndReadPacket(this.reusablePacket);
        
        readServerStatusForResultSets(packet);

        //
        // Handle cursor-based fetch first
        //

        if (this.connection.versionMeetsMinimum(5, 0, 2)
                && this.connection.getUseCursorFetch()
                && isBinaryEncoded
                && callingStatement != null
                && callingStatement.getFetchSize() != 0
                && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
            ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

            boolean usingCursor = true;

            //
            // Server versions 5.0.5 or newer will only open
            // a cursor and set this flag if they can, otherwise
            // they punt and go back to mysql_store_results() behavior
            //

            if (this.connection.versionMeetsMinimum(5, 0, 5)) {
                usingCursor = (this.serverStatus &
                        SERVER_STATUS_CURSOR_EXISTS) != 0;
            }

            if (usingCursor) {
                RowData rows = new RowDataCursor(
                    this,
                    prepStmt,
                    fields);

                ResultSetImpl rs = buildResultSetWithRows(
                    callingStatement,
                    catalog,
                    fields,
                    rows, resultSetType, resultSetConcurrency, isBinaryEncoded);

                if (usingCursor) {
                    rs.setFetchSize(callingStatement.getFetchSize());
                }

                return rs;
            }
        }

        RowData rowData = null;

        if (!streamResults) {
            rowData = readSingleRowSet(columnCount, maxRows,
                    resultSetConcurrency, isBinaryEncoded,
                    (metadataFromCache == null) ? fields : metadataFromCache);
        } else {
            rowData = new RowDataDynamic(this, (int) columnCount,
                    (metadataFromCache == null) ? fields : metadataFromCache,
                    isBinaryEncoded);
            this.streamingData = rowData;
        }

        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog,
                (metadataFromCache == null) ? fields : metadataFromCache,
            rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);



        return rs;
    }

三种查询方式

上代码可以看到,MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:

  • 1. RowDataStatic 静态结果集,默认的查询方式,普通查询
  • 2. RowDataDynamic 动态结果集,流式查询
  • 3. RowDataCursor 游标结果集,服务器端基于游标查询

简单看下这几个类的实现代码:

方式1 普通查询
 private RowData readSingleRowSet(long columnCount, int maxRows, int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields) throws SQLException {
        RowData rowData;
        ArrayList<ResultSetRow> rows = new ArrayList<ResultSetRow>();

        boolean useBufferRowExplicit = useBufferRowExplicit(fields);

        // Now read the data
        ResultSetRow row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null);

        int rowCount = 0;

        if (row != null) {
            rows.add(row);
            rowCount = 1;
        }

        while (row != null) {
            row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null);

            if (row != null) {
                if ((maxRows == -1) || (rowCount < maxRows)) {
                    rows.add(row);
                    rowCount++;
                }
            }
        }

        rowData = new RowDataStatic(rows);

        return rowData;
    }

可以看出,此种方式其实就是一次性把查询的所有结果集都保存在本地数组中,所以如果数据量太大,超过jvm内存,则会报文中篇头所示的OOM错误。

方式2 流式查询

每次只获取一条结果集,待应用处理完再次调用next()时,继续获取下一条数据,由代码可以看出流式查询获取数据的方法与普通查询其实是一样的( this.io.nextRow),不同之处在与普通查询时先获取所有数据,然后交给应用处理(next方法其实都是从内存数组遍历),而流式查询时逐条获取,待应用处理完再去拿下一条数据。
com.mysql.jdbc.RowDataDynamic

private void nextRecord() throws SQLException {

        try {
            if (!this.noMoreRows) {
                this.nextRow = this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded, java.sql.ResultSet.CONCUR_READ_ONLY, true,
                        this.useBufferRowExplicit, true, null);

                if (this.nextRow == null) {
                    this.noMoreRows = true;
                    this.isAfterEnd = true;
                    this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);

                    if (this.index == -1) {
                        this.wasEmpty = true;
                    }
                }
            } else {
                this.nextRow = null;
                this.isAfterEnd = true;
            }
        } catch (SQLException sqlEx) {
            if (sqlEx instanceof StreamingNotifiable) {
                ((StreamingNotifiable) sqlEx).setWasStreamingResults();
            }

            // There won't be any more rows
            this.noMoreRows = true;

            // don't wrap SQLExceptions
            throw sqlEx;
        } catch (Exception ex) {
            String exceptionType = ex.getClass().getName();
            String exceptionMessage = ex.getMessage();

            exceptionMessage += Messages.getString("RowDataDynamic.7");
            exceptionMessage += Util.stackTraceToString(ex);

            SQLException sqlEx = SQLError.createSQLException(
                    Messages.getString("RowDataDynamic.8") + exceptionType + Messages.getString("RowDataDynamic.9") + exceptionMessage,
                    SQLError.SQL_STATE_GENERAL_ERROR, this.exceptionInterceptor);
            sqlEx.initCause(ex);

            throw sqlEx;
        }
    }
方式3 RowDataCursor 基于游标

从代码我们惊喜的发现,MySQL其实是支持游标查询的,这种方式下MySQL服务器端一次只发送fetchSize条数据,MySQL驱动会获取完fetchSize条数据后返回给应用,应用处理完继续调用next()时,继续发送fetch命令,继续获取下一批次fetchSize条数据。

 protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, long statementId, Field[] columnTypes, int fetchSize,
            boolean useBufferRowExplicit) throws SQLException {

        if (fetchedRows == null) {
            fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
        } else {
            fetchedRows.clear();
        }

        this.sharedSendPacket.clear();

        this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
        this.sharedSendPacket.writeLong(statementId);
        this.sharedSendPacket.writeLong(fetchSize);

        sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);

        ResultSetRow row = null;

        while ((row = nextRow(columnTypes, columnTypes.length, true, ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null)) != null) {
            fetchedRows.add(row);
        }

        return fetchedRows;
    }

我们看下基于游标的查询测试代码:(设置useCursorFetch=true,指定fetchSize)

public static void selectStreamWithUseCursorFetch() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useCursorFetch=true", "root", "123456");
        PreparedStatement statement = connection.prepareStatement("select * from test");
        statement.setFetchSize(10);
        
        long begin = System.currentTimeMillis();
        ResultSet resultSet = statement.executeQuery();
        
        
        while(resultSet.next()){
            //System.out.println(resultSet.getString(1));
        }
        
        long end = System.currentTimeMillis();
        System.out.println("selectStreamWithUseCursorFetch span time="+(end-begin) + "ms");
        resultSet.close();
        statement.close();
        connection.close();
    }

运行发现大数据量时这种方式也可正常运行。应用指定每次查询获取的条数fetchSize,MySQL服务器每次只查询指定条数的数据,因此单次查询相比与前面两种方式占用MySQL时间较短。但由于MySQL方不知道客户端什么时候将数据消费完,MySQL需要建立一个临时空间来存放每次查询出的数据,大数据量时MySQL服务器IOPS、磁盘占用都会飙升,而且需要与服务器进行更多次的网络通讯,因此最终查询效率是不如流式查询的。

本地测试查询100w数据,方式2与方式3执行时间对比:

selectStreamWithUseCursorFetch span time=507ms
selectStream span time=155ms

从结果上看,由于基于游标方式,服务器端需要更多额外处理,查询性能更低些,对于大数据量一般情况下推荐基于动态结果集的流式查询。

总结:

本文通过对MySQL驱动中查询模块的源码进行剖析,可知MySQL支持三种不同的查询方式,分别适用不同的场景,了解其各自优缺点后,才能在实际项目中正确使用。
一、普通查询

  • 优点:应用代码简单,数据量较小时操作速度快。
  • 缺点:数据量大时会出现OOM问题。

二、流式查询

  • 优点:大数据量时不会有OOM问题。
  • 缺点:占用数据库时间更长,导致网络拥塞的可能性较大。

三、游标查询

  • 优点:大数据量时不会有OOM问题,相比流式查询对数据库单次占用时间较短。
  • 缺点:相比流式查询,对服务端资源消耗更大,响应时间更长。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容

  • JDBC基础知识 一、采用JDBC访问数据库的基本步骤: A.载入JDBC驱动程序 B.定义连接URL ...
    java日记阅读 3,832评论 0 20
  • 今天看到一位朋友写的mysql笔记总结,觉得写的很详细很用心,这里转载一下,供大家参考下,也希望大家能关注他原文地...
    信仰与初衷阅读 4,723评论 0 30
  • 一、前言 MySQL 是目前使用比较广泛的关系型数据库,而从数据库里面根据条件查询数据到内存的情况想必大家在日常项...
    阿里加多阅读 9,327评论 2 3
  • 本文主要内容1、JDBC2、DBUtils 01JDBC概念和数据库驱动程序 A: JDBC概念和数据库驱动程序a...
    乘风破浪的姐姐阅读 798评论 0 6
  • 今天你去操场跑步了,我看你跑的大汗淋漓,也不愿意停下来。我拼命的喊你,让你停下来休息,但你好像听不见我的声音,我只...
    黄小黑阅读 426评论 3 7