Dynamodb
Amazon DynamoDB 是一种完全托管的 NoSQL 数据库服务,提供快速而可预测的性能,能够实现无缝扩展。使用 DynamoDB,您可以免除操作和扩展分布式数据库的管理工作负担,因而无需担心硬件预置、设置和配置、复制、软件修补或集群扩展等问题。
Dynamodb是亚马逊AWS推出的一款数据库存储产品。如下所示,亚马逊AWS的产品还真是丰富无边。
最近由于公司的技术调整,有些数据库迁移到亚马逊了,同时还有S3服务也启用了。
我也只好学了一些相关的知识,下面,我将自己常用到的一些东西记录一下。
用python连接Dynamodb的服务
你可以使用 AWS 管理控制台与 DynamoDB 交互,也可以通过编程,但是编程访问需要获取相应的秘钥许可。
Boto is the Amazon Web Services (AWS) SDK for Python, which allows Python developers to write software that makes use of Amazon services like S3 and EC2. Boto provides an easy to use, object-oriented API as well as low-level direct service access.
由上可知,python编程是使用boto包来操纵dynamodb的,如今最新版本是 Boto3.
- 安装boto3
pip install boto3
- 导入boto3
import boto3
- 获取aws的接入ID和秘钥
每一个用户名都可以申请唯一的id和秘钥,用于编程访问dynamodb,具体请参考:指南
一般都是下载到csv文件,然后在编程的时候引入即可,形式如下:
self.AWS_ACCESS_ID = 'AKIAI6xxxxxxxOGDQ7A'
self.AWS_ACCESS_KEY = 'hQ7xxxxLOD2Jxxxxxxx5148TM0mxxxxMR'
由于这是私钥,只要知道上面的这两串内容,便可在互联网上任何位置访问目的dynamodb数据库,所以不可轻易示人,特别是带有write权限的账号。中间已被我用x代替一部分。
创建服务
import boto3
from boto3 import Session
from boto3.dynamodb.conditions import Attr, Key
class DynamoDBService:
def __init__(self):
self.this_day = datetime.date.today()
# 这两个key像是账号和密码一般,需要在后台申请导出,唯一的
self.AWS_ACCESS_ID = 'AKIxxxAI6KxxxxxxxFGOxxxxDQ7A'
self.AWS_ACCESS_KEY = 'hQ7FxxxxxxxxxxxxxuMcxxJxrxxxxxxm53UoxxMR'
def get_service(self, table_name):
"""将service单独拿出来的目的,我为了初始化类的时候不会那么慢"""
client = boto3.client('dynamodb', region_name='us-west-2',
aws_access_key_id=self.AWS_ACCESS_ID,
aws_secret_access_key=self.AWS_ACCESS_KEY)
dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
aws_access_key_id=self.AWS_ACCESS_ID,
aws_secret_access_key=self.AWS_ACCESS_KEY)
# 通过dynamodb服务获取目标table的操作对象
table_handle = dynamodb.Table(table_name)
return table_handle
你只需要将秘钥换成自己的,然后像这样:
table_name="h5_visit_Info"
table_handle_h5_visit_info = self.get_service(table_name)
便获取了目标数据表的控制权,接下来就可以调用相应的方法了。
query
def operate_table(self, table_name="h5_visit_Info"):
# 通过dynamodb服务获取目标table的操作对象
table_handle_h5_visit_info = self.get_service(table_name)
"""查询,根据某一key(column)查询"""
response = table_handle_h5_visit_info.query(
KeyConditionExpression=Key('uid').eq('f3d61094c65a42489d0e54d4c30b7e6f')
)
print response
# response中包含了很多内容,response本身是个json字符串,其Items键的内容才是table中的内容
print type(response)
items = response['Items']
print items
print json.dumps(items)
上面是一个query方法,用来查询特定的记录,其参数很复杂,但常用的也就是条件表达式KeyConditionExpression,需要引入key:
from boto3.dynamodb.conditions import Key
很显然,Key('uid').eq('f3d61094c65a42489d0e54d4c30b7e6f')
的意思便是uid='f3d61094c65a42489d0e54d4c30b7e6f'
,类似的表达还有gt、lt、gtq、between等,详见:Key
scan
下面详细说一下scan,scan函数就是扫描,将扫描整个表:
response = table_handle.scan(
FilterExpression=Attr('first_name').begins_with('J') & Attr('account_type').eq('super_user')
)
items = response['Items']
print(items)
如上就是一个简单的用法,但是,aws每次只允许获取1M上限的结果,如果你的条件产生的结果超出了1M,那么你就得分页。同样,分页的参数也是放在scan里面的:ExclusiveStartKey
我要扫描的表名称叫做:h5_visit_Info
我想要获取整张表,
def get_table_info(self, table_name="h5_visit_Info"):
"""
对表进行分页扫描,这里尝试的是对visit_info表进行扫描
"""
# 通过dynamodb服务获取目标table的操作对象
table_handle = self.get_service(table_name)
# 查询表的内容,利用scan分页,先将所有数据拿出来再说
table_info = DataFrame()
# 这个值是在分页查询的时候,用来记录页面的最后一个主键的下一个,以方便下一个页面的开启
last_evaluated_key = None
i = 0
while True:
# 刚开始,不需要传入startkey
if last_evaluated_key is None:
response = table_handle.scan()
else:
# 构建分页的起点,传入下一页面的起点,这是由主键来控制的,last_evaluated_key的值就是本表中的uid
try:
response = table_handle.scan(ExclusiveStartKey=last_evaluated_key)
except:
break
# response 有一个标准的json格式,包含了这次scan结果的各种信息
# print type(response) #字典
# 获取结果中的内容,其中键Items的值就是表的内容
# 首先提取info,要对info里面的内容进行进一步解析出来,info里面的也是一个json格式,所以可以直接转为json
info = [json.loads(item['info']) for item in response['Items']]
info = DataFrame(info)
# 从新设置index,方便后面合并contact
info.index = range(0, len(info))
# print info.head(5)
# 获取基本信息,包含创建时间和uid
basic = [[str(item['createTime']), item['uid']] for item in response['Items']]
# 这里好绕,本来以为item是一个字典的,结果没法用字典的函数来执行,只能如此了
basic = DataFrame(basic)
basic.columns = [['createTime', 'uid']]
basic.index = range(0, len(basic))
# print basic.head(5)
# 合并表
basic_info = pd.concat([info, basic], axis=1)
print len(basic_info)
# 下面是将Unix时间戳转化成日期,目标lvt是以毫秒来计算的,所以要先除以1000才能带入函数utcfromtimestamp
basic_info['lvt'] = basic_info['lvt'].map(
lambda time_stamp: datetime.datetime.utcfromtimestamp(long(time_stamp) / 1000).strftime("%Y-%m-%d"))
basic_info['createTime'] = basic_info['createTime'].map(
lambda time_stamp: datetime.datetime.utcfromtimestamp(long(time_stamp) / 1000).strftime("%Y-%m-%d"))
# 这里其实有数据需要清洗的,比如vt里面有空值,要将vt转化成数字才能筛选出vt大于等于2的
# basic_info['vt'] = basic_info['vt'].astype(long)
basic_info = basic_info[basic_info['vt'] >= 2]
print len(basic_info)
print "----" * 30, i
i += 1
table_info = table_info.append(basic_info)
# 判断有没有这个LastEvaluatedKey,如果有,那么还有页面没有拉取玩,将这个值带入request的ExclusiveStartKey中,继续读取页面
if response.has_key('LastEvaluatedKey'):
last_evaluated_key = response['LastEvaluatedKey']
print last_evaluated_key
continue
else:
break
print len(table_info)
table_info.to_excel('visit_info_{}.xlsx'.format(self.this_day))
pass
当然,上面的程序还有存在bug,待我下一次更新