实验4:通过AWS的管理服务实现一个Serverless架构

Implementing a Serverless Architecture with AWS Managed Services

实验目标

实验目标

  1. 通过AWS的管理服务实现一个serverless的架构;
  2. 通过S3和DynamoDB触发AWS的lamdba函数;

实验目标架构

实验目标架构

关键业务流程场景

  1. 创建一个交易文件transaction.txt到S3的bucket中;
  2. 这个会触发lambda的函数,执行数据库操作,将文件内容逐条分析,并插入到DynamoDB的两张表中;
  3. 如果表中的数据金额超过1500¥,那么会触发一个SNS操作进行通知
  4. 这个SNS通知操作,会触发数据保存到SQS队列中,供后续信用卡部门通知客户

操作步骤

创建一个Lambda函数去处理银行交易文件

  1. 用途:用于处理传送到S3的交易文件
  2. 架构示意图


    处理交易文件
  3. 关键操作流程
  • 关键配置


    函数关键配置

    同时设置函数的timeout时间为20 sec;

  • 函数代码块
# TransactionProcessor Lambda function
#
# This function is triggered by an object being created in an Amazon S3 bucket.
# The file is downloaded and each line is inserted into DynamoDB tables.

from __future__ import print_function
import json, urllib, boto3, csv

# Connect to S3 and DynamoDB
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

# Connect to the DynamoDB tables
customerTable     = dynamodb.Table('Customer');
transactionsTable = dynamodb.Table('Transactions');

# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):

  # Show the incoming event in the debug log
  print("Event received by Lambda function: " + json.dumps(event, indent=2))

  # Get the bucket and object key from the Event
  bucket = event['Records'][0]['s3']['bucket']['name']
  key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
  localFilename = '/tmp/transactions.txt'

  # Download the file from S3 to the local filesystem
  try:
    s3.meta.client.download_file(bucket, key, localFilename)
  except Exception as e:
    print(e)
    print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
    raise e

  # Read the Transactions CSV file. Delimiter is the '|' character
  with open(localFilename) as csvfile:
    reader = csv.DictReader(csvfile, delimiter='|')

    # Read each row in the file
    rowCount = 0
    for row in reader:
      rowCount += 1

      # Show the row in the debug log
      print(row['customer_id'], row['customer_address'], row['trn_id'], row['trn_date'], row['trn_amount'])

      try:
        # Insert Customer ID and Address into Customer DynamoDB table
        customerTable.put_item(
          Item={
            'CustomerId': row['customer_id'],
            'Address':  row['customer_address']})

        # Insert transaction details into Transactions DynamoDB table
        transactionsTable.put_item(
          Item={
            'CustomerId':    row['customer_id'],
            'TransactionId':   row['trn_id'],
            'TransactionDate':  row['trn_date'],
            'TransactionAmount': int(row['trn_amount'])})

      except Exception as e:
         print(e)
         print("Unable to insert data into DynamoDB table".format(e))

    # Finished!
    return "%d transactions inserted" % rowCount
  • 函数代码块的主要逻辑
    根据触发的事件,从S3下载文件;
    循环检索文件中的每一行;
    将数据插入到DynamoDB的Customer表和Transcations表;
  • 添加触发器
    在函数中添加触发器,选择S3,配置触发器:
    Bucket:就是我们要上传transaction.txt文件的那个inputs3bucketfortransact;
    Event Type:选择Object Created;

创建一个Lambda函数去计算交易总额,同时对高金额账户进行通知

  1. 用途:实时统计交易综合,并对超过1500¥的金额账户进行SNS通知
  2. 目标架构


    交易提醒流程
  3. 函数配置参考


    函数配置参考

    这里的角色是实验环境预先初始化的,需要有数据库和SNS等相关的访问权限

  4. 执行代码块
# TotalNotifier Lambda function
#
# This function is triggered when values are inserted into the Transactions DynamoDB table.
# Transaction totals are calculated and notifications are sent to SNS if limits are exceeded.

from __future__ import print_function
import json, boto3

# Connect to SNS
sns = boto3.client('sns')
alertTopic = 'HighBalanceAlert'
snsTopicArn = [t['TopicArn'] for t in sns.list_topics()['Topics'] if t['TopicArn'].endswith(':' + alertTopic)][0]

# Connect to DynamoDB
dynamodb = boto3.resource('dynamodb')
transactionTotalTableName = 'TransactionTotal'
transactionsTotalTable = dynamodb.Table(transactionTotalTableName);

# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):

  # Show the incoming event in the debug log
  print("Event received by Lambda function: " + json.dumps(event, indent=2))

  # For each transaction added, calculate the new Transactions Total
  for record in event['Records']:
    customerId = record['dynamodb']['NewImage']['CustomerId']['S']
    transactionAmount = int(record['dynamodb']['NewImage']['TransactionAmount']['N'])

    # Update the customer's total in the TransactionTotal DynamoDB table
    response = transactionsTotalTable.update_item(
      Key={
        'CustomerId': customerId
      },
      UpdateExpression="add accountBalance :val",
      ExpressionAttributeValues={
        ':val': transactionAmount
      },
      ReturnValues="UPDATED_NEW"
    )

    # Retrieve the latest account balance
    latestAccountBalance = response['Attributes']['accountBalance']
    print("Latest account balance: " + format(latestAccountBalance))

    # If balance > $1500, send a message to SNS
    if latestAccountBalance >= 1500:

      # Construct message to be sent
      message = '{"customerID": "' + customerId + '", ' + '"accountBalance": "' + str(latestAccountBalance) + '"}'
      print(message)

      # Send message to SNS
      sns.publish(
        TopicArn=snsTopicArn,
        Message=message,
        Subject='Warning! Account balance is very high',
        MessageStructure='raw'
      )

  # Finished!
  return 'Successfully processed {} records.'.format(len(event['Records']))
  1. 代码块主要逻辑
  • 连接SNS和DynamoDB
  • 计算 transaction totals,同时将其保存在 TransactionTotal这个DynamoDB 表中;
  • 当交易金额超过1500¥,通过SNS发送通知;
  1. 给DynamoDB添加触发器,配置信息如下
    DynamoDB Table:Transactions
    Start position:latest
  2. 保存整个Lambda函数;

创建一个SNS Topic

  1. 用途:当交易金额超过1500¥这个设定值的时候,发起账户通知
  2. 关键业务流程如下图


    高金额交易通知
  3. 创建一个SNS topic,名字“HighBalanceAlert”;
  4. 创建订阅:可以选择EMAIL或者SMS进行通知,这里我们可以选择自己的EMAIL或者手机号接收通知;

创建两个简单的SQS队列

  1. 用途:用于订阅SNS发出的信息,用于后续相关业务部门进行信息处理
  2. 关键业务架构流程


    关键业务流程
  3. 相关操作
  • 选择Simple Queue Service,选择默认的队列就行,不需要高FIFO队列(部分国家和地区没有)
  • 创建两个队列“CustomerNotify”“CreditCollection”
  • 设置两个队列,设置订阅消息,选择“Subscribe Queues to SNS Topic”;
  • 选择HighBalanceAlert这个SNS进行订阅

通过传递一个transaction文件来测试ServerLess架构

  1. 文件地址及内容
customer_id|customer_address|trn_id|trn_date|trn_amount
C1|1 Smith Street, London|T01|03/16/2017|100
C2|2 Smith Street, London|T02|03/16/2017|200
C2|2 Smith Street, London|T03|03/16/2017|50
C2|2 Smith Street, London|T04|03/16/2017|300
C2|2 Smith Street, London|T05|03/16/2017|100
C2|2 Smith Street, London|T06|03/16/2017|150
C2|2 Smith Street, London|T07|03/16/2017|400
C2|2 Smith Street, London|T08|03/16/2017|50
C2|2 Smith Street, London|T09|03/16/2017|50
C2|2 Smith Street, London|T10|03/16/2017|10
C2|2 Smith Street, London|T11|03/16/2017|10
C2|2 Smith Street, London|T12|03/16/2017|10
C2|2 Smith Street, London|T13|03/16/2017|20
C1|1 Smith Street, London|T14|03/16/2017|51
C1|1 Smith Street, London|T15|03/16/2017|25
C1|1 Smith Street, London|T16|03/16/2017|27
C1|1 Smith Street, London|T17|03/16/2017|29
C1|1 Smith Street, London|T18|03/16/2017|19
C1|1 Smith Street, London|T19|03/16/2017|33
C1|1 Smith Street, London|T20|03/16/2017|35
C1|1 Smith Street, London|T21|03/16/2017|39
C1|1 Smith Street, London|T22|03/16/2017|41
C1|1 Smith Street, London|T23|03/16/2017|199
C2|2 Smith Street, London|T24|03/16/2017|400
  1. 上传文件到S3
    选择我们监控的S3的Bucket,理论上传递上去基本上会触发后续的操作。
  2. 检查文件逻辑是否正确
  • 检查我们的S3存储内容
  • 检查我们的DynamoDB的Transaction表和TransactionTotal表
  • 查收我们的邮件和手机短信是否收到提醒;
  • 检查我们创建用来存储SNS信息的SQS队列“CustomerNotify”“CreditCollection”;
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容

  • https://aws.amazon.com/cn/s3/faqs/#sia_anchor Amazon Simp...
    守望者_1065阅读 8,220评论 0 5
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,597评论 18 139
  • 在前一部分我们提到使用Docker部署应用同时采用12factors原则时,服务器对于容器来讲,只依赖它的计算资源...
    iambowen阅读 6,068评论 0 12
  • 2017.12.30 莉子 一天又一天一周过去了,今天总算是下班早了一点,可以好好进行一下本周的梳理。 1.收获 ...
    莉子_ac68阅读 214评论 0 0
  • 《一个人的朝圣》里有句话说“你还以为走路是世界上最简单的事情呢,这些原本是本能的事情实际做起来有多困难。”她继续说...
    痴人一念阅读 620评论 6 24