1. Understanding the ES storage structure
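In ES, the storage structure has four main levels. Compared with a traditional relational database:
- index (indices) corresponds to a database
- type corresponds to a table (ES 6.x allows only one type per index, 7.x deprecates types, and 8.x removes them)
- document corresponds to a row
- properties (fields) corresponds to a column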
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
2. ES write test
Write a single document (one record):
PUT http://192.168.1.32:9200/twitter/tweet/377827236
{
  "tweet_id": "555555555555555555555666",
  "user_screen_name": "kanazawa_mj",
  "tweet": "blog3444444",
  "user_id": "377827236",
  "id": 214019
}
The path /twitter/tweet/377827236 consists of three parts:
Part | Description |
---|---|
twitter | Index name |
tweet | Type name |
377827236 | Document ID (the same value as user_id in this example) |
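As a sketch of how the three path parts map onto an HTTP request, the Python 3 snippet below builds (but does not send) the same PUT using only the standard library. The helper names `doc_url` and `build_put_request` are hypothetical, not part of any ES client. The `Content-Type: application/json` header is included because Elasticsearch 6.0 and later reject request bodies without a recognized content type.

```python
import json
import urllib.request


def doc_url(host, index, doc_type, doc_id):
    """Build http://<host>/<index>/<type>/<id>, the path layout in the table above."""
    return "http://%s/%s/%s/%s" % (host, index, doc_type, doc_id)


def build_put_request(host, index, doc_type, doc_id, doc):
    """Return an unsent urllib Request that indexes `doc` under the given document ID."""
    body = json.dumps(doc).encode("utf-8")
    return urllib.request.Request(
        doc_url(host, index, doc_type, doc_id),
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


doc = {
    "tweet_id": "555555555555555555555666",
    "user_screen_name": "kanazawa_mj",
    "tweet": "blog3444444",
    "user_id": "377827236",
    "id": 214019,
}
req = build_put_request("192.168.1.32:9200", "twitter", "tweet", "377827236", doc)
print(req.get_method(), req.full_url)  # prints: PUT http://192.168.1.32:9200/twitter/tweet/377827236
# Actually sending it needs a reachable ES node: urllib.request.urlopen(req)
```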
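3. ES query test
Search for documents whose tweet field contains "love", returning 50 results as pretty-printed JSON (the response below is abridged to the first three of the 50 hits):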
GET http://192.168.1.32:9200/twitter/tweet/_search?q=tweet:love&size=50&pretty=true
{
  "took" : 20,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 11639,
    "max_score" : 8.448289,
    "hits" : [
      {
        "_index" : "twitter",
        "_type" : "tweet",
        "_id" : "AV0fnFOX6PBTXc6mRjpL",
        "_score" : 8.448289,
        "_source" : {
          "tweet_id" : "843105177913757697",
          "user_screen_name" : "jessicapalapal",
          "tweet" : "Love, love, love ",
          "user_id" : "740434015",
          "id" : 474551
        }
      },
      {
        "_index" : "twitter",
        "_type" : "tweet",
        "_id" : "AV0fni__6PBTXc6mSeyR",
        "_score" : 8.436986,
        "_source" : {
          "tweet_id" : "695096306763583488",
          "user_screen_name" : "SampsonMariel",
          "tweet" : "Love love love^_^ #ALDUB29thWeeksary",
          "user_id" : "2483556636",
          "id" : 723297
        }
      },
      {
        "_index" : "twitter",
        "_type" : "tweet",
        "_id" : "AV0fmxvV6PBTXc6mQ8Mb",
        "_score" : 8.425938,
        "_source" : {
          "tweet_id" : "835676311637086209",
          "user_screen_name" : "thedaveywavey",
          "tweet" : "Love is love is love is love. ",
          "user_id" : "17191297",
          "id" : 311967
        }
      }
    ]
  }
}
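The query above can be sketched the same way. `q=tweet:love` uses Lucene query-string syntax, so the roughly equivalent request body is a `query_string` query with `size: 50`. The snippet below (hypothetical helper names, standard library only) builds that URL and body, then pulls the score, user and text out of a response shaped like the one shown, here trimmed to two hits and a few fields.

```python
from urllib.parse import urlencode


def search_url(host, index, doc_type, query, size=50):
    """Build the URI-search URL; ':' is left unescaped to match the example."""
    params = urlencode({"q": query, "size": size, "pretty": "true"}, safe=":")
    return "http://%s/%s/%s/_search?%s" % (host, index, doc_type, params)


def search_body(query, size=50):
    """Request-body form of ?q=...&size=... (a query_string query)."""
    return {"query": {"query_string": {"query": query}}, "size": size}


def top_hits(response):
    """Return (score, user_screen_name, tweet) for each hit in a search response."""
    return [
        (hit["_score"], hit["_source"]["user_screen_name"], hit["_source"]["tweet"])
        for hit in response["hits"]["hits"]
    ]


# Trimmed copy of the response shown above (two of the hits, selected fields only).
response = {
    "took": 20,
    "hits": {
        "total": 11639,
        "max_score": 8.448289,
        "hits": [
            {"_id": "AV0fnFOX6PBTXc6mRjpL", "_score": 8.448289,
             "_source": {"user_screen_name": "jessicapalapal", "tweet": "Love, love, love "}},
            {"_id": "AV0fni__6PBTXc6mSeyR", "_score": 8.436986,
             "_source": {"user_screen_name": "SampsonMariel",
                         "tweet": "Love love love^_^ #ALDUB29thWeeksary"}},
        ],
    },
}

print(search_url("192.168.1.32:9200", "twitter", "tweet", "tweet:love"))
for score, user, text in top_hits(response):
    print(score, user, text)
```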
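4. ES bulk write test
- Writer program: a Python script using the producer/consumer pattern that reads rows from MySQL and writes them to ES in batches of 1,000 rows.
- Client machine: Windows, about 100 MB memory usage, 15% CPU usage.
- ES server: Ubuntu 14.04, 5% CPU usage, low memory usage.
- Single process, 5 writer threads, 1 million rows: 500 seconds (about 2,000 rows/s).
- Single process, 20 writer threads, 1 million rows: 500 seconds (about 2,000 rows/s), so adding threads did not improve throughput.
- Note: reportedly, changing the ES configuration to turn off indexing before loading can speed up writes; this has not been tested yet.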
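The batch writer itself is not included in the post, so the following is only a hypothetical sketch of the producer/consumer layout described in the list above: the producer puts rows into a bounded `queue.Queue`, and N writer threads group them into batches of 1,000 and pass each batch to a `write_batch` callback. All names here (`produce`, `consume`, `run`, `fake_write_batch`) are illustrative. The MySQL source and the ES call are stubbed out so the sketch runs on its own; in the real script `write_batch` could be the appendix's `es_tool.set_data`.

```python
import queue
import threading

BATCH_SIZE = 1000   # rows per ES write, as in the test above
NUM_WRITERS = 5     # number of writer threads
_STOP = object()    # sentinel telling a writer to flush and exit


def produce(rows, q, num_writers):
    """Producer: push every row, then one stop sentinel per writer."""
    for row in rows:
        q.put(row)
    for _ in range(num_writers):
        q.put(_STOP)


def consume(q, write_batch, batch_size):
    """Consumer: collect rows into batches and hand each full batch to write_batch."""
    batch = []
    while True:
        item = q.get()
        if item is _STOP:
            break
        batch.append(item)
        if len(batch) >= batch_size:
            write_batch(batch)
            batch = []
    if batch:  # flush the last partial batch
        write_batch(batch)


def run(rows, write_batch, num_writers=NUM_WRITERS, batch_size=BATCH_SIZE):
    q = queue.Queue(maxsize=batch_size * num_writers * 2)  # bounded to cap memory use
    writers = [threading.Thread(target=consume, args=(q, write_batch, batch_size))
               for _ in range(num_writers)]
    for w in writers:
        w.start()
    produce(rows, q, num_writers)  # the producer runs in the calling thread
    for w in writers:
        w.join()


# Hypothetical stand-in for the ES write: records batch sizes only.
written = []
lock = threading.Lock()


def fake_write_batch(batch):
    with lock:
        written.append(len(batch))


# Stub rows shaped like (id, tweet_id, user_id, user_screen_name, tweet).
rows = ((i, str(i), "u%d" % i, "name%d" % i, "text %d" % i) for i in range(10500))
run(rows, fake_write_batch)
print(sum(written), "rows in", len(written), "batches")
```

One stop sentinel is queued per writer after the last row. Because the queue is FIFO, a writer only sees its sentinel after every row has been taken, and it flushes its partial batch before exiting, so no rows are lost.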
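5. Next steps
- The ES sharding mechanism and search configuration (mapping, filter), etc., still need deeper study and testing against the project's requirements.
- Additional features ES supports (some through analysis plugins), such as time-range search, Simplified/Traditional Chinese conversion, pinyin search, GIS location search, and English stemming (matching different tenses).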
6. References
Introduction to the ES storage structure
https://es.xiaoleilu.com/010_Intro/25_Tutorial_Indexing.html
Working with Elasticsearch from Python
http://www.cnblogs.com/yxpblog/p/5141738.html
Elasticsearch: The Definitive Guide - Retrieving a document
https://es.xiaoleilu.com/010_Intro/30_Tutorial_Search.html
7. Appendix (Python code for writing to ES)
# coding=utf-8
# Python 3 port: the Python 2 idioms in the original (reload(sys),
# sys.setdefaultencoding, print statements, "except Exception, e") fail on Python 3.
# Assumes a pre-7.x cluster and elasticsearch-py client, which still support mapping types.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import argparse

# ES index and type names
INDEX_NAME = "twitter"
TYPE_NAME = "tweet"


# ES helper class
class es_tool(object):
    # Constructor: connect to the given ES hosts
    def __init__(self, hosts, timeout=5000):
        # Use the timeout argument (the original ignored it and hard-coded 5000)
        self.es = Elasticsearch(hosts, timeout=timeout)

    # Store rows in ES; each row is (id, tweet_id, user_id, user_screen_name, tweet)
    def set_data(self, fields_data=None, index_name=INDEX_NAME, doc_type_name=TYPE_NAME):
        # None instead of a mutable [] default argument
        fields_data = fields_data or []
        # Build one bulk action per row
        actions = []
        for fields in fields_data:
            action = {
                "_index": index_name,
                "_type": doc_type_name,
                "_source": {
                    "id": fields[0],
                    "tweet_id": fields[1],
                    "user_id": fields[2],
                    "user_screen_name": fields[3],
                    "tweet": fields[4],
                },
            }
            actions.append(action)
        # Send the actions in bulk; raise an error if any document fails
        success, _ = bulk(self.es, actions, index=index_name, raise_on_error=True)
        print("Performed %d actions" % success)


# Parse command-line arguments (currently unused by the main block)
def read_args():
    parser = argparse.ArgumentParser(description="Search Elastic Engine")
    parser.add_argument("-i", dest="input_file", action="store", help="input file",
                        required=False, default="./data.txt")
    # parser.add_argument("-o", dest="output_file", action="store", help="output file", required=True)
    return parser.parse_args()


# Initialize ES: recreate the index and set the mapping
def init_es(hosts=None, timeout=5000, index_name=INDEX_NAME, doc_type_name=TYPE_NAME):
    es = Elasticsearch(hosts, timeout=timeout)
    # "string" is the pre-5.x field type; on ES 5.x and later use "text" or "keyword"
    my_mapping = {
        doc_type_name: {
            "properties": {
                "id": {"type": "string"},
                "tweet_id": {"type": "string"},
                "user_id": {"type": "string"},
                "user_screen_name": {"type": "string"},
                "tweet": {"type": "string"},
            }
        }
    }
    try:
        # Delete the old index only if it exists (deleting a missing index raises
        # NotFoundError, which would skip the creation below), then create index and mapping
        if es.indices.exists(index=index_name):
            es.indices.delete(index=index_name)  # {'acknowledged': True}
        create_index = es.indices.create(index=index_name)  # {'acknowledged': True}
        mapping_index = es.indices.put_mapping(index=index_name, doc_type=doc_type_name,
                                               body=my_mapping)  # {'acknowledged': True}
        if not (create_index["acknowledged"] and mapping_index["acknowledged"]):
            print("Index creation failed...")
    except Exception as e:
        print("set_mapping except", e)


# Main entry point
if __name__ == '__main__':
    # args = read_args()
    # Initialize the ES index and mapping
    init_es(hosts=["192.168.1.32:9200"], timeout=5000)
    # Create the ES helper
    es = es_tool(hosts=["192.168.1.32:9200"], timeout=5000)
    # Write two sample rows
    tweet_list = [("111", "222", "333", "444", "555"), ("11", "22", "33", "44", "55")]
    es.set_data(tweet_list)
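`set_data` builds the whole action list in memory and relies on `bulk`'s defaults. Two details are worth knowing here. `elasticsearch.helpers.bulk` accepts any iterable of actions, including a generator. It also splits them into HTTP requests of `chunk_size` actions (500 by default), so a 1,000-row call is sent as two bulk requests unless `chunk_size=1000` is passed. The standalone sketch below uses hypothetical `row_to_action` / `iter_actions` helpers and needs no ES connection. With a live cluster the generator could be passed as `bulk(es, iter_actions(rows), chunk_size=1000)`.

```python
INDEX_NAME = "twitter"
TYPE_NAME = "tweet"
FIELDS = ("id", "tweet_id", "user_id", "user_screen_name", "tweet")


def row_to_action(row, index_name=INDEX_NAME, doc_type_name=TYPE_NAME):
    """Map one (id, tweet_id, user_id, user_screen_name, tweet) tuple to a bulk action."""
    return {
        "_index": index_name,
        "_type": doc_type_name,  # omit this key on ES 7 and later (typeless APIs)
        "_source": dict(zip(FIELDS, row)),
    }


def iter_actions(rows, **kwargs):
    """Yield actions one at a time instead of collecting them into one list first."""
    for row in rows:
        yield row_to_action(row, **kwargs)


tweet_list = [("111", "222", "333", "444", "555"), ("11", "22", "33", "44", "55")]
for action in iter_actions(tweet_list):
    print(action)
```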