关键词:AWS API Gateway WebSockets Example
需求背景
基于AWS APIGateway的微服务架构下,需要实现 “服务器端到客户端” 的通知推送实时交互功能。
AWS APIGateway RestfulAPI 是HTTP协议的,而 “服务器端到客户端” 采用的Websocket是TCP协议的。
由于协议问题,AWS APIGateway RestfulAPI 被PASS,后发现AWS APIGateway WebsocketAPI 已悄悄上线,故决定使用AWS APIGateway WebsocketAPI搭建一个实时聊天系统,从而确定该方案的可行性。
架构方案
采用 API Gateway Websocket API +Lambda + DynamoDB 搭建一个实时聊天程序,如下为基础架构图:
在我们的应用程序中,设备将连接到API网关。当设备连接时,lambda函数将在DynamoDB表中保存连接ID。当设备断开连接时,另一个lambda函数将在DynamoDB表中移除连接ID。在我们想要将消息发送回设备的实例中,第三个lambda函数将使用回调URL将连接ID和POST数据发送回设备。
实现步骤
Step1 创建Gateway WebSocket API
登陆AWS Console 转到Amazon API Gateway服务,单击WebSocket以创建WebSocket API,提供API名称和路径选择表达式。在示例中,添加 $request.body.action作为选择表达式并点击Create API。
创建API后,我们将重定向到路由页面。在这里我们可以看到已经预定义的三条路线:$connect,$disconnect和 $default,我们还将创建一个自定义路由onMessage。
在我们的架构中,$connect和$disconnect routes实现以下任务:
$connect - 当调用此路由时,Lambda函数会将连接设备的连接ID添加到DynamoDB。
$disconnect - 调用此路由时,Lambda函数将从DynamoDB中删除已断开连接的设备的连接ID。
onMessage - 当调用此路由时,消息正文将被发送到当时连接的所有设备。
Step2 创建用于"存储连接设备ID"的DynamoDB
Step3 创建connect Lambda函数 ChatRoomConnectFunction
在lambda函数的代码中添加以下代码,此代码将连接设备的连接ID添加到我们创建的DynamoDB表中:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context, callback) => {
const connectionId = event.requestContext.connectionId;
addConnectionId(connectionId).then(() => {
callback(null, {
statusCode: 200,
})
});
}
function addConnectionId(connectionId) {
return ddb.put({
TableName: 'Chat',
Item: {
connectionid : connectionId
},
}).promise();
}
然后发布Lambda新版本
Step4 创建disconnect Lambda函数 ChatRoomDonnectFunction
在lambda函数的代码中添加以下代码,当设备断开连接时,此代码将从DynamoDB表中删除连接ID:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context, callback) => {
const connectionId = event.requestContext.connectionId;
deleteConnectionId(connectionId).then(() => {
callback(null, {
statusCode: 200,
})
});
}
function deleteConnectionId(connectionId) {
return ddb.delete({
TableName: 'Chat',
Key: {
connectionid : connectionId,
},
}).promise();
}
然后发布Lambda新版本
Step5 配置 $connect和 $disconnect路由,测试WebSocket API是否正常工作
现在我们已经创建了DynamoDB表和两个lambda函数。在创建第三个lambda函数之前,让我们再回到API Gateway并使用我们创建的lambda函数配置路由。首先,单击$ connect route。作为集成类型,选择Lambda函数并选择ChatRoomConnectFunction。
在$disconnect路由上执行相同的操作,其中lambda函数将是ChatRoomDonnectFunction:
为了方便调错,建议开启CloudWatch日志记录
部署后,我们将看到两个URL。第一个URL称为WebSocket URL,第二个URL称为连接URL。
WebSocket URL是用于通过设备将WebSockets连接到API的URL。第二个URL,即Connection(连接)URL,向连接的客户端发送回调消息、获取连接信息或断开客户端连接。
使用wscat工具进行测试(使用 npm install -g wscat 安装)。
wscat -c wss://91ajt7fo78.execute-api.ap-northeast-2.amazonaws.com/dev
查看DynamoDB可以看到connectionid
控制台 ctrl+z 中断连接之后再次查看DynamoDB会发现刚才的connectionid已被删除:
Step6 实现发送消息多客户端接收功能
新增lambda函数ChatRoomOnMessageFunction,查询Chat DynamoDB表,获取所有连接ID,并将消息发送给这些连接ID对应的终端:
index.js
扫描DynamoDB以获取表中的所有可用记录,lambda函数将解析“message”属性并将其发送给其他终端:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
require('./patch.js');
let send = undefined;
function init(event) {
console.log(event)
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
send = async (connectionId, data) => {
await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: `Echo: ${data}` }).promise();
}
}
exports.handler = (event, context, callback) => {
init(event);
let message = JSON.parse(event.body).message
getConnections().then((data) => {
console.log(data.Items);
data.Items.forEach(function(connection) {
console.log("Connection " +connection.connectionid)
send(connection.connectionid, message);
});
});
return {}
};
function getConnections(){
return ddb.scan({
TableName: 'Chat',
}).promise();
}
patch.js
自动为我们的API创建回调URL并发送POST请求:
require('aws-sdk/lib/node_loader');
var AWS = require('aws-sdk/lib/core');
var Service = AWS.Service;
var apiLoader = AWS.apiLoader;
apiLoader.services['apigatewaymanagementapi'] = {};
AWS.ApiGatewayManagementApi = Service.defineService('apigatewaymanagementapi', ['2018-11-29']);
Object.defineProperty(apiLoader.services['apigatewaymanagementapi'], '2018-11-29', {
get: function get() {
var model = {
"metadata": {
"apiVersion": "2018-11-29",
"endpointPrefix": "execute-api",
"signingName": "execute-api",
"serviceFullName": "AmazonApiGatewayManagementApi",
"serviceId": "ApiGatewayManagementApi",
"protocol": "rest-json",
"jsonVersion": "1.1",
"uid": "apigatewaymanagementapi-2018-11-29",
"signatureVersion": "v4"
},
"operations": {
"PostToConnection": {
"http": {
"requestUri": "/@connections/{connectionId}",
"responseCode": 200
},
"input": {
"type": "structure",
"members": {
"Data": {
"type": "blob"
},
"ConnectionId": {
"location": "uri",
"locationName": "connectionId"
}
},
"required": [
"ConnectionId",
"Data"
],
"payload": "Data"
}
}
},
"shapes": {}
}
model.paginators = {
"pagination": {}
}
return model;
},
enumerable: true,
configurable: true
});
module.exports = AWS.ApiGatewayManagementApi;
在API Gateway中新建route onMessage,并指向ChatRoomOnMessageFunction Lambda函数:
保存并点击右上角的按钮添加集成响应:
部署API到相应阶段(如果部署的时候报错,请先给$default路由配置与onMessage一样的集成响应规则即可):
Step7 测试功能可用
多个终端,并与API Gateway WebsocketAPI 建立连接:
终端发送内容
{"action" : "onMessage" , "message" : "Hello everyone"}
END
出现以上截图则说明Demo成功。
说明:
发送内容的action字段value值对应了API Gateway Websocket的路由名称,如上截图我们的action值为onMessage,则在发送消息的时候API Gateway Websocket会找onMessage路由对应的集成请求规则进行相应。
思考:
如何在以上Demo的思路之上进行延伸,使用API Gateway Websocket+Lambda+DynamoDB搭建带权限校验的公司内部公共终端推送微服务?如何可视化PC终端推送策略?