1:安装 Python 的 elasticsearch 包:pip install elasticsearch
2:安装MySQLdb
1) 从https://pypi.python.org/pypi/MySQL-python下载MySQLdb安装包
2) 解压下载的压缩包
3) 进入MySQLdb安装包目录
4) python setup.py build
5) python setup.py install
3:导入数据:以 case_fix_attrs 表为例,导入 fixnum 条数据,脚本如下。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import MySQLdb
import os
import hashlib
from elasticsearch import Elasticsearch
import elasticsearch.helpers
# Database connection parameters (placeholder values; replace before running).
strHost = 'xxx.xxx.xxx.xxx'
strDB = 'database'  # also used as the Elasticsearch index name for the bulk actions
strUser = 'user'
strPasswd = 'password'
# Table that is paged through; also used as the Elasticsearch document type.
table = 'table'
# Number of rows to import; when it is not positive, the table's row count
# (select count(*)) is used instead.
fixnum = 1000000
def seEncode(ustr, encoding='utf-8'):
    """Convert a value to an encoded string suitable for storing in the database.

    ``None`` becomes the empty string, ``unicode`` values are encoded with
    *encoding* (characters that cannot be encoded are dropped), and anything
    else is passed through ``str()``.
    """
    if ustr is None:
        return ''
    if not isinstance(ustr, unicode):
        return str(ustr)
    return ustr.encode(encoding, 'ignore')
# Connect to the database.
def getConnect(db=strDB, host=strHost, user=strUser, passwd=strPasswd, charset="utf8"):
    """Open and return a MySQLdb connection.

    Every argument defaults to the module-level configuration, so calling
    ``getConnect()`` with no arguments connects exactly as before. The
    arguments are now actually forwarded to ``MySQLdb.connect``; previously
    they were accepted but silently ignored in favour of the globals.
    """
    return MySQLdb.connect(host=host, db=db, user=user, passwd=passwd, charset=charset)
def initClientEncode(conn):
    """Return a new cursor on *conn* after switching the session to utf8.

    Issues ``SET NAMES utf8`` on the fresh cursor and commits on the
    connection before handing the cursor back to the caller, who is
    responsible for closing it.
    """
    cursor = conn.cursor()
    cursor.execute("SET NAMES utf8")
    conn.commit()
    return cursor
class MySQLQueryPagination(object):
    """Read a MySQL table page by page with ``LIMIT offset,count`` queries.

    Parameters:
        conn: an open database connection; cursors are obtained through
            ``initClientEncode(conn)``.
        numPerPage: maximum number of rows fetched per page.
        tableName: table to read. ``None`` (the default) means "use the
            module-level ``table`` setting", which is the original behaviour.
        maxRows: cap on the number of rows to read. ``None`` (the default)
            means "use the module-level ``fixnum`` setting". A value that is
            not positive means "read as many rows as ``count(*)`` reports".

    NOTE(review): the page queries carry no ORDER BY, so page contents rely
    on the server returning rows in a stable order between queries.
    """

    def __init__(self, conn, numPerPage=50000, tableName=None, maxRows=None):
        self.conn = conn
        self.numPerPage = numPerPage
        self.tableName = tableName
        self.maxRows = maxRows

    def __resolveTable(self):
        """Return the explicit table name, or the module-level default."""
        # Looked up lazily so later changes to the global are still honoured.
        return table if self.tableName is None else self.tableName

    def __resolveMaxRows(self):
        """Return the explicit row cap, or the module-level default."""
        return fixnum if self.maxRows is None else self.maxRows

    def queryForList(self, sql):
        """Yield the rows of each page in turn (one ``fetchall()`` result per page).

        *sql* is accepted for interface compatibility but is not used: the
        page queries are always built from the configured table name.
        """
        totalPageNum = self.__calTotalPages(sql)
        for pageIndex in range(totalPageNum):
            yield self.__queryEachPage(sql, pageIndex)

    def __createPaginaionQuerySql(self, sql, currentPageIndex):
        """Build the ``select ... limit offset,count`` statement for one page."""
        startIndex = self.__calStartIndex(currentPageIndex)
        pageSize = self.numPerPage
        rowLimit = self.__resolveMaxRows()
        if rowLimit > 0:
            # Do not let the last page read past the configured row cap when
            # the cap is not an exact multiple of the page size.
            pageSize = max(0, min(pageSize, rowLimit - startIndex))
        return r'select * from %s limit %s,%s' % (self.__resolveTable(), startIndex, pageSize)

    def __queryEachPage(self, sql, currentPageIndex):
        """Run the query for one page and return all of its rows."""
        qSql = self.__createPaginaionQuerySql(sql, currentPageIndex)
        curs = initClientEncode(self.conn)
        try:
            curs.execute(qSql)
            return curs.fetchall()
        finally:
            # Close the cursor even when execute()/fetchall() raises.
            curs.close()

    def __calStartIndex(self, currentPageIndex):
        """Return the row offset of the first row on the given 0-based page."""
        return currentPageIndex * self.numPerPage

    def __calTotalRowsNum(self, sql):
        """Return the table's total row count (0 if the query yields no row)."""
        tSql = r'select count(*) from ' + self.__resolveTable()
        curs = initClientEncode(self.conn)
        try:
            curs.execute(tSql)
            result = curs.fetchone()
        finally:
            curs.close()
        if result is not None:
            return int(result[0])
        return 0

    def __calTotalPages(self, sql):
        """Return how many pages are needed to cover the rows to read."""
        rowLimit = self.__resolveMaxRows()
        if rowLimit > 0:
            totalRowsNum = rowLimit
        else:
            totalRowsNum = self.__calTotalRowsNum(sql)
        # Ceiling division with '//' so the result is an int on both
        # Python 2 and Python 3 (plain '/' yields a float on Python 3, which
        # range() rejects).
        return (totalRowsNum + self.numPerPage - 1) // self.numPerPage

    def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
        """Compute the end index for a page.

        Not called anywhere in this class; kept with its original results.
        """
        if totalRows < self.numPerPage:
            return totalRows
        hasPartialPage = (totalRows % self.numPerPage) != 0
        if not hasPartialPage or currentPageIndex < totalPages:
            return currentPageIndex * self.numPerPage
        if currentPageIndex == totalPages:
            # Last page.
            return totalRows
        return 0

    def _parse_serialize_case_fix_attrs(self, row):
        """Turn one ``case_fix_attrs`` row into an Elasticsearch bulk action.

        The first four columns of *row* are mapped, in order, to the document
        fields ``id``, ``title``, ``no`` and ``remark``. The action indexes
        the document into the index named by the module-level ``strDB``, with
        the table name as its document type.
        """
        return {
            '_op_type': 'index',
            '_index': strDB,
            '_type': self.__resolveTable(),
            '_source': {
                "id": row[0],
                "title": row[1],
                "no": row[2],
                "remark": row[3],
            },
        }
if __name__ == '__main__':
    # Copy rows from MySQL into Elasticsearch, one bulk request per page.
    conn = getConnect()
    try:
        pag = MySQLQueryPagination(conn)
        # NOTE: queryForList() in this file does not use the statement text;
        # it pages over the configured table. The value is passed only
        # because the method requires an argument.
        sql = r'SELECT * FROM `case_fix_attrs` WHERE id<%s'
        # ES
        es = Elasticsearch([{'host':'xxx.xxx.xxx.xxx','port':9200}])
        for ret in pag.queryForList(sql):
            if not ret:
                # An empty page means the offset is past the end of the
                # table, so every later page would be empty as well.
                break
            actions = [pag._parse_serialize_case_fix_attrs(row) for row in ret]
            elasticsearch.helpers.bulk(es, actions)
    finally:
        # Release the connection even when the import fails part-way.
        conn.close()