【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序

关键词:AWS API Gateway WebSockets Example

image.png

需求背景

基于AWS APIGateway的微服务架构下,需要实现 “服务器端到客户端” 的通知推送实时交互功能。

AWS APIGateway RestfulAPI 是HTTP协议的,而 “服务器端到客户端” 采用的Websocket是TCP协议的。

由于协议问题,AWS APIGateway RestfulAPI 被PASS,后发现AWS APIGateway WebsocketAPI 已悄悄上线,故决定使用AWS APIGateway WebsocketAPI搭建一个实时聊天系统,从而确定该方案的可行性。

架构方案

采用 API Gateway Websocket API +Lambda + DynamoDB 搭建一个实时聊天程序,如下为基础架构图:

image.png

在我们的应用程序中,设备将连接到API网关。当设备连接时,lambda函数将在DynamoDB表中保存连接ID。当设备断开连接时,另一个lambda函数将在DynamoDB表中移除连接ID。在我们想要将消息发送回设备的实例中,第三个lambda函数将使用回调URL将连接ID​​和POST数据发送回设备。

实现步骤

Step1 创建Gateway WebSocket API

登陆AWS Console 转到Amazon API Gateway服务,单击WebSocket以创建WebSocket API,提供API名称和路径选择表达式。在示例中,添加 $request.body.action作为选择表达式并点击Create API。


image.png
image.png

创建API后,我们将重定向到路由页面。在这里我们可以看到已经预定义的三条路线:$connect,$disconnect和 $default,我们还将创建一个自定义路由onMessage。

在我们的架构中,$connect和$disconnect routes实现以下任务:

$connect - 当调用此路由时,Lambda函数会将连接设备的连接ID添加到DynamoDB。

$disconnect - 调用此路由时,Lambda函数将从DynamoDB中删除已断开连接的设备的连接ID。

onMessage - 当调用此路由时,消息正文将被发送到当时连接的所有设备。

Step2 创建用于"存储连接设备ID"的DynamoDB

image.png

Step3 创建connect Lambda函数 ChatRoomConnectFunction

image.png

在lambda函数的代码中添加以下代码,此代码将连接设备的连接ID添加到我们创建的DynamoDB表中:

const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
    const connectionId = event.requestContext.connectionId;
    addConnectionId(connectionId).then(() => {
    callback(null, {
        statusCode: 200,
        })
    });
}
function addConnectionId(connectionId) {
    return ddb.put({
        TableName: 'Chat',
        Item: {
            connectionid : connectionId
        },
    }).promise();
}

然后发布Lambda新版本


image.png

Step4 创建disconnect Lambda函数 ChatRoomDonnectFunction

image.png

在lambda函数的代码中添加以下代码,当设备断开连接时,此代码将从DynamoDB表中删除连接ID:

const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context, callback) => {
    const connectionId = event.requestContext.connectionId;
    deleteConnectionId(connectionId).then(() => {
    callback(null, {
        statusCode: 200,
        })
    });
}
function deleteConnectionId(connectionId) {
    return ddb.delete({
        TableName: 'Chat',
        Key: {
            connectionid : connectionId,
        },
    }).promise();
}

然后发布Lambda新版本

image.png

Step5 配置 $connect和 $disconnect路由,测试WebSocket API是否正常工作

现在我们已经创建了DynamoDB表和两个lambda函数。在创建第三个lambda函数之前,让我们再回到API Gateway并使用我们创建的lambda函数配置路由。首先,单击$ connect route。作为集成类型,选择Lambda函数并选择ChatRoomConnectFunction。

image.png

在$disconnect路由上执行相同的操作,其中lambda函数将是ChatRoomDonnectFunction:

image.png
image.png

为了方便调错,建议开启CloudWatch日志记录

image.png

部署后,我们将看到两个URL。第一个URL称为WebSocket URL,第二个URL称为连接URL。

image.png

WebSocket URL是用于通过设备将WebSockets连接到API的URL。第二个URL,即Connection(连接)URL,向连接的客户端发送回调消息、获取连接信息或断开客户端连接。

使用wscat工具进行测试(使用 npm install -g wscat 安装)。

wscat -c wss://91ajt7fo78.execute-api.ap-northeast-2.amazonaws.com/dev
image.png

查看DynamoDB可以看到connectionid

image.png

控制台 ctrl+z 中断连接之后再次查看DynamoDB会发现刚才的connectionid已被删除:


image.png

Step6 实现发送消息多客户端接收功能

新增lambda函数ChatRoomOnMessageFunction,查询Chat DynamoDB表,获取所有连接ID,并将消息发送给这些连接ID对应的终端:

index.js

扫描DynamoDB以获取表中的所有可用记录,lambda函数将解析“message”属性并将其发送给其他终端:

const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
require('./patch.js');
let send = undefined;
function init(event) {
  console.log(event)
  
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  
  
  send = async (connectionId, data) => {
  await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: `Echo: ${data}` }).promise();
  }
}


exports.handler =  (event, context, callback) => {
  init(event);
  let message = JSON.parse(event.body).message
    getConnections().then((data) => {
        console.log(data.Items);
        data.Items.forEach(function(connection) {
           console.log("Connection " +connection.connectionid)
           send(connection.connectionid, message);
        });
    });
    
    return {}
};
function getConnections(){
    return ddb.scan({
        TableName: 'Chat',
    }).promise();
}

patch.js

自动为我们的API创建回调URL并发送POST请求:

require('aws-sdk/lib/node_loader');
var AWS = require('aws-sdk/lib/core');
var Service = AWS.Service;
var apiLoader = AWS.apiLoader;
apiLoader.services['apigatewaymanagementapi'] = {};
AWS.ApiGatewayManagementApi = Service.defineService('apigatewaymanagementapi', ['2018-11-29']);
Object.defineProperty(apiLoader.services['apigatewaymanagementapi'], '2018-11-29', {
  get: function get() {
    var model = {
      "metadata": {
        "apiVersion": "2018-11-29",
        "endpointPrefix": "execute-api",
        "signingName": "execute-api",
        "serviceFullName": "AmazonApiGatewayManagementApi",
        "serviceId": "ApiGatewayManagementApi",
        "protocol": "rest-json",
        "jsonVersion": "1.1",
        "uid": "apigatewaymanagementapi-2018-11-29",
        "signatureVersion": "v4"
      },
      "operations": {
        "PostToConnection": {
          "http": {
            "requestUri": "/@connections/{connectionId}",
            "responseCode": 200
          },
          "input": {
            "type": "structure",
            "members": {
              "Data": {
                "type": "blob"
              },
              "ConnectionId": {
                "location": "uri",
                "locationName": "connectionId"
              }
            },
            "required": [
              "ConnectionId",
              "Data"
            ],
            "payload": "Data"
          }
        }
      },
      "shapes": {}
    }
    model.paginators = {
      "pagination": {}
    }
    return model;
  },
  enumerable: true,
  configurable: true
});
module.exports = AWS.ApiGatewayManagementApi;

在API Gateway中新建route onMessage,并指向ChatRoomOnMessageFunction Lambda函数:

image.png

保存并点击右上角的按钮添加集成响应:

image.png

部署API到相应阶段(如果部署的时候报错,请先给$default路由配置与onMessage一样的集成响应规则即可):

image.png

Step7 测试功能可用

多个终端,并与API Gateway WebsocketAPI 建立连接:

image.png

终端发送内容

{"action" : "onMessage" , "message" : "Hello everyone"}
image.png

END

出现以上截图则说明Demo成功。

说明:
发送内容的action字段value值对应了API Gateway Websocket的路由名称,如上截图我们的action值为onMessage,则在发送消息的时候API Gateway Websocket会找onMessage路由对应的集成请求规则进行相应。

思考:
如何在以上Demo的思路之上进行延伸,使用API Gateway Websocket+Lambda+DynamoDB搭建带权限校验的公司内部公共终端推送微服务?如何可视化PC终端推送策略?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容