1 继承工厂类 BasePooledObjectFactory
/**
 * Pooled-object factory whose {@code create()} method builds TLS-secured gRPC
 * {@link ManagedChannel} instances for a single remote endpoint.
 */
public class ChannelPoolFactory extends BasePooledObjectFactory<ManagedChannel> {
// Host name or IP literal that is resolved each time a channel is created.
private String host;
// Port of the remote endpoint.
private int port;
// Host name attached to the resolved address when a channel is created.
private String domain;
// Certificate file handed to the SSL context as its trust manager source.
private File cert;
/**
 * Creates a factory for channels to the given endpoint.
 *
 * @param host   host name or IP literal of the remote endpoint
 * @param port   port of the remote endpoint
 * @param domain host name to attach to the resolved address
 * @param cert   classpath-relative name of the certificate resource
 * @throws IllegalArgumentException if {@code cert} cannot be found on the classpath
 */
public ChannelPoolFactory(String host, int port, String domain, String cert) {
    this.host = host;
    this.port = port;
    this.domain = domain;
    URL url = ChannelPoolFactory.class.getClassLoader().getResource(cert);
    if (url == null) {
        // getResource returns null for a missing resource; fail with a clear
        // message instead of a bare NullPointerException.
        throw new IllegalArgumentException("Certificate resource not found on classpath: " + cert);
    }
    File resolved;
    try {
        // toURI() decodes percent-escapes (e.g. %20 for spaces) that getFile() leaves in place.
        resolved = new File(url.toURI());
    } catch (java.net.URISyntaxException | IllegalArgumentException e) {
        // Malformed or non-file URL: keep the previous behaviour and use the raw path.
        resolved = new File(url.getFile());
    }
    this.cert = resolved;
}
/**
 * Builds a new TLS channel to the configured endpoint.
 *
 * <p>The SSL context trusts the configured certificate file. The host is
 * resolved to its raw address and then re-wrapped with {@code domain} as the
 * host name before the channel is built.
 *
 * @return a newly built channel
 * @throws SSLException         if the SSL context cannot be built
 * @throws UnknownHostException if the host cannot be resolved
 */
@Override
public ManagedChannel create() throws SSLException, UnknownHostException {
    // Build the SSL context first so failure ordering matches the original.
    final SslContext tls = GrpcSslContexts.forClient().trustManager(cert).build();
    final byte[] rawAddress = InetAddress.getByName(host).getAddress();
    final InetAddress namedAddress = InetAddress.getByAddress(domain, rawAddress);
    final InetSocketAddress endpoint = new InetSocketAddress(namedAddress, port);
    return NettyChannelBuilder.forAddress(endpoint)
            .negotiationType(NegotiationType.TLS)
            .sslContext(tls)
            .build();
}
2 使用 GenericObjectPoolConfig 和 Factory 来构造 ObjectPool(GenericObjectPool 的构造函数如下):
/**
 * Creates a pool that uses the given factory and configuration.
 *
 * @param factory the factory stored by this pool; must not be null
 * @param config  the configuration applied to this pool
 * @throws IllegalArgumentException if {@code factory} is null
 */
public GenericObjectPool(PooledObjectFactory<T> factory,
GenericObjectPoolConfig config) {
// The superclass receives the config together with the JMX base name and prefix.
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("factory may not be null");
}
this.factory = factory;
// The idle-object deque is created with the fairness setting from the config.
idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
setConfig(config);
// Start the evictor last, using the configured interval between eviction runs.
startEvictor(getTimeBetweenEvictionRunsMillis());
}
3 在 GenericObjectPool 有borrowObject 和 returnObject方法,以借还池中对象。
在borrowObject中
// First try to take an object from the head of the idle deque.
p = idleObjects.pollFirst();
if (p == null) {
// Nothing idle: try to create a new object.
p = create();
if (p != null) {
// Remember that this object was newly created rather than reused.
create = true;
}
}
if (p == null) {
// Nothing idle and create() returned null: wait for an object to appear in the idle deque.
if (borrowMaxWaitMillis < 0) {
// Negative wait: takeFirst() is used, with no timeout argument.
p = idleObjects.takeFirst();
} else {
// Otherwise wait at most borrowMaxWaitMillis milliseconds.
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
if (!p.allocate()) {
// allocate() returned false, so the object is discarded here.
// NOTE(review): presumably an enclosing loop retries when p is null - confirm against the full method.
p = null;
}
在create()中有
try {
// Delegate the actual construction to the factory.
p = factory.makeObject();
} catch (Exception e) {
// Creation failed: decrement the create counter, then rethrow.
// NOTE(review): presumably this undoes an increment made earlier in create() - not visible here.
createCount.decrementAndGet();
throw e;
}
在 BasePooledObjectFactory(PooledObjectFactory 接口的抽象实现)中,makeObject() 的实现为:
/**
 * Creates a new instance with {@code create()} and returns it wrapped by
 * {@code wrap(...)}.
 *
 * @return the wrapped, newly created object
 * @throws Exception if creating or wrapping the instance fails
 */
@Override
public PooledObject<T> makeObject() throws Exception {
    final T instance = create();
    return wrap(instance);
}
由此可知,当池中没有空闲对象时(例如首次调用 borrowObject),将会调用工厂方法创建对象。
创建成功后将会执行
// Record borrow statistics, then hand the underlying object to the caller.
// NOTE(review): waitTime here presumably holds the timestamp at which the borrow started - confirm.
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
/**
 * Updates the borrow statistics: increments the borrow counter, records the
 * object's idle time and the wait time, and raises the recorded maximum wait
 * time if {@code waitTime} exceeds it.
 *
 * @param p        the object being borrowed
 * @param waitTime the wait time to record
 */
final void updateStatsBorrow(PooledObject<T> p, long waitTime) {
    borrowedCount.incrementAndGet();
    idleTimes.add(p.getIdleTimeMillis());
    waitTimes.add(waitTime);

    // Lock-free maximum: retry the compare-and-set until it succeeds or the
    // stored maximum is already at least waitTime.
    long observedMax = maxBorrowWaitTimeMillis.get();
    while (observedMax < waitTime
            && !maxBorrowWaitTimeMillis.compareAndSet(observedMax, waitTime)) {
        observedMax = maxBorrowWaitTimeMillis.get();
    }
}
此操作将会更新参数。
4 returnObject
该方法用来归还对象,调用方一定要在 finally 代码块中执行它。否则借出的对象不会回到池中,每次 borrowObject 都会创建新对象;当对象数达到上限后,borrowObject 将无法再获得对象,程序会出现异常(例如等待超时后抛出 NoSuchElementException)。
/**
 * Returns a borrowed object to the pool.
 *
 * <p>The object is passivated through the factory. It is then either
 * destroyed (pool closed, or the idle deque already holds {@code maxIdle}
 * objects) or put back on the idle deque, at the head or tail depending on
 * {@code getLifo()}.
 *
 * @param obj the object being returned
 * @throws IllegalStateException if the object's state is not ALLOCATED
 */
@Override
public void returnObject(T obj) {
// Look up the pool's wrapper for this instance.
// NOTE(review): if obj is not tracked in allObjects, p would presumably be null and the
// synchronized statement below would throw NullPointerException - confirm whether a guard is needed.
PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));
synchronized(p) {
final PooledObjectState state = p.getState();
if (state != PooledObjectState.ALLOCATED) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
} else {
p.markReturning(); // Keep from being marked abandoned
}
}
// Read the active time before passivation so it can be recorded on every exit path.
long activeTime = p.getActiveTimeMillis();
try {
factory.passivateObject(p);
} catch (Exception e1) {
// Passivation failed: swallow the exception, destroy the object,
// call ensureIdle(1, false), record the stats and stop here.
swallowException(e1);
destroy(p);
ensureIdle(1, false);
updateStatsReturn(activeTime);
return;
}
int maxIdleSave = getMaxIdle();
// && binds tighter than ||: destroy if the pool is closed, or if maxIdle is
// non-negative and the idle deque already holds at least that many objects.
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (Exception e) {
swallowException(e);
}
} else {
// Put the object back: head of the deque when getLifo() is true, tail otherwise.
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
updateStatsReturn(activeTime);
}
从中我们可以看到一些 destroy object 的策略。