RocketMQ系列:ACL机制

1、RocketMQ ACL使用

     ACL全称access control list,俗称访问控制列表。主要包括如下角色

  • 用户(用户密码)
  • 资源(topic、消费)
  • 权限(是否可以发送或者消费消息)
  • 角色(是否为管理员,并可以配置是否可以进行更新或删除主题和订阅组)

1.1 Broker端开启ACL验证

     首先Broker.conf文件配置 aclEnable=true ,然后需要将 plain_acl.yml 放在 ${ROCKETMQ_HOME}/conf目录, plain_acl.yml

globalWhiteRemoteAddresses:  // 设置IP白名单
- 10.10.103.*
- 192.168.0.*
accounts:   // 配置用户信息
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:  // 用户级别的IP地址白名单
  admin: false   // 当为 true 可以执行更新、删除主题或者订阅组
  defaultTopicPerm: DENY  // DENY拒绝、SUB 订阅权限、PUB 发送权限
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  groupPerms:
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB

1.2 服务端验证

      服务端当配置好plain_acl.yml后并在 broker.conf中开启 aclEnable=true ,服务端则会进行下面逻辑验证

  • 客户端请求Ip和全局白名单匹配
  • 请求是否包含用户名并判断用户是否匹配
  • 用户级别的白名单
  • 签名验证
  • 该用户是否具有admin权限
  • 判断admin配置了需要验证的权限并进行验证

2、源码实现

2、1客户端层面

     在构造函数添加 RPCHook ,进行创建ACL对象实例。

AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials("rocketmq", "123456"))
DefaultMQProducer producer = new DefaultMQProducer("GID_test_class", aclClientRPCHook);

     发送消息前置执行钩子函数并验证ACL权限,若抛异常后则无法发送消息。

DefaultMQProducerImpl#executeSendMessageHookBefore
public void executeSendMessageHookBefore(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageBefore(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookBefore", e);
            }
        }
    }
}

2、2服务端层面

Broker端初始化ALC配置, 加载 AccessValidator配置
1. 核心是基于SPI机制,读取META-INF/service/org.apache.rocketmq.acl.AccessValidator 访问验证器
2. 遍历访问验证器,向Broker注册钩子函数。RPCHook在接受请求前进行处理请求
3. 调用AccessValidator#validate,验证acl信息,如果拥有该执行权限则通过,否则报AclException

private void initialAcl() {
    if (!this.brokerConfig.isAclEnable()) {
        return;
    }
    List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class){
    for (AccessValidator accessValidator: accessValidators) {
        final AccessValidator validator = accessValidator;
        accessValidatorMap.put(validator.getClass(),validator);
        this.registerServerRPCHook(new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                validator.validate(validator.parse(request, remoteAddr));
            }
        });
    }
}

AccessValidator 是访问验证器接口,PlainAccessValidator是该接口的具体实现。
AccessResource parse(RemotingCommand request, String remoteAddr);
从远端请求中解析本次请求对应的访问资源
void validate(AccessResource accessResource);
根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断是否拥有,如果没有则ACLException

当远端请求过来后,触发钩子函数RPCHook,调用 PlainAccessValidator#parse ,并根据 client 端创建 PlainAccessResource实例对象

PlainAccessResource
private String accessKey;  // 访问Key,用户名
private String secretKey;  // 用户密码
private String whiteRemoteAddress; // 远程IP地址白名单
private boolean admin; // 是否是管理员角色
private byte defaultTopicPerm = 1; //默认topic访问权限,如果没有配置topic的权限,则Topic默认的访问权限为1,表示为DENY
private byte defaultGroupPerm = 1; // 默认的消费组访问权限,默认为DENY
private Map<String, Byte> resourcePermMap; // 资源需要的访问权限映射表
private RemoteAddressStrategy remoteAddressStrategy; //远程IP地址验证
private int requestCode; //请求类型code
private byte[] content;  // 请求内容
private String signature;  // 签名字符串,client端进行将请求参数排序,使用secretKey生成签名字符串。服务端则验证签名
private String secretToken;
private String recognition;
  • vPlainAccessValidator#parse,解析远端请求过程 进行验证并转化为PlainAccessResource实例。
  • 封装PlainAccessResource对象实例,包括远程访问IP地址、requestCode、accessKey(请求用户名)、签名字符串(signature)、secretToken
    根据请求命令,设置本次请求需要拥有的权限。
  • 验证签名,根据扩展字段进行排序,便于生成签名字符串,然后将扩展字段与请求体(body)写入content字段。完成从请求头中解析出本次请求需要验证的权限。
PlainAccessValidator#parse
public AccessResource parse(RemotingCommand request, String remoteAddr) {
    PlainAccessResource accessResource = new PlainAccessResource();
    accessResource.setXXX(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
    try {
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
                break;
            case RequestCode.SEND_MESSAGE_V2:
                accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
                break;
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
                break;
            case RequestCode.PULL_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
                break;
            case RequestCode.QUERY_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.HEART_BEAT:
                HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); 
            case RequestCode.UNREGISTER_CLIENT:
                final UnregisterClientRequestHeader unregisterClientRequestHeader =
                    (UnregisterClientRequestHeader) request
                        .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                    (GetConsumerListByGroupRequestHeader) request
                        .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                    (UpdateConsumerOffsetRequestHeader) request
                        .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                break;
            default:
                break;
        }
    }  
    SortedMap<String, String> map = new TreeMap<String, String>();
    for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
        if (!SessionCredentials.SIGNATURE.equals(entry.getKey()) && !MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) {
            map.put(entry.getKey(), entry.getValue());
        }
    }
    accessResource.setContent(AclUtils.combineRequestContent(request, map));
    return accessResource;
}
  • 加载配置acl配置文件,可以运行时动态修改,最后加载到内存中
  • PlainAccessValidator#validate -> PlainPermissionManager#validate
    根据访问的权限与Broker端配置的权限(plain_acl.yml)进行对比验证,并验证.
public PlainPermissionManager() {
    load();
    watch();
}
// 解析用户配置的访问资源,全局白名单,并加载到内存中
public void load() {
    JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class);
    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
       for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
         globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
                getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
      }
   }
   JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
   if (accounts != null && !accounts.isEmpty()) {
     List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
     for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
        PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
        plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
     }
   }
} 
// 监听器,默认以500ms的频率判断文件的内容是否变化(根据文件md5签名进行对比),并重新加载配置文件。该方法启动一个守护线程处理
private void watch() {
    try {
        String watchFilePath = fileHome + fileName;
        FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
            @Override
            public void onChanged(String path) {
                load();
            }
        });
        fileWatchService.start();
        this.isWatchStart = true;
    }  
}  

1、如果当前的请求命令属于必须是Admin用户才能访问的权限,并且当前用户并不是管理员角色,则抛出异常
2、遍历需要权限与拥有的权限进行对比,如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限时是否允许

public void validate(PlainAccessResource plainAccessResource) {

    for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
        if (remoteAddressStrategy.match(plainAccessResource)) {
            return;
        }
    }
 
    PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
    if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
        return;
    }
 
    String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());

    checkPerm(plainAccessResource, ownedAccess);
}

void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
   
    if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
        throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
    }
    Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
    Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();

    if (needCheckedPermMap == null) {
        return;
    }

    if (ownedPermMap == null && ownedAccess.isAdmin()) {
        return;
    }

    for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
        String resource = needCheckedEntry.getKey();
        Byte neededPerm = needCheckedEntry.getValue();
        boolean isGroup = PlainAccessResource.isRetryTopic(resource);

        if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
            // Check the default perm
            byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
                ownedAccess.getDefaultTopicPerm();
            if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
            continue;
        }
        if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容