首先,集群怎么加载ranger插件:
1.首先,在HDFS的hdfs-site.xml中添加以下配置:
<name>dfs.namenode.inode.attributes.provider.class</name>
<value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value>
2.加载过程:
首先,在ranger-hdfs-plugin-shim这个包下有RangerHdfsAuthorizer这个类。Namenode启动时会将这个类动态加载进虚拟机,再由它通过插件类加载器(RangerPluginClassLoader)加载并实例化ranger-hdfs-plugin包下的同名实现类RangerHdfsAuthorizer。
//TODO RANGER_PLUGIN_TYPE : hdfs
rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
@SuppressWarnings("unchecked") //TODO RANGER_HDFS_AUTHORIZER_IMPL_CLASSNAME : org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer
Class<INodeAttributeProvider> cls = (Class<INodeAttributeProvider>) Class.forName(RANGER_HDFS_AUTHORIZER_IMPL_CLASSNAME,
true, rangerPluginClassLoader);
//TODO 激活插件
activatePluginClassLoader();
// TODO 获取实例
rangerHdfsAuthorizerImpl = cls.newInstance();
getInstance方法体:
me = ret = AccessController.doPrivileged(
new PrivilegedExceptionAction<RangerPluginClassLoader>(){
public RangerPluginClassLoader run() throws Exception {
return new RangerPluginClassLoader(pluginType,pluginClass);
}
}
);
}
类加载:
super(RangerPluginClassLoaderUtil.getInstance().getPluginFilesForServiceTypeAndPluginclass(pluginType, pluginClass), null);
componentClassLoader = AccessController.doPrivileged(
new PrivilegedAction<MyClassLoader>() {
public MyClassLoader run() {
return new MyClassLoader(Thread.currentThread().getContextClassLoader());
}
}
);
插件激活:
Thread.currentThread().setContextClassLoader(this);
其次,插件初始化:
在ranger-hdfs-plugin包下:RangerHdfsPlugin 继承 RangerBasePlugin,首先是初始化方法init,调用RangerBasePlugin 的init方法,主要实现以下操作:
super.init();
父类init:
//TODO 第一步:
// TODO 清空 refresher(策略刷新)、serviceName(服务名称)、policyEngine(策略引擎)
cleanup();
//TODO 第二步:加载配置,设置一些值
RangerConfiguration configuration = RangerConfiguration.getInstance();
//TODO hdfs 加载配置文件ranger-hdfs-audit.xml ranger-hdfs-security.xml
configuration.addResourcesForServiceType(serviceType);
// TODO appid:hdfs 初始化审计
configuration.initAudit(appId);
//TODO ranger 插件前缀
String propertyPrefix = "ranger.plugin." + serviceType;
/**
* 以下配置都是读取的配置文件 上面configuration加载的配置文件
*/
// TODO 刷新器定期更新策略的轮询间隔时间。Ranger 插件会定期从Ranger Admin拉取新的策略信息,并保存在Hdfs缓存中。
long pollingIntervalMs = configuration.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
// TODO 从Ranger Admin拉取策略到Hdfs插件的临时存放目录。
String cacheDir = configuration.get(propertyPrefix + ".policy.cache.dir");
serviceName = configuration.get(propertyPrefix + ".service.name");
clusterName = RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", "");
//TODO 使用转发的ip地址
useForwardedIPAddress = configuration.getBoolean(propertyPrefix + ".use.x-forwarded-for.ipaddress", false);
String trustedProxyAddressString = configuration.get(propertyPrefix + ".trusted.proxy.ipaddresses");
trustedProxyAddresses = StringUtils.split(trustedProxyAddressString, RANGER_TRUSTED_PROXY_IPADDRESSES_SEPARATOR_CHAR);
//TODO 第三步:设置RangerPolicyEngineOptions类的成员变量值
policyEngineOptions.configureForPlugin(configuration, propertyPrefix);
LOG.info(policyEngineOptions);
// TODO 创建WebAdmin 通过类加载器获取实例
RangerAdminClient admin = createAdminClient(serviceName, appId, propertyPrefix);
// TODO 创建策略刷新器
refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
refresher.setDaemon(true);
// TODO 开始刷新
refresher.startRefresher();
long policyReorderIntervalMs = configuration.getLong(propertyPrefix + ".policy.policyReorderInterval", 60 * 1000);
if (policyReorderIntervalMs >= 0 && policyReorderIntervalMs < 15 * 1000) {
policyReorderIntervalMs = 15 * 1000;
}
if (LOG.isDebugEnabled()) {
LOG.debug(propertyPrefix + ".policy.policyReorderInterval:" + policyReorderIntervalMs);
}
if (policyEngineOptions.disableTrieLookupPrefilter && policyReorderIntervalMs > 0) {
policyEngineRefreshTimer = new Timer("PolicyEngineRefreshTimer", true);
try {
//TODO 通过定时器去刷新
policyEngineRefreshTimer.schedule(new PolicyEngineRefresher(this), policyReorderIntervalMs, policyReorderIntervalMs);
策略引擎配置:
//TODO 是否禁用上下文增强器
disableContextEnrichers = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.context.enrichers", false);
// TODO 是否禁用自定义条件。在Ranger0.5版本之后加入上下文增强器和用户自定义条件这样的“钩子”函数以增加授权策略的可扩展性
disableCustomConditions = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.custom.conditions", false);
// TODO 是否禁用基于标签的策略评估。在Ranger0.6版本以后,Ranger不仅仅支持基于资源的策略,
// 还支持基于标签的策略,该策略的优点是资源分类与访问授权的分离,标记的单个授权策略可用于授权跨各种Hadoop组件访问资源
disableTagPolicyEvaluation = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.tagpolicy.evaluation", false);
//TODO 是否禁用Trie查找预过滤器(trie lookup prefilter)
disableTrieLookupPrefilter = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.trie.lookup.prefilter", false);
cacheAuditResults = conf.getBoolean(propertyPrefix + ".policyengine.option.cache.audit.results", true);
RangerAdmin客户端的执行流程(按配置文件中的轮询间隔定时拉取并更新策略):客户端实现类为RangerAdminRESTClient,使用Jersey构建RESTful客户端,请求部分如下:
/**
 * Builds the "get policies if updated" request for {@code serviceName} and executes it.
 *
 * @return the {@code ClientResponse} produced by the GET call
 */
public ClientResponse run() {
// Create the WebResource from the REST_URL_POLICY_GET_FOR_SECURE_SERVICE_IF_UPDATED URL plus the
// service name, then append the query parameters: last known policy version, last activation
// time, plugin id and cluster name.
WebResource secureWebResource = createWebResource(RangerRESTUtils.REST_URL_POLICY_GET_FOR_SECURE_SERVICE_IF_UPDATED + serviceName)
.queryParam(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion))
.queryParam(RangerRESTUtils.REST_PARAM_LAST_ACTIVATION_TIME, Long.toString(lastActivationTimeInMillis))
.queryParam(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId)
.queryParam(RangerRESTUtils.REST_PARAM_CLUSTER_NAME, clusterName);
// Ask for the JSON MIME type and perform the GET.
return secureWebResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
}
// 304 未修改(Not Modified)
if (response == null || response.getStatus() == HttpServletResponse.SC_NOT_MODIFIED) {
if (response == null) {
LOG.error("Error getting policies; Received NULL response!!. secureMode=" + isSecureMode + ", user=" + user + ", serviceName=" + serviceName);
} else {
RESTResponse resp = RESTResponse.fromClientResponse(response);
if (LOG.isDebugEnabled()) {
LOG.debug("No change in policies. secureMode=" + isSecureMode + ", user=" + user + ", response=" + resp + ", serviceName=" + serviceName);
}
}
ret = null;
// 200
} else if (response.getStatus() == HttpServletResponse.SC_OK) {
ret = response.getEntity(ServicePolicies.class);
// 404
} else if (response.getStatus() == HttpServletResponse.SC_NOT_FOUND) {