背景
多线程写入文件,要考虑线程同步问题,实现数据完整落盘磁盘备份。
操作系统:
win10:没问题
centos7:有问题
public static void writeFileLock(String content, String filePath) {
File file = new File(filePath);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
FileLock fileLock = null;
try {
raf = new RandomAccessFile(file, "rw");
fileChannel = raf.getChannel();
while (true) {
try {
fileLock = fileChannel.tryLock();
if (fileLock != null) {
break;
}
} catch (Exception e) {
Thread.sleep(0);
}
}
raf.seek(raf.length());
raf.write(content.getBytes());
fileLock.release();
fileChannel.close();
raf.close();
} catch (Exception e) {
log.error("写文件异常", e);
log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
}
}
RandomAccessFile建立文件连接符,raf获取文件管道,文件管道获取文件锁,tryLock方法有两个特点:第一、非阻塞,调用后立刻返回;第二、没拿到锁可能返回null,也可以能抛出异常,所以if判断循环获取,异常块捕获异常再重新尝试获取锁,注意Thread.sleep(0)的作用并不是睡0秒,而是马上加入到可执行队列,等待cpu的时间分片。
这段代码承载线上的kafka多线程备份消息的任务,用lock协调多线程的写入同步,埋点监控发现,备份数据偶发遗漏,大概2.3亿数据,会有5条偏差,就是漏了。
下面记录压测思路及过程。
准备
压测代码:
private static final ExecutorService FILE_THREADS = Executors.newFixedThreadPool(100);
public void execute(String... strings) throws Exception {
int cnt = 100 * 100 * 100;
int idx = 1;
long begin = 1574305200000L;
while (idx <= cnt) {
Map<String, Object> map = new HashMap<>();
map.put("id", idx);
map.put("time", begin);
String timeDirectory = DateUtil.getBeforeOneHour("yyyyMMddHHmm", 8, begin);
String mm = DateUtil.getBeforeOneHour("mm", 0, begin).concat(".txt");
String json = JsonUtil.getJosnString(map).concat(System.getProperty("line.separator"));
FILE_THREADS.execute(new PersistThread(timeDirectory, mm , json));
if (idx % 10000 == 0) {
begin += 60000L;
}
idx++;
}
}
private class PersistThread extends Thread {
String time;
String filename;
String content;
PersistThread(String time, String filename, String content) {
this.time = time;
this.filename = filename;
this.content = content;
}
@Override
public void run() {
String folder = "/data/job_project/txt/" + time + "/";
FileUtil.createDirectory(folder);
FileUtil.writeFileIO(content, folder + filename);
}
}
创建100个线程的线程池,提交写入文件Thread任务,实现多线程写入文件,且文件目录、文件是动态创建的(模拟线上),id每自增1万创建一个时间戳目录,格式是:yyyyMMddHHmm,在目录下创建一个文件,写入1万行数据,相当于100个线程,动态写入100个目录下的100个文件中,每个文件写入1万行。
首先怀疑创建目录和文件:
代码如下:
public static File createDirectory(String path) {
File file = new File(path);
if (!file.exists() && !file.isDirectory()) {
file.mkdirs();
}
return file;
}
public static File createFile(String file) {
File f = null;
try {
f = new File(file);
if (!f.exists()) {
f.createNewFile();
}
} catch (Exception e) {
e.printStackTrace();
}
return f;
}
创建目录和文件,逻辑都是先检查再创建,显然不是原子的,所以怀疑有没有可能是多线程环境中,目录重复创建导致,所以把代码优化成两次判断的同步方式,如下:
public static File createDirectory(String path) {
File file = new File(path);
if (!file.exists() && !file.isDirectory()) {
synchronized (FileUtil.class) {
if (!file.exists() && !file.isDirectory()) {
file.mkdirs();
}
}
}
return file;
}
public static File createFile(String file) {
File f = null;
try {
f = new File(file);
if (!f.exists()) {
synchronized (FileUtil.class) {
if (!f.exists()) {
f.createNewFile();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return f;
}
压入100w数据,观察结果,大失所望:
/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982
事实是绝大部分文件都漏了,下面把所有的目录和文件全部规划好,再试。
规划目录脚本:
#!/bin/sh
txt=/data/job_project/txt/*
for folder in $txt;do
filename=${folder##*/}
if [[ $filename = "f.sh" ]] || [[ $filename = "search.sh" ]];then
echo "$filename is a shell file"
else
filename=${filename:10}
filepath=${folder}/${filename}.txt
#rm -f $filepath
#touch $filepath
lines=$(wc -l ${filepath} | awk '{print $1}')
if [ $lines -ne 10000 ];then
echo "$filepath lines: $lines"
fi
fi
done
结果仍然会漏数据。
为了彻底屏蔽创建目录和文件带来的影响,下面的压测前都创建好了文件和目录。
使用RandomAccessFile的rws方式同步写入文件。
测试结果:
/data/job_project/txt/201911211101/01.txt lines: 9998
/data/job_project/txt/201911211106/06.txt lines: 9999
/data/job_project/txt/201911211107/07.txt lines: 9999
/data/job_project/txt/201911211109/09.txt lines: 9999
/data/job_project/txt/201911211112/12.txt lines: 9999
/data/job_project/txt/201911211116/16.txt lines: 9998
/data/job_project/txt/201911211119/19.txt lines: 9999
/data/job_project/txt/201911211120/20.txt lines: 9998
...
压测过程十分缓慢,写入性能非常差,但是结果震惊,仍然漏了,仔细看了官网api注解:
* <p>The <tt>"rwd"</tt> mode can be used to reduce the number of I/O
* operations performed. Using <tt>"rwd"</tt> only requires updates to the
* file's content to be written to storage; using <tt>"rws"</tt> requires
* updates to both the file's content and its metadata to be written, which
* generally requires at least one more low-level I/O operation.
*
* <p>If there is a security manager, its {@code checkRead} method is
* called with the pathname of the {@code file} argument as its
* argument to see if read access to the file is allowed. If the mode
* allows writing, the security manager's {@code checkWrite} method is
* also called with the path argument to see if write access to the file is
* allowed.
rwd模式同步文件内容,rws模式同步文件内容和文件元数据,压测首选当然选择更严格的rws,结果仍然遗漏,此时已经开始怀疑jdk源码了。
调整close顺序,校验lock
第一处改动:
if (fileLock != null) {
break;
}
多加一层校验,改成
if (fileLock != null && fileLock.isValid()) {
break;
}
第二处改动:
fileLock.release();
fileChannel.close();
raf.close();
调整close顺寻,改成:
fileLock.release();
raf.close();
fileChannel.close();
测试结果:
/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982
...
结果显示,反而漏了更多数据,此时已经自闭了,但是还要接着撸。
使用channel写入缓冲区
public static void writeFileLock(String content, String filePath, String time) {
File file = createFile(filePath);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
FileLock fileLock = null;
try {
raf = new RandomAccessFile(file, "rw");
fileChannel = raf.getChannel();
while (true) {
try {
fileLock = fileChannel.tryLock();
if (fileLock != null && fileLock.isValid()) {
break;
}
} catch (Exception e) {
Thread.sleep(0);
}
}
fileChannel.write(ByteBuffer.wrap(content.getBytes()), fileChannel.size());
fileLock.release();
raf.close();
fileChannel.close();
} catch (Exception e) {
log.error("写文件异常", e);
log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
}
}
改变写入方式,用nio的管道channel写入数据,结果仍然失望。
日志埋点——使用redis计数器
埋点代码:
public static void writeFileLock(String content, String filePath, String time) {
File file = createFile(filePath);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
FileLock fileLock = null;
try {
redisHelper.incr("filelock0:".concat(time));
raf = new RandomAccessFile(file, "rw");
fileChannel = raf.getChannel();
while (true) {
try {
fileLock = fileChannel.tryLock();
if (fileLock != null && fileLock.isValid()) {
break;
}
} catch (Exception e) {
Thread.sleep(0);
}
}
redisHelper.incr("filelock1:".concat(time));
raf.seek(raf.length());
redisHelper.incr("filelock2:".concat(time));
raf.write(content.getBytes());
redisHelper.incr("filelock3:".concat(time));
fileLock.release();
redisHelper.incr("filelock4:".concat(time));
raf.close();
redisHelper.incr("filelock5:".concat(time));
fileChannel.close();
redisHelper.incr("filelock6:".concat(time));
} catch (Exception e) {
log.error("写文件异常", e);
log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
}
}
此时对这段代码彻底失望,得找到数据在哪个位置漏掉的,所以使用了redis计数器,incr是线程安全得,所以能够很快发现到底哪里出问题了,问题马上浮出水面,心中窃喜。
再说明一下:redis的key包含目录名称,即一个目录一个文件一个key,埋点的密集显示出来必胜的信心。
结果是所有key的value都是完美的10000,毫无破绽,心如死灰,于是有同事提议,搞个反查,看看RangdomAccessFile的指针到底有没有更新。
判断RandomAccessFile的文件指针,是不是有没更新指针的情况
long filelength = raf.length();
raf.seek(filelength);
raf.write(content.getBytes());
if(filelength == raf.length()){
log.error ( "errorrrrrrrrrrrrr: "+ content);
}
如果write方法没有写入文件,那么文件指针必然没有更新,调用write后再反查文件指针是否更新,就能判断write是否有写入。结果仍然失望,预期的日志没有打印,说明write确实更新了文件指针,但是就是漏掉了几行数据,结合上述redis计数器埋点和文件指针判断,压测已经走进了死胡同,所有的情况都试过了,至少可以说两点:第一、文件锁没有问题,锁的线程没有逃逸出while循环;第二、测试的每一行代码都执行了到位了,没有哪一行没有执行的。百思不得其解,那就下班,次日再战。
java.io包+可重入锁的方式
昨天的压测可以说把所有情况都试过了,还有试过lock阻塞方式,fileChannel方式写入缓冲区,此处不表。今天决定换个思路,拒绝花里胡哨,就用jdk1.0版本的java.io包+ReentrantLock可重入锁的方式写,代码如下:
public static void writeSyncFile(String content, String filePath) {
try {
fileLock.lock();
File file = createFile(filePath);
FileWriter fw = new FileWriter(file, true);
BufferedWriter bw = new BufferedWriter(fw);
bw.write(content);
bw.flush();
fw.close();
bw.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
fileLock.unlock();
}
}
结果可想而知,每个目录的每个文件,都是完美的10000行,且由于使用了缓冲区,文件写入效率大幅提升,具体提升幅度没有严格计算,使用同步块的方式+写入buffer的方式大概2分钟就能写完,而使用上述方式可能要1小时以上,效率杠杠的。普通的文件io方式没有问题,于是同事提议,用FileOutputStream替代RandomAccessFile看看。
替换RandomAccessFile,使用FileOutputStream获取channel
决定抛弃RandomAccessFile,使用FileOutputStream获取channel,代码如下:
public static void writeFileIO(String content, String path) {
FileLock lock = null;
try {
FileChannel channel = new FileOutputStream(path, true).getChannel();
while (true) {
try {
lock = channel.lock();
if (lock != null && lock.isValid()) {
break;
}
} catch (Exception e) {
Thread.sleep(10);
}
}
channel.write(ByteBuffer.wrap(content.getBytes()));
lock.release();
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
RandomAccessFile是任意读写的类,而FileOutputStream没有这个功能,要想追加写入文件末尾,在构造方法加个true就行,同样能实现我们想要的功能,第一次压测后,3分钟就出结果,100w数据压入100个文件,每个文件10000行,与预期结果完全相符,完美!乘胜追亚,再压1000w发现数据有误,结果是oom,压入的数据全部写入线程池的阻塞队列中了,于是调大内存到6g,还是如此,奈何机器资源有限,改压400w,结果数据与预期完全符合,此时水落石出,没有想到坑在RandomAccessFile这里,回过头来看这个类,虽然这个类的注释已经被看烂了,比较诡异的是jdk1.0就出的,但是作者未知,可能怕被人喷,嘿嘿嘿。
总结
1、代码不是复制粘特,光搜索谷歌百度,往往很多噪音。
2、高并发场景要多次严格压测,保证数据质量。
3、千万区分windows系统和linux系统,二者的文件系统完全不同,上述代码在windows完全没问题,但是linux就是状况百出。
4、怀疑精神,代码都是人写的,就会有bug,测试用例覆盖所有场景,测试各种可能性。