Implementing a Serverless Architecture with AWS Managed Services
实验目标
实验目标
- 通过AWS的管理服务实现一个serverless的架构;
- 通过S3和DynamoDB触发AWS的lamdba函数;
实验目标架构
关键业务流程场景
- 创建一个交易文件transaction.txt到S3的bucket中;
- 这个会触发lambda的函数,执行数据库操作,将文件内容逐条分析,并插入到DynamoDB的两张表中;
- 如果表中的数据金额超过1500¥,那么会触发一个SNS操作进行通知
- 这个SNS通知操作,会触发数据保存到SQS队列中,供后续信用卡部门通知客户
操作步骤
创建一个Lambda函数去处理银行交易文件
- 用途:用于处理传送到S3的交易文件
-
架构示意图
- 关键操作流程
-
关键配置
同时设置函数的timeout时间为20 sec;
- 函数代码块
# TransactionProcessor Lambda function
#
# This function is triggered by an object being created in an Amazon S3 bucket.
# The file is downloaded and each line is inserted into DynamoDB tables.
from __future__ import print_function
import json, urllib, boto3, csv
# Connect to S3 and DynamoDB
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
# Connect to the DynamoDB tables
customerTable = dynamodb.Table('Customer');
transactionsTable = dynamodb.Table('Transactions');
# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):
# Show the incoming event in the debug log
print("Event received by Lambda function: " + json.dumps(event, indent=2))
# Get the bucket and object key from the Event
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
localFilename = '/tmp/transactions.txt'
# Download the file from S3 to the local filesystem
try:
s3.meta.client.download_file(bucket, key, localFilename)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
# Read the Transactions CSV file. Delimiter is the '|' character
with open(localFilename) as csvfile:
reader = csv.DictReader(csvfile, delimiter='|')
# Read each row in the file
rowCount = 0
for row in reader:
rowCount += 1
# Show the row in the debug log
print(row['customer_id'], row['customer_address'], row['trn_id'], row['trn_date'], row['trn_amount'])
try:
# Insert Customer ID and Address into Customer DynamoDB table
customerTable.put_item(
Item={
'CustomerId': row['customer_id'],
'Address': row['customer_address']})
# Insert transaction details into Transactions DynamoDB table
transactionsTable.put_item(
Item={
'CustomerId': row['customer_id'],
'TransactionId': row['trn_id'],
'TransactionDate': row['trn_date'],
'TransactionAmount': int(row['trn_amount'])})
except Exception as e:
print(e)
print("Unable to insert data into DynamoDB table".format(e))
# Finished!
return "%d transactions inserted" % rowCount
- 函数代码块的主要逻辑
根据触发的事件,从S3下载文件;
循环检索文件中的每一行;
将数据插入到DynamoDB的Customer表和Transcations表; - 添加触发器
在函数中添加触发器,选择S3,配置触发器:
Bucket:就是我们要上传transaction.txt文件的那个inputs3bucketfortransact;
Event Type:选择Object Created;
创建一个Lambda函数去计算交易总额,同时对高金额账户进行通知
- 用途:实时统计交易综合,并对超过1500¥的金额账户进行SNS通知
-
目标架构
-
函数配置参考
这里的角色是实验环境预先初始化的,需要有数据库和SNS等相关的访问权限
- 执行代码块
# TotalNotifier Lambda function
#
# This function is triggered when values are inserted into the Transactions DynamoDB table.
# Transaction totals are calculated and notifications are sent to SNS if limits are exceeded.
from __future__ import print_function
import json, boto3
# Connect to SNS
sns = boto3.client('sns')
alertTopic = 'HighBalanceAlert'
snsTopicArn = [t['TopicArn'] for t in sns.list_topics()['Topics'] if t['TopicArn'].endswith(':' + alertTopic)][0]
# Connect to DynamoDB
dynamodb = boto3.resource('dynamodb')
transactionTotalTableName = 'TransactionTotal'
transactionsTotalTable = dynamodb.Table(transactionTotalTableName);
# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):
# Show the incoming event in the debug log
print("Event received by Lambda function: " + json.dumps(event, indent=2))
# For each transaction added, calculate the new Transactions Total
for record in event['Records']:
customerId = record['dynamodb']['NewImage']['CustomerId']['S']
transactionAmount = int(record['dynamodb']['NewImage']['TransactionAmount']['N'])
# Update the customer's total in the TransactionTotal DynamoDB table
response = transactionsTotalTable.update_item(
Key={
'CustomerId': customerId
},
UpdateExpression="add accountBalance :val",
ExpressionAttributeValues={
':val': transactionAmount
},
ReturnValues="UPDATED_NEW"
)
# Retrieve the latest account balance
latestAccountBalance = response['Attributes']['accountBalance']
print("Latest account balance: " + format(latestAccountBalance))
# If balance > $1500, send a message to SNS
if latestAccountBalance >= 1500:
# Construct message to be sent
message = '{"customerID": "' + customerId + '", ' + '"accountBalance": "' + str(latestAccountBalance) + '"}'
print(message)
# Send message to SNS
sns.publish(
TopicArn=snsTopicArn,
Message=message,
Subject='Warning! Account balance is very high',
MessageStructure='raw'
)
# Finished!
return 'Successfully processed {} records.'.format(len(event['Records']))
- 代码块主要逻辑
- 连接SNS和DynamoDB
- 计算 transaction totals,同时将其保存在 TransactionTotal这个DynamoDB 表中;
- 当交易金额超过1500¥,通过SNS发送通知;
- 给DynamoDB添加触发器,配置信息如下
DynamoDB Table:Transactions
Start position:latest - 保存整个Lambda函数;
创建一个SNS Topic
- 用途:当交易金额超过1500¥这个设定值的时候,发起账户通知
-
关键业务流程如下图
- 创建一个SNS topic,名字“HighBalanceAlert”;
- 创建订阅:可以选择EMAIL或者SMS进行通知,这里我们可以选择自己的EMAIL或者手机号接收通知;
创建两个简单的SQS队列
- 用途:用于订阅SNS发出的信息,用于后续相关业务部门进行信息处理
-
关键业务架构流程
- 相关操作
- 选择Simple Queue Service,选择默认的队列就行,不需要高FIFO队列(部分国家和地区没有)
- 创建两个队列“CustomerNotify”“CreditCollection”
- 设置两个队列,设置订阅消息,选择“Subscribe Queues to SNS Topic”;
- 选择HighBalanceAlert这个SNS进行订阅
通过传递一个transaction文件来测试ServerLess架构
- 文件地址及内容
customer_id|customer_address|trn_id|trn_date|trn_amount
C1|1 Smith Street, London|T01|03/16/2017|100
C2|2 Smith Street, London|T02|03/16/2017|200
C2|2 Smith Street, London|T03|03/16/2017|50
C2|2 Smith Street, London|T04|03/16/2017|300
C2|2 Smith Street, London|T05|03/16/2017|100
C2|2 Smith Street, London|T06|03/16/2017|150
C2|2 Smith Street, London|T07|03/16/2017|400
C2|2 Smith Street, London|T08|03/16/2017|50
C2|2 Smith Street, London|T09|03/16/2017|50
C2|2 Smith Street, London|T10|03/16/2017|10
C2|2 Smith Street, London|T11|03/16/2017|10
C2|2 Smith Street, London|T12|03/16/2017|10
C2|2 Smith Street, London|T13|03/16/2017|20
C1|1 Smith Street, London|T14|03/16/2017|51
C1|1 Smith Street, London|T15|03/16/2017|25
C1|1 Smith Street, London|T16|03/16/2017|27
C1|1 Smith Street, London|T17|03/16/2017|29
C1|1 Smith Street, London|T18|03/16/2017|19
C1|1 Smith Street, London|T19|03/16/2017|33
C1|1 Smith Street, London|T20|03/16/2017|35
C1|1 Smith Street, London|T21|03/16/2017|39
C1|1 Smith Street, London|T22|03/16/2017|41
C1|1 Smith Street, London|T23|03/16/2017|199
C2|2 Smith Street, London|T24|03/16/2017|400
- 上传文件到S3
选择我们监控的S3的Bucket,理论上传递上去基本上会触发后续的操作。 - 检查文件逻辑是否正确
- 检查我们的S3存储内容
- 检查我们的DynamoDB的Transaction表和TransactionTotal表
- 查收我们的邮件和手机短信是否收到提醒;
- 检查我们创建用来存储SNS信息的SQS队列“CustomerNotify”“CreditCollection”;