网上有很多讲ranger原理等相关的文章,策略同步相关出现最多的应该是这张图
这个流程图基本描述是准确的,但是不够细致,比如有以下几个问题并未能描述体现:
- 缓存文件丢失怎么办?缓存文件被恶意篡改怎么办?
- 更新的具体机制是什么?每次都是拉取一份覆盖cache中的吗?
...
关于策略周期问题
这里有一个小小的细节,ranger策略同步线程,并不是简单的30s启动一次更新,而是内置了一个简易的生产消费模型;
// 生产者任务
public final class DownloaderTask extends TimerTask {
private static final Log LOG = LogFactory.getLog(DownloaderTask.class);
private final DownloadTrigger timerTrigger = new DownloadTrigger();
private final BlockingQueue<DownloadTrigger> queue;
public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
this.queue = queue;
}
// 插件按固定周期30s往队列中塞入一个task:
@Override
public void run() {
try {
queue.put(timerTrigger);
timerTrigger.waitForCompletion();
} catch (InterruptedException excp) {
LOG.error("Caught exception. Exiting thread");
}
}
}
// 消费线程
public void run() {
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
}
while(true) {
try {
DownloadTrigger trigger = policyDownloadQueue.take();
loadPolicy();
trigger.signalCompletion();
} catch(InterruptedException excp) {
LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
break;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
}
}
策略同步详解流程
结合流程图可以得知:
- 策略同步默认根据service版本号来判断是否存在更新,进而根据service与policy的映射关系来获取更新后的策略信息,如果没有则不做任何处理;同时可以通过配置ranger.admin.supports.policy.deltas=true来实现通过策略变化增量更新,这种方式默认是关闭的,需要手动开启,通过直接读取policy_change_log表来读取增量策略;
- 缓存的json文件只有在rangeradmin意外终止且plugin发生重启时候生效,其他情况并不会造成影响;admin终止只是策略不在更新而已;ranger本身是支持策略的导入导出的,如果遇见这种情况应该讲备份的策略信息先倒入到admin再重新安装;