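In the earlier post on HBase permission verification (HBase源码分析之权限验证) I covered HBase's built-in simple authentication. Apache has another project that also provides permission checks: Ranger. Installing Ranger is fairly involved; for details see: https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5.0+Installation
My personal impression is that Ranger is still somewhat rough and falls short of what I expect from an Apache top-level project.
Ranger enforces permissions through RangerAuthorizationCoprocessor, which implements MasterObserver, RegionServerObserver, RegionObserver, and BulkLoadObserver, and so receives the full set of coprocessor callbacks.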
Syncing with HBase grant and revoke
A configuration flag, UpdateRangerPoliciesOnGrantRevoke, controls whether Ranger's policies are updated when an HBase grant or revoke is issued.
UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
RangerAuthorizationCoprocessor implements the CoprocessorService interface and registers itself as the AccessControlService, so it receives grant and revoke calls.
@Override
public Service getService() {
    return AccessControlProtos.AccessControlService.newReflectiveService(this);
}
It implements the two methods below. In each of them, if UpdateRangerPoliciesOnGrantRevoke is true, Ranger updates its own policies to match.
/**
 * <code>rpc Grant(.GrantRequest) returns (.GrantResponse);</code>
 */
public abstract void grant(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse> done);
/**
 * <code>rpc Revoke(.RevokeRequest) returns (.RevokeResponse);</code>
 */
public abstract void revoke(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse> done);
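To make the role of the flag concrete, here is a minimal sketch of the gating described above. It is not Ranger's code: the class GrantRevokeSyncSketch, its methods, and the string-based "policy store" are all hypothetical stand-ins, and the protobuf request types are replaced by plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the coprocessor's grant/revoke handling; not Ranger's API.
class GrantRevokeSyncSketch {
    // Mirrors the UpdateRangerPoliciesOnGrantRevoke flag read from configuration.
    private final boolean updatePoliciesOnGrantRevoke;
    // Pretend policy store: each entry is one change that would be sent to Ranger.
    private final List<String> policyChanges = new ArrayList<String>();

    GrantRevokeSyncSketch(boolean updatePoliciesOnGrantRevoke) {
        this.updatePoliciesOnGrantRevoke = updatePoliciesOnGrantRevoke;
    }

    // Returns true only when the grant was turned into a policy change.
    boolean grant(String user, String table, String permission) {
        if (!updatePoliciesOnGrantRevoke) {
            return false; // flag off: the HBase grant is not mirrored
        }
        policyChanges.add("grant " + permission + " on " + table + " to " + user);
        return true;
    }

    // Returns true only when the revoke was turned into a policy change.
    boolean revoke(String user, String table) {
        if (!updatePoliciesOnGrantRevoke) {
            return false; // flag off: the HBase revoke is not mirrored
        }
        policyChanges.add("revoke all on " + table + " from " + user);
        return true;
    }

    List<String> policyChanges() {
        return policyChanges;
    }
}
```

With the flag off, both calls return false and the store stays empty, which is the behaviour the flag is meant to switch.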
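How policies take effect
evaluateAccess is called before every operation. The code is painfully long-winded, but it boils down to this: it inspects the namespace, table, column family, and qualifier settings, collects them all in an AuthorizationSession, and then calls the session's authorize to make the decision.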
ColumnFamilyAccessResult evaluateAccess(String operation, Action action,
        final RegionCoprocessorEnvironment env,
        final Map<byte[], ? extends Collection<?>> familyMap)
        throws AccessDeniedException {
    String access = _authUtils.getAccess(action);
    User user = getActiveUser();
    String userName = _userUtils.getUserAsString(user);
    byte[] tableBytes = getTableName(env);
    if (tableBytes == null || tableBytes.length == 0) {
        throw new AccessDeniedException("Insufficient permissions for operation '" + operation + "',action: " + action);
    }
    String table = Bytes.toString(tableBytes);
    String clusterName = hbasePlugin.getClusterName();
    final String messageTemplate = "evaluateAccess: exiting: user[%s], Operation[%s], access[%s], families[%s], verdict[%s]";
    ColumnFamilyAccessResult result;
    if (canSkipAccessCheck(operation, access, table) || canSkipAccessCheck(operation, access, env)) {
        result = new ColumnFamilyAccessResult(true, true, null, null, null, null, null, null);
        return result;
    }
    // let's create a session that would be reused. Set things on it that won't change.
    HbaseAuditHandler auditHandler = _factory.getAuditHandler();
    AuthorizationSession session = new AuthorizationSession(hbasePlugin)
        .operation(operation)
        .remoteAddress(getRemoteAddress())
        .auditHandler(auditHandler)
        .user(user)
        .access(access)
        .table(table)
        .clusterName(clusterName);
    Map<String, Set<String>> families = getColumnFamilies(familyMap);
    if (families == null || families.isEmpty()) {
        session.buildRequest()
            .authorize();
        boolean authorized = session.isAuthorized();
        String reason = "";
        if (!authorized) {
            reason = String.format("Insufficient permissions for user '%s',action: %s, tableName:%s, no column families found.", user.getName(), operation, table);
        }
        AuthzAuditEvent event = auditHandler.getAndDiscardMostRecentEvent(); // this could be null, of course, depending on audit settings of table.
        // if authorized then pass captured events as access allowed set else as access denied set.
        result = new ColumnFamilyAccessResult(authorized, authorized,
            authorized ? Collections.singletonList(event) : null,
            null, authorized ? null : event, reason, null, clusterName);
        return result;
    }
    boolean everythingIsAccessible = true;
    boolean somethingIsAccessible = false;
    /*
     * we would have to accumulate audits of all successful accesses and any one denial (which in our case ends up being the last denial)
     * We need to keep audit events for family level access check separate because we don't want them logged in some cases.
     */
    List<AuthzAuditEvent> authorizedEvents = new ArrayList<AuthzAuditEvent>();
    List<AuthzAuditEvent> familyLevelAccessEvents = new ArrayList<AuthzAuditEvent>();
    AuthzAuditEvent deniedEvent = null;
    String denialReason = null;
    // we need to cache the auths results so that we can create a filter, if needed
    Map<String, Set<String>> columnsAccessAllowed = new HashMap<String, Set<String>>();
    Set<String> familesAccessAllowed = new HashSet<String>();
    Set<String> familesAccessDenied = new HashSet<String>();
    Set<String> familesAccessIndeterminate = new HashSet<String>();
    for (Map.Entry<String, Set<String>> anEntry : families.entrySet()) {
        String family = anEntry.getKey();
        session.columnFamily(family);
        Set<String> columns = anEntry.getValue();
        if (columns == null || columns.isEmpty()) {
            session.column(null) // zap stale column from prior iteration of this loop, if any
                .buildRequest()
                .authorize();
            AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for success
            if (session.isAuthorized()) {
                // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                somethingIsAccessible = true;
                familesAccessAllowed.add(family);
                if (auditEvent != null) {
                    familyLevelAccessEvents.add(auditEvent);
                }
            } else {
                everythingIsAccessible = false;
                if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                    deniedEvent = auditEvent;
                }
                session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF_OR_DESCENDANTS)
                    .buildRequest()
                    .authorize();
                auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for failure
                if (session.isAuthorized()) {
                    // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                    somethingIsAccessible = true;
                    familesAccessIndeterminate.add(family);
                } else {
                    familesAccessDenied.add(family);
                    denialReason = String.format("Insufficient permissions for user '%s',action: %s, tableName:%s, family:%s.", user.getName(), operation, table, family);
                    if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                        deniedEvent = auditEvent;
                    }
                }
                // Restore the headMatch setting
                session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF);
            }
        } else {
            Set<String> accessibleColumns = new HashSet<String>(); // will be used in to populate our results cache for the filter
            for (String column : columns) {
                session.column(column)
                    .buildRequest()
                    .authorize();
                AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent();
                if (session.isAuthorized()) {
                    // we need to do 3 things: housekeeping, capturing audit events, building the results cache for filter
                    somethingIsAccessible = true;
                    accessibleColumns.add(column);
                    if (auditEvent != null) {
                        authorizedEvents.add(auditEvent);
                    }
                } else {
                    everythingIsAccessible = false;
                    denialReason = String.format("Insufficient permissions for user '%s',action: %s, tableName:%s, family:%s, column: %s", user.getName(), operation, table, family, column);
                    if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                        deniedEvent = auditEvent;
                    }
                }
                if (!accessibleColumns.isEmpty()) {
                    columnsAccessAllowed.put(family, accessibleColumns);
                }
            }
        }
    }
    // Cache of auth results is encapsulated in the filter. Not every caller of the function uses it - only preGet and preOpt will.
    RangerAuthorizationFilter filter = new RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, familesAccessIndeterminate, columnsAccessAllowed);
    result = new ColumnFamilyAccessResult(everythingIsAccessible, somethingIsAccessible, authorizedEvents, familyLevelAccessEvents, deniedEvent, denialReason, filter, clusterName);
    return result;
}
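Stripped of auditing, the method above mostly maintains two flags and a per-family cache of allowed columns. The sketch below shows only that aggregation. It is a simplification under stated assumptions: the class AccessSummarySketch is hypothetical, the BiPredicate stands in for session.authorize() (with a null column meaning a family-level check), and the SELF_OR_DESCENDANTS retry that produces the "indeterminate" set is left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical, audit-free reduction of evaluateAccess; not Ranger's API.
class AccessSummarySketch {
    final boolean everythingIsAccessible;
    final boolean somethingIsAccessible;
    final Map<String, Set<String>> columnsAccessAllowed;

    private AccessSummarySketch(boolean everything, boolean something,
                                Map<String, Set<String>> allowed) {
        this.everythingIsAccessible = everything;
        this.somethingIsAccessible = something;
        this.columnsAccessAllowed = allowed;
    }

    // authorizer.test(family, column) plays the role of session.authorize();
    // column is null when the request names a family without columns.
    static AccessSummarySketch evaluate(Map<String, Set<String>> families,
                                        BiPredicate<String, String> authorizer) {
        boolean everything = true;   // flips to false on the first denial
        boolean something = false;   // flips to true on the first success
        Map<String, Set<String>> allowed = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Set<String>> entry : families.entrySet()) {
            String family = entry.getKey();
            Set<String> columns = entry.getValue();
            if (columns == null || columns.isEmpty()) {
                // Family-level check: one decision for the whole family.
                if (authorizer.test(family, null)) {
                    something = true;
                    allowed.put(family, Collections.<String>emptySet());
                } else {
                    everything = false;
                }
                continue;
            }
            // Column-level check: one decision per requested column.
            Set<String> accessible = new HashSet<String>();
            for (String column : columns) {
                if (authorizer.test(family, column)) {
                    something = true;
                    accessible.add(column);
                } else {
                    everything = false;
                }
            }
            if (!accessible.isEmpty()) {
                allowed.put(family, accessible);
            }
        }
        return new AccessSummarySketch(everything, something, allowed);
    }
}
```

A caller can then reject the request outright when somethingIsAccessible is false, pass it through when everythingIsAccessible is true, and fall back to filtering with the cached sets in between.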
authorize ends up calling RangerPolicyEngineImpl#isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor):
@Override
public RangerAccessResult isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor) {
    RangerAccessResult ret = isAccessAllowedNoAudit(request);
    updatePolicyUsageCounts(request, ret);
    if (resultProcessor != null) {
        resultProcessor.processResult(ret);
    }
    return ret;
}
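RangerPolicyEngineImpl#isAccessAllowed looks up all policies for the resource in RangerPolicyRepository and runs RangerDefaultPolicyEvaluator#evaluatePolicyItems on each of them to decide whether access is permitted. As soon as a matching rule settles the outcome as deny or allow, the loop breaks. Within each evaluation, denyEvaluators is searched first for a matching deny item; only if none is found is allowEvaluators searched for a matching allow item.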
protected void evaluatePolicyItems(RangerAccessRequest request, RangerAccessResult result, boolean isResourceMatch) {
    // First look for a matching deny item
    RangerPolicyItemEvaluator matchedPolicyItem = getMatchingPolicyItem(request, denyEvaluators, denyExceptionEvaluators);
    // Then look for a matching allow item
    if (matchedPolicyItem == null && !result.getIsAllowed()) {
        matchedPolicyItem = getMatchingPolicyItem(request, allowEvaluators, allowExceptionEvaluators);
    }
    if (matchedPolicyItem != null) {
        RangerPolicy policy = getPolicy();
        if (matchedPolicyItem.getPolicyItemType() == RangerPolicyItemEvaluator.POLICY_ITEM_TYPE_DENY) {
            if (isResourceMatch) {
                result.setIsAllowed(false);
                result.setPolicyId(policy.getId());
                result.setReason(matchedPolicyItem.getComments());
            }
        } else {
            if (!result.getIsAllowed()) {
                result.setIsAllowed(true);
                result.setPolicyId(policy.getId());
                result.setReason(matchedPolicyItem.getComments());
            }
        }
    }
}
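The deny-before-allow ordering is easier to see in isolation. The sketch below is a deliberately small model, not Ranger's evaluator: DenyFirstSketch and its four user sets are hypothetical, and I am assuming that an exception list simply cancels a match in the corresponding deny or allow list. Real policy items also match on groups, access types, and conditions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-policy model of deny-first evaluation; not Ranger's API.
class DenyFirstSketch {
    enum Decision { ALLOW, DENY, NOT_DETERMINED }

    private final Set<String> denyUsers;
    private final Set<String> denyExceptions;
    private final Set<String> allowUsers;
    private final Set<String> allowExceptions;

    DenyFirstSketch(Set<String> denyUsers, Set<String> denyExceptions,
                    Set<String> allowUsers, Set<String> allowExceptions) {
        this.denyUsers = denyUsers;
        this.denyExceptions = denyExceptions;
        this.allowUsers = allowUsers;
        this.allowExceptions = allowExceptions;
    }

    // An item matches when the user is listed and not listed as an exception.
    private static boolean matches(String user, Set<String> items, Set<String> exceptions) {
        return items.contains(user) && !exceptions.contains(user);
    }

    Decision evaluate(String user) {
        // Deny items are consulted first, so a deny wins over an allow.
        if (matches(user, denyUsers, denyExceptions)) {
            return Decision.DENY;
        }
        // Allow items are consulted only when no deny item matched.
        if (matches(user, allowUsers, allowExceptions)) {
            return Decision.ALLOW;
        }
        // Nothing matched: this policy has no opinion.
        return Decision.NOT_DETERMINED;
    }

    // Convenience for building the sets in examples.
    static Set<String> users(String... names) {
        return new HashSet<String>(Arrays.asList(names));
    }
}
```

For example, with deny = {bob, carol}, deny exceptions = {carol}, and allow = {alice, bob, carol}, bob is denied even though he is also on the allow list, while carol is allowed because the exception cancels her deny match.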
Policy refresh
RangerAuthorizationCoprocessor creates the RangerHBasePlugin in its start method:
@Override
public void start(CoprocessorEnvironment env) throws IOException {
    ...
    // create and initialize the plugin class
    RangerHBasePlugin plugin = hbasePlugin;
    if(plugin == null) {
        synchronized(RangerAuthorizationCoprocessor.class) {
            plugin = hbasePlugin;
            if(plugin == null) {
                plugin = new RangerHBasePlugin(appType);
                plugin.init();
                UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
                hbasePlugin = plugin;
            }
        }
    }
    ...
}
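The init method of RangerHBasePlugin creates a PolicyRefresher to keep the policies in sync. The default polling interval is 30 * 1000 ms, so the plugin actively pulls the policies once every 30 s.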
public void init() {
    ...
    long pollingIntervalMs = RangerConfiguration.getInstance().getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
    ...
    refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
    refresher.setDaemon(true);
    refresher.startRefresher();
    ...
}
PolicyRefresher is essentially a Thread. Once started, its run() method enters a loop: after each policy pull, the thread sleeps for the polling interval (30 s by default).
public void run() {
    while(true) {
        loadPolicy();
        try {
            Thread.sleep(pollingIntervalMs);
        } catch(InterruptedException excp) {
            break;
        }
    }
}
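The same pull-then-sleep loop can be tried out on its own. In this sketch the class PollingRefresherSketch and the helper pollAtLeast are hypothetical; the Runnable stands in for loadPolicy(), and the interval is shortened to a few milliseconds so the demo finishes quickly. Interrupting the thread is what ends the loop, exactly as in the catch block above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the refresher thread; not Ranger's PolicyRefresher.
class PollingRefresherSketch extends Thread {
    private final Runnable loadPolicy;     // stand-in for the real policy pull
    private final long pollingIntervalMs;

    PollingRefresherSketch(Runnable loadPolicy, long pollingIntervalMs) {
        this.loadPolicy = loadPolicy;
        this.pollingIntervalMs = pollingIntervalMs;
        setDaemon(true); // do not keep the JVM alive just for polling
    }

    @Override
    public void run() {
        while (true) {
            loadPolicy.run();
            try {
                Thread.sleep(pollingIntervalMs);
            } catch (InterruptedException e) {
                break; // an interrupt is the only way out of the loop
            }
        }
    }

    // Demo helper: waits until at least `polls` pulls happened, then stops the thread.
    static int pollAtLeast(int polls) {
        final AtomicInteger count = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(polls);
        PollingRefresherSketch refresher = new PollingRefresherSketch(() -> {
            count.incrementAndGet();
            latch.countDown();
        }, 5);
        refresher.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
            refresher.interrupt();
            refresher.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count.get();
    }
}
```

Note that the first pull happens immediately on start, before any sleep, so a freshly started plugin does not wait a full interval for its first set of policies.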
private void loadPolicy() {
    try {
        // Pull the policies once
        ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
        if (svcPolicies == null) {
            // If the pull fails at startup, fall back to reading from the cache
            if (!policiesSetInPlugin) {
                svcPolicies = loadFromCache();
            }
        } else {
            // Write the policies to the cache
            saveToCache(svcPolicies);
        }
        // Put the policies into effect
        if (svcPolicies != null) {
            plugIn.setPolicies(svcPolicies);
            policiesSetInPlugin = true;
            setLastActivationTimeInMillis(System.currentTimeMillis());
            lastKnownVersion = svcPolicies.getPolicyVersion();
        } else {
            if (!policiesSetInPlugin && !serviceDefSetInPlugin) {
                plugIn.setPolicies(null);
                serviceDefSetInPlugin = true;
            }
        }
    } catch (Exception excp) {
    }
}
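The fallback rule in loadPolicy is worth isolating: the cache is read only when nothing has been activated yet, and a later failed pull leaves the active policies untouched. Here is a minimal sketch of that rule, with assumptions flagged: CacheFallbackSketch is hypothetical, policies are reduced to a version string, the Supplier stands in for the call to the policy admin (returning null when nothing could be fetched), and a plain field replaces the on-disk cache.

```java
import java.util.function.Supplier;

// Hypothetical reduction of loadPolicy's cache fallback; not Ranger's API.
class CacheFallbackSketch {
    private final Supplier<String> admin; // returns null when the pull yields nothing
    private String cache;                 // stands in for the cache file on disk
    private String active;                // policies currently in effect
    private boolean policiesSet;          // mirrors policiesSetInPlugin

    CacheFallbackSketch(Supplier<String> admin, String initialCache) {
        this.admin = admin;
        this.cache = initialCache;
    }

    void loadPolicy() {
        String fetched = admin.get();
        if (fetched == null) {
            // Fall back to the cache only before the first activation.
            if (!policiesSet) {
                fetched = cache;
            }
        } else {
            // A successful pull refreshes the cache.
            cache = fetched;
        }
        if (fetched != null) {
            active = fetched;
            policiesSet = true;
        }
    }

    String active() {
        return active;
    }

    String cache() {
        return cache;
    }
}
```

The effect is that a region server restarted while the admin is down still enforces the last policies it saw, instead of starting with none.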
Once the policies have been read, they are handed to RangerBasePlugin, which creates a new RangerPolicyRepository instance and passes the policies to it as a constructor argument.
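Building a fresh repository on every refresh, rather than mutating the old one, means readers always see a complete set of policies. The sketch below illustrates that pattern only. RepositorySwapSketch and its nested Repository are hypothetical, and the volatile field and the unmodifiable copy are my own assumptions about how such a swap can be made safe; I have not checked how RangerBasePlugin publishes the new instance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of replace-on-refresh; not Ranger's RangerBasePlugin.
class RepositorySwapSketch {
    // Immutable snapshot: policies are copied in at construction and never change.
    static final class Repository {
        final List<String> policies;

        Repository(List<String> policies) {
            this.policies = Collections.unmodifiableList(new ArrayList<String>(policies));
        }
    }

    // Assumption: volatile so that request threads see the newest snapshot.
    private volatile Repository repository =
        new Repository(Collections.<String>emptyList());

    // Called by the refresher: builds a new repository instead of editing the old one.
    void setPolicies(List<String> policies) {
        repository = new Repository(policies);
    }

    // Called by request threads: the returned snapshot stays consistent while in use.
    Repository current() {
        return repository;
    }
}
```

A request that grabbed the old snapshot just before a refresh finishes its evaluation against that snapshot, and the next request picks up the new one.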
-END-