在Java中创建和销毁连接是一个费时的事情,因此多数情况下,被创建的连接在使用后不会立即被销毁,而是通过一个叫做连接池的容器缓存起来,以备下回使用。
连接池的实现需要考虑线程的并发问题,因此方便起见,使用Apache的一个开源连接池commons-pool
。其中常用的数据库连接池dbcp项目中就是基于commons-pool
实现的池。
下面记录一下使用这个通用连接池解决Thrift长连接的问题:
添加commons-pool2的maven依赖坐标
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
实现PooledObjectFactory接口
PooledObjectFactory
是一个范型接口,通过实现这个接口,可以约束池中的对象如何被创建、销毁、激活、反激活。
下面的程序是对Thrift
中的Protocol
对象实现的一个PooledObjectFactory
。
实现的逻辑是:
- 连接对象被创建后即打开连接
- 连接对象销毁前关闭连接
- 每次返回给池的borrower线程的时候保证返回的连接已被打开
- 对池中对象借还的过程不做打开关闭连接的操作(提高效率)
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.commons.pool2.impl.DefaultPooledObject;
public class ThriftPooledObjectFactory implements PooledObjectFactory<TProtocol> {
private String serverIP;
private int serverPort;
private int timeOut;
ThriftPooledObjectFactory(ThriftServiceConfigItem thriftServiceConfigItem){
this.serverIP = thriftServiceConfigItem.getServerIP();
this.serverPort = thriftServiceConfigItem.getServerPort();
this.timeOut = thriftServiceConfigItem.getTimeOut();
}
@Override
public PooledObject<TProtocol> makeObject() throws Exception {
TSocket tSocket = new TSocket(serverIP, serverPort, timeOut);
TTransport tTransport = new TFramedTransport(tSocket);
TProtocol tProtocol = new TBinaryProtocol(tTransport);
tProtocol.getTransport().open();
return new DefaultPooledObject<>(tProtocol);
}
@Override
public void destroyObject(PooledObject<TProtocol> p) throws Exception {
TProtocol tProtocol = p.getObject();
if(tProtocol.getTransport().isOpen()){
tProtocol.getTransport().close();
}
}
@Override
public boolean validateObject(PooledObject<TProtocol> p) {
// 这里确保返回的是已打开的连接
TProtocol tProtocol = p.getObject();
return tProtocol.getTransport().isOpen();
}
@Override
public void activateObject(PooledObject<TProtocol> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<TProtocol> p) throws Exception {
}
}
创建连接池
首先,定义一个ConnectionProvider
接口,这个接口里定义了对连接池中的对象借和还的操作:
import org.apache.thrift.protocol.TProtocol;
public interface ConnectionProvider {
/**
* 从连接池里获取一个TProtocol对象
* @return
*/
TProtocol getConnection();
/**
* 将一个TProtocol对象放回连接池
* @param tProtocol
*/
void returnConnection(TProtocol tProtocol);
}
实现这个接口:
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.protocol.TProtocol;
import org.springframework.beans.factory.InitializingBean;
public class ConnectionProviderImpl implements ConnectionProvider, InitializingBean {
// thrift服务器的配置
private ThriftServiceConfigItem thriftServiceConfigItem;
// 连接池
private GenericObjectPool<TProtocol> objectPool;
@Override
public TProtocol getConnection() {
try {
return objectPool.borrowObject();
} catch (Exception e) {
throw new RuntimeException("getConnection出现异常", e);
}
}
@Override
public void returnConnection(TProtocol tProtocol) {
try {
// 将对象放回对象池
objectPool.returnObject(tProtocol);
} catch (Exception e) {
throw new RuntimeException("returnConnection出现异常", e);
}
}
@Override
public void afterPropertiesSet() throws Exception {
// 初始化连接工厂
ThriftPooledObjectFactory thriftPooledObjectFactory
= new ThriftPooledObjectFactory(thriftServiceConfigItem);
// 初始化连接池
objectPool = new GenericObjectPool<>(thriftPooledObjectFactory);
// TODO:设置连接池的参数,否则使用默认的配置
}
public void setThriftServiceConfigItem(ThriftServiceConfigItem thriftServiceConfigItem) {
this.thriftServiceConfigItem = thriftServiceConfigItem;
}
}
这里使用了Spring的IoC组件,并实现了InitializingBean接口,使得在Spring容器初始化的时候可以在该bean中创建连接池(使用了连接池的默认配置,必要的时候可以自定义参数)。并且在该bean中实现对池中对象的借还操作。
在Spring的配置文件中配置连接池:
<!-- Thrift配置 -->
<bean id="thriftConfig" class="xxx.xxx.ThriftServiceConfigItem" lazy-init="true">
<property name="serverIP" value="127.0.0.1" />
<property name="serverPort" value="8080" />
<property name="timeOut" value="5000" />
</bean>
<!-- Thrift连接池 -->
<bean id="connectionProvider" class="com.elong.web.suggest.city.utils.ConnectionProviderImpl">
<property name="thriftServiceConfigItem" ref="thriftConfig" />
</bean>
在程序中通过连接池使用连接:
// 从连接池中取得连接
TProtocol tProtocol = connectionProvider.getConnection();
// 使用取得的连接创建Client对象
ThriftProxy.Client client = new ThriftProxy.Client(tProtocol);
// do something with the client ...
// 将连接归还到池中
connectionProvider.returnConnection(tProtocol);
// do some other things ...