记录一次ES数据库的迁移
起因是由于老库中的模板映射不是我们想要的模板,需要将数据迁移到新的ES集群中,新集群已经执行了我们想要的模板
PS:由于之前没有整过整过东西,所以,走了不少的弯路,研究好几种方法
Elasticsearch-Migration方法
elasticsearch-migration支持:多个版本间的数据迁移,使用scroll+bulk的接口原理
1.安装和使用
elasticsearch-migration支持linux,windows等不同系统,下载解压后可以直接运行
安装:
源码地址:https://github.com/medcl/esm
编译好的地址:https://github.com/medcl/elasticsearch-dump/releases
有各个版本可以选择
直接下载下来在选定目录解压即可
使用示例
./esm -s http://IP:9200 -d http://IP:9200/ -x 索引名称 -w=5 -b=10 -c 10000
主要几个参数:
索引名称可以使用*进行模糊匹配
-w 表示线程数
-b 表示一次bulk请求数据大小,单位MB默认 5M
-c 一次scroll请求数量
实际使用下来,在测试环境的话,对集群数据进行迁移,只需要将源和目标设置为两个集群的主节点就行了,
但是有两个集群的服务器的配置必须对等,如果服务器配置不均衡的话,会出现数据积压,导致程序中断。
还有这个工具是用GO编写的,在部分环境不支持的服务器上面还用不了,所以最终没有采用这种方法。
设置快照镜像导入的方法
这个方法使用于大数据量的迁移
首先,设置一个目录,这是一个备份的存储目录,放在源数据的服务器上
sudo mkdir /data/es/data_backup
# 该目录要是elasticsearch的启动用户可访问的,我们的环境一般是普通用户(如:centos)
chown -R centos:centos /data/es/data_backup
修改elasticsearch的配置文件 elasticsearch.yml
vi ../../elasticsearch.yml
#在文件最后添加一条
path.repo: ["/data/es/data_backup "]
!ps:这条语句一定要注意格式,:后面要接一个空格,字符用英文字符
然后,重启ES
借助kibana进行ES语句的操作
# 创建仓库,创建一个名为my_backup的仓库
PUT _snapshot/my_backup
{
"type": "fs",
"settings": {
"location": "/data/es/data_backup ",
"compress": true
}
}
# 检查仓库是否创建成功(my_backup为之前创建的仓库名称)
GET _snapshot/my_backup
# 删除仓库
DELETE _snapshot/my_backup
备份索引数据
# 给所有索引创建快照
PUT _snapshot/my_backup/snapshot_name
# 针对具体的index创建快照备份
# 其中my_backup是仓库名称,snapshot_name是快照的名字
# ignore_unavailable在创建快照的过程中会忽略不存在的索引,默认情况下,如果没有设置,在索引不存在的情况下快照请求将会失败
# include_global_state能够防止集群的全局状态被作为快照的一部分存储起来
# 多个索引间不要加空格,否则只会对第一个索引操作
PUT _snapshot/my_backup/snapshot_name?wait_for_completion=true
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false
}
# 查看指定仓库下所有快照
GET _snapshot/my_backup/_all
# 查看具体某一个快照的信息
GET _snapshot/my_backup/snapshot_name/_status
# 删除快照,要指定仓库名和快照名
# 也能用于取消一个正在进行的快照创建过程,会删除备份了一半的快照
DELETE _snapshot/my_backup/snapshot_name
然后开始进行数据迁移
在目标服务器上面重复1-4步,建立一个相同名称的目录,作为数据目录
将源集群的备份内容(/data/es/data_backup 里的所有文件),复制到迁移目标的仓库目录里,接下来就是类似批量导入了
# 如果索引已经存在目标的集群,需要先关闭索引,恢复数据后再开启
POST /index_name/_close
POST _snapshot/my_backup/snapshot_name/_restore
POST /index_name/_open
# 从快照中恢复指定的索引,并给索引重命名
# indices里面可以写多个索引名,中间用逗号隔开
# rename_pattern可以写完整的索引名,也可以用正则匹配索引名的局部
# rename_replacement将替换rename_pattern中匹配到的局部(如果是正则,不是将整个索引名都替换)
#下面这条语句会把index_1,index_2恢复为restored_index_1和restored_index_2
POST /_snapshot/my_backup/snapshot_1/_restore
{
"indices": "index_1,index_2",
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}
# 查看恢复的状态
GET _snapshot/my_backup/snapshot_name/_status
然后,对于集群数据的导入,因为某种特殊的原因,没有继续研究,故弃之,改用另外的手段
logstash数据迁移方法,这也是最终使用的方案
使用这种方法,对于数据量不是特别庞大的数据源还是比较适用的,因为一般的集群都是使用的ELK,这样也不需要其余操作
只需要改动一下logstash的配置就行了。logstash本身支持从各种源搬运数据到各种目标
#配置修改
input{
elasticsearch {
hosts => ["IP:9200","IP:9200"]
index => "*" # * 代表着所有索引,也可以指定特定的索引,把索引名称配置好就行了
size => 1000
}
}
output {
elasticsearch {
hosts => ["IP:9200","IP:9200"]
index => "%{[@metadata][_index]}"
#document_type => "%{[@metadata][_type]}" #这type和id这两项如果不是同时迁移多个不同的索引的情况下,建议直接指定
#document_id => "%{[@metadata][_id]}"
}
# stdout { codec => rubydebug }
}
对于一些特殊情况下的logstash数据迁移:
# 在源数据和目标数据之间需要做转换的
input{
elasticsearch {
hosts => "IP:9200"
index => "*"
query => '{"query": {"exists" : { "field" : "requestTime" }}}'
# 索引过滤语句
}
}
filter {
mutate {
rename => { "appId" => "s_appId" } #字段修改
convert => { "l_requestTime" => "integer"} #字段类型修改 (_mapping)
remove_field => ["@timestamp", "@version"] #过滤掉logstash 自己加上的字段
add_field =>["s_serverTime","2018-06-15"] #新增字段
}
ruby{
code=>"
time = Time.at(event.get('l_requestTime')/1000).strftime('%Y-%m-%d')
event.set('s_serverTimestamp', time.to_s)
"#索引时间指定
}
}
output {
elasticsearch {
hosts => "IP:9200"
document_type => "*"
index => "*"
}
# stdout { codec => rubydebug }
}