CloudSearch支持SDK方式上传数据,可通过创建一个Lambda函数来读取SNS通知消息,并将值写入CloudSearch中。那么实现通过/search?q=newuser搜索到新注册的用户信息的功能,就可以在新用户注册发布到SNS中信息以JSON格式写入,再用一个Lambda订阅JSON格式数据并写入CloudSearch实现。
- User对象序列化为JSON格式
- 新用户注册Lambda,发布Json格式的User到SNS
- 订阅SNS消息并反序列化User对象发邮件
- 订阅SNS消息并上传Json格式User到CloudSearch
工程说明
工程主要是通过SNS发布订阅,lambda-userregistration-cloudsearch(工程)接受订阅信息并写入CloudSearch中,lambda-userregistration-welcomemail(工程)接受订阅信息并发送邮件给新注册用户。
1. User对象序列化为JSON格式
为方便Lambda接收SNS的用户数据为JSON,所以在新用户注册成功后发布到SNS,就需要将User对象格式化为JSON。具体需要将service-user工程中的User类添加Json注释(@JsonProperty)。
/*
User POJO类
*/
public class User {
@DynamoDBHashKey(attributeName = "UserId")
@JsonProperty("userid")
private String id;
@DynamoDBIndexHashKey(globalSecondaryIndexName = "UsernameIndex", attributeName = "Username")
@JsonProperty("username")
private String username;
@DynamoDBIndexHashKey(globalSecondaryIndexName = "EmailIndex", attributeName = "Email")
@JsonProperty("email")
private String email;
//getter/setter 方法
}
2. 新用户注册Lambda,发布Json格式的User到SNS
lambda-userregistration工程的Handler类,在用户注册成功后,将User对象格式为JSON格式,并发布到SNS中。重点关注notifySnsSubscribers的new ObjectMapper().writeValueAsString(user)
代码,将序列化的User对象发布到UserRegistrationSnsTopic的主题中。
private void notifySnsSubscribers(User user) {
try {
//发布UserRegistrationSnsTopic主题,内容序列化的User对象
amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), new ObjectMapper().writeValueAsString(user));
LOGGER.info("SNS notification sent for "+user.getEmail());
} catch (Exception anyException) {
LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
}
}
3. 订阅SNS消息并反序列化User对象发邮件
lambda-userregistration-welcomemail订阅SNS消息,并反序列化User对象,为方便后续其他Lambda(如:lambda-userregistration-cloudsearch工程的Lambda)接受SNS消息并反序列化的重复操作,我们创建一个接受SNS消息并反序列化的Lambda基类(SnsLambdaHandler)。
public abstract class SnsLambdaHandler<I> implements RequestHandler<SNSEvent, Void> {
private static final Logger LOGGER = Logger.getLogger(SnsLambdaHandler.class);
private final ObjectMapper objectMapper;
protected SnsLambdaHandler() {
objectMapper=new ObjectMapper();
}
//需要子类实现的方法
public abstract void handleSnsRequest(I input, Context context);
@SuppressWarnings("unchecked")
private Class<I> getJsonType() {
return (Class<I>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
@Override
public Void handleRequest(SNSEvent input, Context context) {
//接受SNS消息
input.getRecords().forEach(snsMessage -> {
try {
//接受消息,并反序列化
I deserializedPayload = objectMapper.readValue(snsMessage.getSNS().getMessage(), getJsonType());
handleSnsRequest(deserializedPayload, context);
} catch (IOException anyException) {
LOGGER.error("JSON could not be deserialized", anyException);
}
});
return null;
}
}
订阅SNS的Lambda类修改为继承SnsLambdaHandler
类,并在类中实现public abstract void handleSnsRequest(I input, Context context);方法。由于之前是Eamil文本,现将User格式化为JSON,影响之前发送邮件业务逻辑,稍作修改,如果不关注发邮件逻辑可以直接跳过。
public class Handler extends SnsLambdaHandler<User> {
//.....
private void sendEmail(final User user) {
final String emailAddress = user.getEmail();
//收件地址
Destination destination = new Destination().withToAddresses(emailAddress);
Message message = new Message()
.withBody(new Body().withText(new Content("Welcome to our forum!")))
.withSubject(new Content("Welcome!"));
//发送邮件,发件地址从配置的环境变量中获取
//......
}
@Override
public void handleSnsRequest(User input, Context context){
//收到的是标准的SNSEvent事件
//getRecords()返回的是一个列表,表示Lambda可能一次收多条SNS消息。
//input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
//return null;
sendEmail(input);
}
}
4. 订阅SNS消息并上传Json格式User到CloudSearch
写入CloudSearch的Lambda同样需要订阅SNS并且需要将User反序列化,所以继承SnsLambdaHandler减少重复接受订阅消息和反序列化。
JSON格式数据写入CloudSearch,我们需要CloudSearch的AmazonCloudSearchDomainClient类帮忙,创建该类的同时需要设置Endpoint(即某个CloudSearch的Search Endpoint)的值。
]
还有一点需要注意,在uploadDocument时JSON格式需要如下方式(支持批量),id,type,fields都是必填字段。id是用来表示唯一性字段,type字段有add和delete分别是用来新增和删除文档内容。fields则是需要搜索的字段内容。下面类的
uploadDocument
方法主要功能是组装并写入CloudSearch。
[{
"id": "1234-1234-1234",
"type": "add",
"fields": {
"userid": "1234-1234-1234",
"eamil": "abc@abc.com",
"username": "testtest"
}
}]
写入CloudSearch的Lambda函数:
public class Handler extends SnsLambdaHandler<User> {
private static final Injector INJECTOR = Guice.createInjector();
private static final Logger LOGGER = Logger.getLogger(Handler.class);
private AmazonCloudSearchDomainClient amazonCloudSearchDomainClient;
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public Handler setAmazonCloudSearchDomainClient(AmazonCloudSearchDomainClient amazonCloudSearchDomainClient) {
//获取CloudSearch的端点
this.amazonCloudSearchDomainClient = amazonCloudSearchDomainClient;
this.amazonCloudSearchDomainClient.setEndpoint(System.getenv("CloudSearchDomain"));
return this;
}
public Handler() {
INJECTOR.injectMembers(this);
Objects.nonNull(amazonCloudSearchDomainClient);
}
//更新CloudSearch文档
private void uploadDocument(User user) {
try {
//创建CloudSearchAPI需要的数据格式,add,id,fields键是必须的。
final Map<String, Object> documentRequest = new HashMap<>();
documentRequest.put("type", "add");
documentRequest.put("id", user.getId());
documentRequest.put("fields", user);
LOGGER.info("User with id " + user.getId() + " is being uploaded to CloudSearch");
//documentRequest对象转为byte数组
byte[] jsonAsByteStream = objectMapper.writeValueAsBytes(new Map[]{documentRequest});
if (jsonAsByteStream != null) {
ByteArrayInputStream document = new ByteArrayInputStream(jsonAsByteStream);
amazonCloudSearchDomainClient.uploadDocuments(new UploadDocumentsRequest()
.withDocuments(document)
.withContentLength((long) document.available())
.withContentType(ContentType.Applicationjson)
);
}
} catch (JsonProcessingException jsonProcessingException) {
LOGGER.error("Object could not be converted to JSON", jsonProcessingException);
} catch (Exception anyException) {
LOGGER.error("Upload was failing", anyException);
}
}
@Override
public void handleSnsRequest(User input, Context context) {
uploadDocument(input);
}
}
该类中重点的依赖包:aws-java-sdk-cloudsearch
是cloudsearch的SDK,具体依赖关系配置详见该工程下的build.gradle
文件。
同样配置Lambda的cloudformation与其他Lambda配置相同,详见cloudformation.template中的UserRegistrationCloudSearchLambda
,UserRegistrationCloudSearchLambdaPermission
。
最后一步:./gradlew deploy
部署工程,部署成功后。
通过https://<youdomain>/users,body数据
{"username":"testuser24","email":"lazy24@163.com"}
提交注册信息。
通过https://<youdomain>/search?q=testuser24检索到新注册用户数据。
异常一
com.amazonaws.services.cloudsearchdomain.model.AmazonCloudSearchDomainException: User: arn:aws:sts::083845954160:assumed-role/serverlessbook-LambdaExecutionRole-1CQQ1SF5ASHEB/serverlessbook-UserRegistrationCloudSearchLambda-WI941096GZTW is not authorized to perform: cloudsearch:document on resource: serverlessbook (Service: AmazonCloudSearchDomain; Status Code: 403; Error Code: AccessDenied; Request ID: ebb327dc-6ff3-4a3f-8e92-65986e76babd; Proxy: null)
提示主要是Lambda在uploaddocument的时候没有权限。
解决方案:在对应的Lambda的Role中添加"arn:aws:iam::aws:policy/CloudSearchFullAccess"。该工程cloudformation.tempalte中涉及的是LambdaExecutionRole
。
"LambdaExecutionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"Path": "/",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com",
"apigateway.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}]
},
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AWSLambdaFullAccess",
"arn:aws:iam::aws:policy/CloudSearchFullAccess"
]
}
}
Github代码地址:https://github.com/zhujinhuant/serverlessbook/tree/master/serverlessbook-15