I recently parsed a very large XML file and hit quite a few pitfalls along the way. I wrote a Java program and a Spark program, and in the end processed it with Python.
XML processing speed in Java, Spark, and Python, fastest first: Python > Spark > Java.
Because the input is XML, tag integrity must not be broken, so even when the Spark job is submitted to YARN it runs on only a single executor (albeit with multiple cores), and it is still very slow: neither the Java nor the Spark version finished within an entire morning. Along the way I also hit OOM errors, because the memory of a single Spark executor is capped in the configuration file. As for Java, as everyone knows, the whole file must first be read into memory (assuming there is enough), and with intermediate results stored on top of that, memory usage is far greater than the 21 GB file size.
Sample data (the data is simple; the principle is the same):
<add overwrite="true" commitWithin="10000">
<doc><field name="id" ><![CDATA[286c9edd3f2721730a8cecdbfec94ee4X]]></field>
<field name="an-country" ><![CDATA[GR]]></field>
<field name="an" ><![CDATA[88100105]]></field>
<field name="an-kind" ><![CDATA[A]]></field>
<field name="pn-country" ><![CDATA[GR]]></field>
<field name="pn" ><![CDATA[880100105]]></field>
<field name="pn-kind" ><![CDATA[A]]></field>
<field name="ctfw-country" ><![CDATA[DE]]></field>
<field name="ctfw-num" ><![CDATA[DE2736069]]></field>
<field name="ctfw-kind" ><![CDATA[A1]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
</doc>
<doc><field name="id" ><![CDATA[caf2088f80da92f58c413d23d9cc8124X]]></field>
<field name="an-country" ><![CDATA[GR]]></field>
<field name="an" ><![CDATA[88100091]]></field>
<field name="an-kind" ><![CDATA[A]]></field>
<field name="pn-country" ><![CDATA[GR]]></field>
<field name="pn" ><![CDATA[880100091]]></field>
<field name="pn-kind" ><![CDATA[A]]></field>
<field name="ctfw-country" ><![CDATA[FR]]></field>
<field name="ctfw-country" ><![CDATA[GB]]></field>
<field name="ctfw-country" ><![CDATA[US]]></field>
<field name="ctfw-country" ><![CDATA[EP]]></field>
<field name="ctfw-country" ><![CDATA[EP]]></field>
<field name="ctfw-num" ><![CDATA[FR2585362]]></field>
<field name="ctfw-num" ><![CDATA[GB2141152]]></field>
<field name="ctfw-num" ><![CDATA[US4292035]]></field>
<field name="ctfw-num" ><![CDATA[EP0026529]]></field>
<field name="ctfw-num" ><![CDATA[EP0146289]]></field>
<field name="ctfw-kind" ><![CDATA[A1]]></field>
<field name="ctfw-kind" ><![CDATA[A]]></field>
<field name="ctfw-kind" ><![CDATA[A]]></field>
<field name="ctfw-kind" ><![CDATA[A1]]></field>
<field name="ctfw-kind" ><![CDATA[A2]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
<field name="srepphase" ><![CDATA[SEA]]></field>
</doc>
</add>
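For reference, both programs below flatten each <doc> into a single line of name:value pairs; for the first <doc> above, the output line is:

id:286c9edd3f2721730a8cecdbfec94ee4X, an-country:GR, an:88100105, an-kind:A, pn-country:GR, pn:880100105, pn-kind:A, ctfw-country:DE, ctfw-num:DE2736069, ctfw-kind:A1, srepphase:SEA, srepphase:SEA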
Spark code:
The Spark code also loads the entire file into memory, so it is memory-hungry and parsing is slow:
import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
import java.util
import scala.xml.XML

object ParseQuoteData1 {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession (both variants left commented out; the parsing
    // below uses only scala.xml and java.io)
    /*
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("Parse_xml").getOrCreate()
    val sc = spark.sparkContext
    */
    /*
    val conf = new SparkConf().setAppName("quote_parse").setMaster("local[1]")
    conf.set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)
    */
    // Load the entire XML file into memory; someXML is the <add> root element
    val someXML = XML.loadFile(args(0))
    // someXML is already the <add> root, so select its <doc> children directly
    val docs = someXML \ "doc"
    val pubRef_len = docs.length
    val file = args(1)
    val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)))
    // val array = new Array[String]
    for (a <- 0 to pubRef_len - 1) {
      val quotedata = docs(a)
      val fields = quotedata \ "field"
      val fields_nature = quotedata \ "field" \ "@name"
      val quotList = new util.ArrayList[String]()
      for (b <- 0 to fields.length - 1) {
        val k = fields_nature(b).text // the name attribute
        val v = fields(b).text        // the field's (CDATA) text content
        val line = k + ":" + v
        quotList.add(line)
      }
      // strip the brackets from ArrayList.toString to get "k:v, k:v, ..."
      val res = quotList.toString.replace("[", "").replace("]", "")
      println(res)
      writer.write(res + "\n")
    }
    writer.close()
  }
}
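For completeness: whether this runs locally (master local[1], as in the commented-out builders) or is submitted to YARN, the parsing itself goes through scala.xml's XML.loadFile, which materializes the whole document tree inside a single JVM. That is exactly the memory behavior described at the top of this post.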
Python code:
Python's way of parsing is a great fit for huge files: even files over 50 GB can be processed without trouble, and it is fast.
Parsing principle: iterate over the tags, pulling one tag at a time into memory to parse it, so memory consumption stays tiny.
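Stripped of the script's specifics, the gist is lxml's iterparse idiom (a minimal sketch; 'big.xml' and the record tag doc are placeholders for your own file and record element). The full script follows.

from lxml import etree

# parse one <doc> at a time; only the current subtree is held in memory
context = etree.iterparse('big.xml', events=('end',), tag='doc')
for event, elem in context:
    # the <doc> subtree is fully parsed at this point; process it here
    print(len(elem), 'fields')
    # then free it: clear the element and drop already-processed siblings
    elem.clear()
    while elem.getprevious() is not None:
        del elem.getparent()[0]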
# -*- coding: utf-8 -*-
from lxml import etree
import time


def fast_iter(context, *args, **kwargs):
    """
    Read the XML data and free memory as it goes.
    context: the iterator produced by etree.iterparse
    """
    # open the output file
    with open('data/result.txt', 'a') as f:
        # process the XML data; event is the parse event ('end'),
        # elem is one fully parsed <doc> element
        for event, elem in context:
            pairs = []
            for e in elem:
                # tag attribute value plus tag text content
                s1 = e.get("name") + ":" + e.text
                pairs.append(s1)
            # strip the list's brackets and quotes to get a comma-separated string
            res = str(pairs).replace("[", "").replace("]", "").replace("'", "")
            f.write(res)  # write one line per <doc>
            f.write('\n')
            # reset the element: clear its internal data
            elem.clear()
            # then delete the already-processed preceding siblings so the
            # partially built tree does not keep growing;
            # getprevious() returns the previous sibling or None,
            # getparent() returns the parent element
            while elem.getprevious() is not None:
                # delete the parent's first child
                del elem.getparent()[0]
    # release the iterator
    del context
def process_element(elem):
    """
    Process one Element.
    :param elem: Element
    Note: apparently left over from the gene-parsing template this script was
    adapted from; fast_iter above does its processing inline and never calls it.
    """
    # collect the gene list
    gene_list = []
    for i in elem.xpath('add'):
        # get the gene name
        gene = i.text
        # append it to the list
        gene_list.append(gene)
    print('gene', gene_list)
if __name__ == '__main__':
    print('start', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    start = time.time()
    # path of the file to process
    infile = r'data/patent_info_cited__GR_cited_Thread.xml'
    # read the xml iteratively; for a namespaced document the namespace must be
    # part of the tag, e.g.:
    # context = etree.iterparse(infile, events=('end',), encoding='UTF-8', tag='{http://uniprot.org/uniprot}doc')
    context = etree.iterparse(infile, events=('end',), encoding='UTF-8', tag='doc')
    # stream through the xml data
    fast_iter(context, process_element)
    print('stop', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    print('time', time.time() - start)
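As a quick sanity check, the same logic can be run over the small sample from the top of this post (a self-contained sketch; the in-memory BytesIO input simply stands in for a real file):

# -*- coding: utf-8 -*-
from io import BytesIO
from lxml import etree

SAMPLE = b'''<add overwrite="true" commitWithin="10000">
<doc><field name="id"><![CDATA[286c9edd3f2721730a8cecdbfec94ee4X]]></field>
<field name="an-country"><![CDATA[GR]]></field>
<field name="an"><![CDATA[88100105]]></field>
</doc>
</add>'''

# iterparse accepts a file-like object as well as a path
context = etree.iterparse(BytesIO(SAMPLE), events=('end',), tag='doc')
for event, elem in context:
    pairs = [e.get('name') + ':' + e.text for e in elem]
    print(', '.join(pairs))
    # prints: id:286c9edd3f2721730a8cecdbfec94ee4X, an-country:GR, an:88100105
    elem.clear()
    while elem.getprevious() is not None:
        del elem.getparent()[0]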