1.背景:群发消息,需要批量用户信息,运营通过excel格式提供。
2.技术栈:angularjs2+coreUI,nodejs,springCloud,springboot。
3.流程:
一 . input type="file"上传文件(可以采用ng2-file-upload,后台配合分段上传),此项目单个大文件上传,赶工期。
html代码:
<input (change)="onFileChanged($event.target.files)"
type="file" id="excelfile1" name="excelfile1" accept=".xls,.xlsx">
ts代码:
onFileChanged(files){
if (files.length > 0) {
this.file = files[0];
}
console.log(this.file);
}
二 . angularjs http模块上传,代码如下
上传业务逻辑
uploadExcel(){
if(!this.file) {
alert("请选择文件!");
return ;
}
let formData: FormData = new FormData();
formData.append(encodeURI(this.file.name), this.file, encodeURI(this.file.name));
this._restService.testpostOps(url+"?fileName="+encodeURI(this.file.name)+"&fileLen="+this.file.size, formData,
new RequestOptions({
headers:new Headers({
// "Content-Type":"multipart/form-data",
"Accept": "application/json"
})
}))
.subscribe(
(res: WrapperResult) => this.doUploadRes(res),
err => alert("错误")
)
}
http发送模块
testpostOps(url: string, body: Object, options: RequestOptions): Observable<Object> {
return this.http.post(url, body, options)
.map(this.extractData)
// .timeout(10)
.catch(this.handleError);
}
注意content-type文件上传后面是要带边界码的,http协议博大精深啊。
三 .node.js转发模块 不好意思 写这个的时候还在技术实现阶段还没加入node.js。
四 .springboot微服物接受请求并处理
因为单个大文件传输,又因为Cloud中的网关限制了连接时间和读取时间
zuul.host.socket-timeout-millis=40000
ribbon.ConnectTimeout=40000
ribbon.ReadTimeout=40000
所以并没有走网关,直连服务。能看到对于特殊数据的处理,复杂的网络结构是个问题,最好在架构设计之初就能考虑到非结构化大数据的处理。
五 .接受代码实现。
接受文件,编码问题最好注意下
接口
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@Path("/xxxx")
WrapperResult<ExcelFileInfo> uploadExcel(FormDataMultiPart formParams, @QueryParam("fileName")String fileName,
@QueryParam("fileLen")String fileLen);
接口实现
public WrapperResult<ExcelFileInfo> uploadExcel(FormDataMultiPart formParams, String fileName,String fileLen) {
String key = null;
try {
String fieldName = URLEncoder.encode(fileName, "UTF-8");
FormDataBodyPart filePart = formParams.getField(fieldName.replace("+","%20"));
if (null == filePart) {
logger.info("filePart is null, field name should be storefile");
return new WrapperResult<>(WrapperResultStatus.FAILD.status(), "文件为空", null);
}
key = oSSService.saveExcelUp(filePart.getValueAs(InputStream.class),fileLen);
uploadExcelAsync.uploadExcel(key,fileLen);
}catch (Exception e){
e.printStackTrace();
}
return new WrapperResult<ExcelFileInfo>(WrapperResultStatus.SUCCESS.status(),"success", new ExcelFileInfo(fileName,key));
}
springboot集成jersey做restful接口服务,自我感觉没有官方支持还是最好不要这样做,技术团队有信息的可以试试。
文件放在aliyun oss里面。
六 .批量插入数据库实现。
批量插数据,采用异步后台处理 @Async 注释 注意@EnableAsync开启支持
@Async
public void uploadExcel(String key,String fileLen) {
logger.info("uploadExcel thread start.");
logger.info(platformTransactionManager.getClass().getName());
InputStream ins = oSSService.readExcel(key);
batchIntoDatabaseTx.parseExcelAndBatchIntoDatabaseByPoiEventMode(ins);
logger.info("uploadExcel thread end.");
}
事物支持 注意事物的实现原理 私有方法是不起作用的 我被同一问题绊倒了两次
@Transactional
public void parseExcelAndBatchIntoDatabaseByPoiEventMode(InputStream ins){
try {
parseUserIdFromXlsx.process(ins);
}catch (Exception e){
}finally {
try{
if(ins!=null){
ins.close();
ins=null;
}
}catch (IOException e){
}
}
}
eventmode模式读入数据并且由SAX处理(excel底层数据结构是XML形式的)
private void processSheet(
InputStream stream, ParseUserIdSaxHandler parseUserIdSaxHandler,
ReadOnlySharedStringsTable readOnlySharedStringsTable, StylesTable styleTable)
throws ParserConfigurationException, SAXException, IOException {
XMLReader sheetParser = SAXHelper.newXMLReader();
ContentHandler handler = new ExcelXSSFSheetXMLHandler(
styleTable,null,readOnlySharedStringsTable,parseUserIdSaxHandler,new DataFormatter(),false);
sheetParser.setContentHandler(handler);
sheetParser.parse(new InputSource(stream));
}
public void process(InputStream ins) throws IOException, OpenXML4JException, SAXException, ParserConfigurationException {
OPCPackage p = OPCPackage.open(ins);
ReadOnlySharedStringsTable readOnlySharedStringsTable = new ReadOnlySharedStringsTable(p);
XSSFReader xssfReader = new XSSFReader(p);
StylesTable styleTable = xssfReader.getStylesTable();
Iterator<InputStream> iterator = xssfReader.getSheetsData();
while(iterator.hasNext()){
InputStream stream = iterator.next();
processSheet(stream,new ParseUserIdSaxHandler(),readOnlySharedStringsTable,styleTable);
stream.close();
}
}
有SAX经验的可以自己写,或者用官方提供的XSSFSheetXMLHandler配合定义的接口SheetContentsHandler实现解析。
我的SAXhandler,我也是实现了接口SheetContentsHandler,但是为了有个endDocument()接口,我稍微改了下XSSFSheetXMLHandler变成ExcelXSSFSheetXMLHandler,应该还有多接口继承等更好的方法,尝试了下感觉脑子很混乱,就简单的来了。
private class ParseUserIdSaxHandler implements ExcelXSSFSheetXMLHandler.SheetContentsHandler {
private boolean firstCellOfRow = false;
private int num = 0;
private List<MsgPushDetail> msgPushDetailList = new ArrayList<MsgPushDetail>();
// private int currentRow = -1;
// private int currentCol = -1;
@Override
public void startRow(int rowNum) {
firstCellOfRow = true;
// currentRow = rowNum;
// currentCol = -1;
}
@Override
public void endRow(int rowNum) {
}
@Override
public void cell(String cellReference, String formattedValue, XSSFComment xssfComment) {
if (firstCellOfRow) {
firstCellOfRow = false;
if(!formattedValue.matches("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$")){
fileFormatFlag = false;
}
if(!fileFormatCheck){
BatchInto(formattedValue);
}
} else {
fileFormatFlag = false;
}
// gracefully handle missing CellRef here in a similar way as XSSFCell does
// if(cellReference == null) {
// cellReference = new CellAddress(currentRow, currentCol).formatAsString();
// }
//
// currentCol = (new CellReference(cellReference)).getCol();
}
@Override
public void headerFooter(String s, boolean b, String s1) {
}
@Override
public void endDocument() {
batchIntoExcelMapper.batchIntoExcel(msgPushDetailList);
System.out.println("最后剩下" + num + "条记录插入");
dataClear();
}
private void BatchInto(String userId){
msgPushDetailList.add(new MsgPushDetail(null,null,userId,null));
num++;
if(num >= 10000){
batchIntoExcelMapper.batchIntoExcel(msgPushDetailList);
dataClear();
}
}
private void dataClear(){
msgPushDetailList.clear();
num = 0;
}
}
批量插入数据层用的是mybatis,用的是insert into table(xxx) values(),()的批量形式,(ps:写数字纯属我没把配置抽出去,简单实现功能).
sql语句,mybatis 接口注解script方式。
@Insert({"<script>",
"insert into xxx(xxx) values",
"<foreach item='msgPushDetail' index='index' collection='msgPushDetailList' open='(' separator='),(' close=')'>",
"#{msgPushDetail.xxx,jdbcType=VARCHAR}",
"</foreach>",
"</script>"})
int batchIntoExcel(@Param("msgPushDetailList") List<MsgPushDetail> msgPushDetailList);
4.反思:用excel传大量数据处理本身就不是很靠谱,基本大量数据还是采用数据库存储,代码异步多线程批量处理这种方法比较好。大文件上传最好用多段且可以断点续传的形式保证可靠性。海量数据处理注意内存和效率的平衡。
参考内容:
https://poi.apache.org/spreadsheet/how-to.html#XSSF+and+SAX+%28Event+API%29
http://svn.apache.org/repos/asf/poi/trunk/src/examples/src/org/apache/poi/xssf/eventusermodel/examples/FromHowTo.java