AWS Lambda笔记-Lambda Upload CloudSearch-15【含源代码】

CloudSearch支持SDK方式上传数据,可通过创建一个Lambda函数来读取SNS通知消息,并将值写入CloudSearch中。那么实现通过/search?q=newuser搜索到新注册的用户信息的功能,就可以在新用户注册发布到SNS中信息以JSON格式写入,再用一个Lambda订阅JSON格式数据并写入CloudSearch实现。

  1. User对象序列化为JSON格式
  2. 新用户注册Lambda,发布Json格式的User到SNS
  3. 订阅SNS消息并反序列化User对象发邮件
  4. 订阅SNS消息并上传Json格式User到CloudSearch

工程说明

工程主要是通过SNS发布订阅,lambda-userregistration-cloudsearch(工程)接受订阅信息并写入CloudSearch中,lambda-userregistration-welcomemail(工程)接受订阅信息并发送邮件给新注册用户。


工程关系配置

1. User对象序列化为JSON格式

为方便Lambda接收SNS的用户数据为JSON,所以在新用户注册成功后发布到SNS,就需要将User对象格式化为JSON。具体需要将service-user工程中的User类添加Json注释(@JsonProperty)。

/*
 User POJO类
*/
public class User {

    @DynamoDBHashKey(attributeName = "UserId")
    @JsonProperty("userid")
    private String id;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "UsernameIndex", attributeName = "Username")
    @JsonProperty("username")
    private String username;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "EmailIndex", attributeName = "Email")
    @JsonProperty("email")
    private String email;
    //getter/setter 方法
}

2. 新用户注册Lambda,发布Json格式的User到SNS

lambda-userregistration工程的Handler类,在用户注册成功后,将User对象格式为JSON格式,并发布到SNS中。重点关注notifySnsSubscribers的new ObjectMapper().writeValueAsString(user)代码,将序列化的User对象发布到UserRegistrationSnsTopic的主题中。

    private void notifySnsSubscribers(User user) {
      try {
        //发布UserRegistrationSnsTopic主题,内容序列化的User对象
        amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), new ObjectMapper().writeValueAsString(user));
        LOGGER.info("SNS notification sent for "+user.getEmail());
      } catch (Exception anyException) {
        LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
      }
    }

3. 订阅SNS消息并反序列化User对象发邮件

lambda-userregistration-welcomemail订阅SNS消息,并反序列化User对象,为方便后续其他Lambda(如:lambda-userregistration-cloudsearch工程的Lambda)接受SNS消息并反序列化的重复操作,我们创建一个接受SNS消息并反序列化的Lambda基类(SnsLambdaHandler)。

public abstract class SnsLambdaHandler<I> implements RequestHandler<SNSEvent, Void> {

    private static final Logger LOGGER = Logger.getLogger(SnsLambdaHandler.class);

    private final ObjectMapper objectMapper;

    protected SnsLambdaHandler() {
        objectMapper=new ObjectMapper();
    }
    //需要子类实现的方法
    public abstract void handleSnsRequest(I input, Context context);

    @SuppressWarnings("unchecked")
    private Class<I> getJsonType() {
        return (Class<I>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    @Override
    public Void handleRequest(SNSEvent input, Context context) {
        //接受SNS消息
        input.getRecords().forEach(snsMessage -> {
            try {
                //接受消息,并反序列化
                I deserializedPayload = objectMapper.readValue(snsMessage.getSNS().getMessage(), getJsonType());
                handleSnsRequest(deserializedPayload, context);
            } catch (IOException anyException) {
                LOGGER.error("JSON could not be deserialized", anyException);
            }
        });
        return null;
    }
}

订阅SNS的Lambda类修改为继承SnsLambdaHandler类,并在类中实现public abstract void handleSnsRequest(I input, Context context);方法。由于之前是Eamil文本,现将User格式化为JSON,影响之前发送邮件业务逻辑,稍作修改,如果不关注发邮件逻辑可以直接跳过。

public class Handler extends SnsLambdaHandler<User> {
  //.....
  private void sendEmail(final User user) {
    final String emailAddress = user.getEmail();
    //收件地址
    Destination destination = new Destination().withToAddresses(emailAddress);
    Message message = new Message()
        .withBody(new Body().withText(new Content("Welcome to our forum!")))
        .withSubject(new Content("Welcome!"));
    //发送邮件,发件地址从配置的环境变量中获取
    //......
  }

  @Override
  public void handleSnsRequest(User input, Context context){
    //收到的是标准的SNSEvent事件
    //getRecords()返回的是一个列表,表示Lambda可能一次收多条SNS消息。
    //input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
    //return null;
    sendEmail(input);
  }
}

4. 订阅SNS消息并上传Json格式User到CloudSearch

写入CloudSearch的Lambda同样需要订阅SNS并且需要将User反序列化,所以继承SnsLambdaHandler减少重复接受订阅消息和反序列化。
JSON格式数据写入CloudSearch,我们需要CloudSearch的AmazonCloudSearchDomainClient类帮忙,创建该类的同时需要设置Endpoint(即某个CloudSearch的Search Endpoint)的值。

Search Endpoint的值

]
还有一点需要注意,在uploadDocument时JSON格式需要如下方式(支持批量),id,type,fields都是必填字段。id是用来表示唯一性字段,type字段有add和delete分别是用来新增和删除文档内容。fields则是需要搜索的字段内容。下面类的uploadDocument方法主要功能是组装并写入CloudSearch。

[{
    "id": "1234-1234-1234",
    "type": "add",
    "fields": {
        "userid": "1234-1234-1234",
        "eamil": "abc@abc.com",
        "username": "testtest"
    }
}]

写入CloudSearch的Lambda函数:

public class Handler extends SnsLambdaHandler<User> {
    private static final Injector INJECTOR = Guice.createInjector();
    private static final Logger LOGGER = Logger.getLogger(Handler.class);
    private AmazonCloudSearchDomainClient amazonCloudSearchDomainClient;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Inject
    public Handler setAmazonCloudSearchDomainClient(AmazonCloudSearchDomainClient amazonCloudSearchDomainClient) {
        //获取CloudSearch的端点
        this.amazonCloudSearchDomainClient = amazonCloudSearchDomainClient;
        this.amazonCloudSearchDomainClient.setEndpoint(System.getenv("CloudSearchDomain"));
        return this;
    }

    public Handler() {
        INJECTOR.injectMembers(this);
        Objects.nonNull(amazonCloudSearchDomainClient);
    }

    //更新CloudSearch文档
    private void uploadDocument(User user) {
        try {
            //创建CloudSearchAPI需要的数据格式,add,id,fields键是必须的。
            final Map<String, Object> documentRequest = new HashMap<>();
            documentRequest.put("type", "add");
            documentRequest.put("id", user.getId());
            documentRequest.put("fields", user);
            LOGGER.info("User with id " + user.getId() + " is being uploaded to CloudSearch");
            //documentRequest对象转为byte数组
            byte[] jsonAsByteStream = objectMapper.writeValueAsBytes(new Map[]{documentRequest});
            if (jsonAsByteStream != null) {
                ByteArrayInputStream document = new ByteArrayInputStream(jsonAsByteStream);
                amazonCloudSearchDomainClient.uploadDocuments(new UploadDocumentsRequest()
                        .withDocuments(document)
                        .withContentLength((long) document.available())
                        .withContentType(ContentType.Applicationjson)
                );
            }
        } catch (JsonProcessingException jsonProcessingException) {
            LOGGER.error("Object could not be converted to JSON", jsonProcessingException);
        } catch (Exception anyException) {
            LOGGER.error("Upload was failing", anyException);
        }
    }

    @Override
    public void handleSnsRequest(User input, Context context) {
        uploadDocument(input);
    }
}

该类中重点的依赖包:aws-java-sdk-cloudsearch是cloudsearch的SDK,具体依赖关系配置详见该工程下的build.gradle文件。
同样配置Lambda的cloudformation与其他Lambda配置相同,详见cloudformation.template中的UserRegistrationCloudSearchLambda,UserRegistrationCloudSearchLambdaPermission
最后一步:./gradlew deploy部署工程,部署成功后。
通过https://<youdomain>/users,body数据
{"username":"testuser24","email":"lazy24@163.com"}提交注册信息。
通过https://<youdomain>/search?q=testuser24检索到新注册用户数据。

检索到的数据

异常一

com.amazonaws.services.cloudsearchdomain.model.AmazonCloudSearchDomainException: User: arn:aws:sts::083845954160:assumed-role/serverlessbook-LambdaExecutionRole-1CQQ1SF5ASHEB/serverlessbook-UserRegistrationCloudSearchLambda-WI941096GZTW is not authorized to perform: cloudsearch:document on resource: serverlessbook (Service: AmazonCloudSearchDomain; Status Code: 403; Error Code: AccessDenied; Request ID: ebb327dc-6ff3-4a3f-8e92-65986e76babd; Proxy: null)

提示主要是Lambda在uploaddocument的时候没有权限。
解决方案:在对应的Lambda的Role中添加"arn:aws:iam::aws:policy/CloudSearchFullAccess"。该工程cloudformation.tempalte中涉及的是LambdaExecutionRole

"LambdaExecutionRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "Path": "/",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "lambda.amazonaws.com",
                        "apigateway.amazonaws.com"
                    ]
                },
                "Action": [
                    "sts:AssumeRole"
                ]
            }]
        },
        "ManagedPolicyArns": [
            "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
            "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
            "arn:aws:iam::aws:policy/AWSLambdaFullAccess",
            "arn:aws:iam::aws:policy/CloudSearchFullAccess"
        ]
    }
}

Github代码地址:https://github.com/zhujinhuant/serverlessbook/tree/master/serverlessbook-15

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343