DruidDataSource详解(二)

DruidDataSource源码解析

获取连接,获取连接时会先根据配置参数初始化连接池。如果配置有maxWaitMillis则等待maxWaitMillis时间后仍不能获取连接则抛出GetConnectionTimeoutException异常。具体代码如下:
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();//初始化连接池
    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
} 
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
    int notFullTimeoutRetryCnt = 0;
    for (;;) {
        // handle notFullTimeoutRetry
        DruidPooledConnection poolableConnection;
        try {
           //等待一定时间后,仍旧获取不到连接则抛出GetConnectionTimeoutException异常
            poolableConnection = getConnectionInternal(maxWaitMillis);/
        } catch (GetConnectionTimeoutException ex) {
         ....
        }
       //testOnborrow是否为true,如果是则检验连接的有效性,如果为无效连接则抛弃该连接并重新获取
        if (isTestOnBorrow()) {
            boolean validate = testConnectionInternal(poolableConnection.getConnection());
            if (!validate) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }
                Connection realConnection = poolableConnection.getConnection();
                discardConnection(realConnection);
                continue;
            }
        } else {
            Connection realConnection = poolableConnection.getConnection();
            if (realConnection.isClosed()) {
                discardConnection(null); // 传入null,避免重复关闭
                continue;
            }
             //如果testWhileIdle是true并且空闲时间大于timeBetweenEvictionRunsMillis则验证连接是否有效,如果无效关闭连接
            if (isTestWhileIdle()) {
                final long currentTimeMillis = System.currentTimeMillis();
                final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
                final long idleMillis = currentTimeMillis - lastActiveTimeMillis;
                long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
                if (timeBetweenEvictionRunsMillis <= 0) {
                    timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                }
                if (idleMillis >= timeBetweenEvictionRunsMillis) {
                    boolean validate = testConnectionInternal(poolableConnection.getConnection());
                    if (!validate) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skip not validate connection.");
                        }
                        discardConnection(realConnection);
                        continue;
                    }
                }
            }
        }
        if (isRemoveAbandoned()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.setConnectStackTrace(stackTrace);
            poolableConnection.setConnectedTimeNano();
            poolableConnection.setTraceEnable(true);

            synchronized (activeConnections) {
                activeConnections.put(poolableConnection, PRESENT);
            }
        }

        if (!this.isDefaultAutoCommit()) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
}
初始化线程池
    public void init() throws SQLException {
     if (inited) {
        return;
     }
    final ReentrantLock lock = this.lock;//默认非公平锁,通过DruidDataSource构造函数可以修改锁策略   
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        throw new SQLException("interrupt", e);
    }
    boolean init = false;
    try {
        if (inited) {
            return;
        }
        init = true;

        initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
        this.id = DruidDriver.createDataSourceId();
        if (this.id > 1) {
            long delta = (this.id - 1) * 100000;
            this.connectionIdSeed.addAndGet(delta);
            this.statementIdSeed.addAndGet(delta);
            this.resultSetIdSeed.addAndGet(delta);
            this.transactionIdSeed.addAndGet(delta);
        }

        if (this.jdbcUrl != null) {
            this.jdbcUrl = this.jdbcUrl.trim();
            initFromWrapDriverUrl();
        }

        if (this.dbType == null || this.dbType.length() == 0) {
            this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
        }

        for (Filter filter : filters) {
            filter.init(this);
        }

        if (JdbcConstants.MYSQL.equals(this.dbType) || //
            JdbcConstants.MARIADB.equals(this.dbType)) {
            boolean cacheServerConfigurationSet = false;
            if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                cacheServerConfigurationSet = true;
            } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                cacheServerConfigurationSet = true;
            }
            if (cacheServerConfigurationSet) {
                this.connectProperties.put("cacheServerConfiguration", "true");
            }
        }
        .....
        if (this.driverClass != null) {
            this.driverClass = driverClass.trim();
        }

        initFromSPIServiceLoader();

        if (this.driver == null) {
            if (this.driverClass == null || this.driverClass.isEmpty()) {
                this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
            }

            if (MockDriver.class.getName().equals(driverClass)) {
                driver = MockDriver.instance;
            } else {
                driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
            }
        } else {
            if (this.driverClass == null) {
                this.driverClass = driver.getClass().getName();
            }
        }

        initCheck();//如果为oracle数据库或者DB2数据库则初始化check

        initExceptionSorter();//初始化ExceptionSorter
        initValidConnectionChecker();//初始化ConnectionChecker
        validationQueryCheck();

        if (isUseGlobalDataSourceStat()) {
            dataSourceStat = JdbcDataSourceStat.getGlobal();
            if (dataSourceStat == null) {
                dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
                JdbcDataSourceStat.setGlobal(dataSourceStat);
            }
            if (dataSourceStat.getDbType() == null) {
                dataSourceStat.setDbType(this.getDbType());
            }
        } else {
            dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
        }
        dataSourceStat.setResetStatEnable(this.resetStatEnable);

        connections = new DruidConnectionHolder[maxActive];

        SQLException connectError = null;

        try {
            // 按照配置参数initialSize初始化连接池
            for (int i = 0, size = getInitialSize(); i < size; ++i) {
                // createPhysicalConnection() 创建物理连接,后边我们会看到这个物理连接是怎么创建的,会让创建连接线程等待,当
                Connection conn = createPhysicalConnection();
                DruidConnectionHolder holder = new DruidConnectionHolder(this, conn);
                connections[poolingCount] = holder;
                incrementPoolingCount();//增加连接池可用连接数量即poolingCount++
            }

            if (poolingCount > 0) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            connectError = ex;
        }
        createAndLogThread();//如果配置timeBetweenLogStatsMillis则创建log线程每隔指定时间记录连接池状态
        createAndStartCreatorThread();//创建并且启动线程在存在等待获取连接线程的的时候,创建连接。
        createAndStartDestroyThread();//创建并启动销毁线程
        initedLatch.await();
        initedTime = new Date();
        registerMbean();
        if (connectError != null && poolingCount == 0) {
            throw connectError;
        }
    } catch (SQLException e) {
        LOG.error("dataSource init error", e);
        throw e;
    } catch (InterruptedException e) {
        throw new SQLException(e.getMessage(), e);
    } finally {
        inited = true;
        lock.unlock();

        if (init && LOG.isInfoEnabled()) {
            LOG.info("{dataSource-" + this.getID() + "} inited");
        }
    }
}
创建物理连接,可以看到创建物理连接很简单,就是根据JDBC去创建connection
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn;
    if (getProxyFilters().size() == 0) {
        conn = getDriver().connect(url, info);
    } else {
        conn = new FilterChainImpl(this).connection_connect(info);
    }

    createCount.incrementAndGet();

    return conn;
}
下面再看看createAndStartCreatorThread 是什么东西,可以看到DruidDataSource会启动一个线程当活动连接数+池中的连接数小于maxActive时并且池中连接数小于等待获取连接的线程数时会创建新的物理连接并放到连接池中。
 protected void createAndStartCreatorThread() {
    if (createScheduler == null) {
        String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(threadName);
        createConnectionThread.start();
        return;
    }

    initedLatch.countDown();
}
public class CreateConnectionThread extends Thread {

    public CreateConnectionThread(String name){
        super(name);
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        int errorCount = 0;
        for (;;) {
            // addLast
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e2) {
                break;
            }

            try {
                // 必须存在线程等待,才创建连接
                if (poolingCount >= notEmptyWaitThreadCount) {
                    empty.await();
                }

                // 防止创建超过maxActive数量的连接
                if (activeCount + poolingCount >= maxActive) {
                    empty.await();
                    continue;
                }

            } catch (InterruptedException e) {
                lastCreateError = e;
                lastErrorTimeMillis = System.currentTimeMillis();
                break;
            } finally {
                lock.unlock();
            }

            Connection connection = null;

            try {
                connection = createPhysicalConnection();
            } catch (SQLException e) {
                LOG.error("create connection error, url: " + jdbcUrl, e);

                errorCount++;

                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    if (breakAfterAcquireFailure) {
                        break;
                    }

                    try {
                        Thread.sleep(timeBetweenConnectErrorMillis);
                    } catch (InterruptedException interruptEx) {
                        break;
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("create connection error", e);
                continue;
            } catch (Error e) {
                LOG.error("create connection error", e);
                break;
            }

            if (connection == null) {
                continue;
            }

            put(connection);

            errorCount = 0; // reset errorCount
        }
    }
}
最后我们再看下createAndStartDestroyThread 做了什么事情, createAndStartDestroyThread该方法有两种回收连接池的策略,第一种是通过作业调度第二种是通过Thread.sleep方法。两种策略按timeBetweenEvictionRunsMillis间隔调度执行destoryTask;destoryTask的 shrink(true) 主要根据配置参数判断线程池中连接的空闲时间idleMillis 是否大于等于 minEvictableIdleTimeMillis,如果是则关闭该连接。removeAbandoned()方法主要是在配置连接泄露处理策略时,关闭泄露的连接
       protected void createAndStartDestroyThread() {
    destoryTask = new DestroyTask();

    if (destroyScheduler != null) {
        long period = timeBetweenEvictionRunsMillis;
        if (period <= 0) {
            period = 1000;
        }
        destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destoryTask, period, period,
                                                                      TimeUnit.MILLISECONDS);
        initedLatch.countDown();
        return;
    }

    String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
    destroyConnectionThread = new DestroyConnectionThread(threadName);
    destroyConnectionThread.start();
}
 public class DestroyTask implements Runnable {

    @Override
    public void run() {
        shrink(true);

        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }

}
public class DestroyConnectionThread extends Thread {

    public DestroyConnectionThread(String name){
        super(name);
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        for (;;) {
            // 从前面开始删除
            try {
                if (closed) {
                    break;
                }

                if (timeBetweenEvictionRunsMillis > 0) {
                    Thread.sleep(timeBetweenEvictionRunsMillis);
                } else {
                    Thread.sleep(1000); //
                }

                if (Thread.interrupted()) {
                    break;
                }

                destoryTask.run();
            } catch (InterruptedException e) {
                break;
            }
        }
}
removeAbandoned处理逻辑
public int removeAbandoned() {
    int removeCount = 0;

    long currrentNanos = System.nanoTime();

    List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();

    synchronized (activeConnections) {
        Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

        for (; iter.hasNext();) {
            DruidPooledConnection pooledConnection = iter.next();

            if (pooledConnection.isRunning()) {
                continue;
            }

            long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

            if (timeMillis >= removeAbandonedTimeoutMillis) {
                iter.remove();
                pooledConnection.setTraceEnable(false);
                abandonedList.add(pooledConnection);
            }
        }
    }

    if (abandonedList.size() > 0) {
        for (DruidPooledConnection pooledConnection : abandonedList) {
            synchronized (pooledConnection) {
                if (pooledConnection.isDisable()) {
                    continue;
                }
            }
            
            JdbcUtils.close(pooledConnection);
            pooledConnection.abandond();
            removeAbandonedCount++;
            removeCount++;

            if (isLogAbandoned()) {
                StringBuilder buf = new StringBuilder();
                buf.append("abandon connection, owner thread: ");
                buf.append(pooledConnection.getOwnerThread().getName());
                buf.append(", connected time nano: ");
                buf.append(pooledConnection.getConnectedTimeNano());
                buf.append(", open stackTrace\n");

                StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
                for (int i = 0; i < trace.length; i++) {
                    buf.append("\tat ");
                    buf.append(trace[i].toString());
                    buf.append("\n");
                }

                LOG.error(buf.toString());
            }
        }
    }

    return removeCount;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,169评论 11 349
  • 转自:http://blog.csdn.net/jackfrued/article/details/4492194...
    王帅199207阅读 8,493评论 3 93
  • ˇ 渔船在海面上起伏着。 “我想尿尿。”阿冒说。 阿冒站在船沿把着小坞舱小便,玲姨把着阿冒的小手。 “男人真好...
    大雨盛阅读 226评论 0 0
  • 青衫不华阅读 65评论 0 0