java实现日志文件自定义断点续传

30岁了,一直做游戏开发,最近想去做电商之类,却发现好难。一切都限于没有电商经验,挺对自己抱不平的,游戏开发早些年的技术能力是优于Web开发的,但这几年游戏行业的技术却没什么大的创新,不过,业务驱动技术的增长嘛,业务没变,那技术就自然这样了。
但是,Web技术也没啥,最近学了一大圈,发现都离不开游戏里面早几百年都玩过的概念。
负载均衡->游戏多服;人数一达限制,由玩家选择一个新的服。
消息队列->游戏内部事件系统;模块解耦,防止阻止玩家线程的解决办法嘛;
Redis缓存之类->内存缓存;玩家数据缓存在内存,降低数据库IO;
而这些系统独立出去,又离不开网络IO,文件IO的各种封装,这有啥啊,我们做游戏的早就对其各种把玩了。
哎,无可奈何,心之叹息;

场景

由于要对接某数据平台,而数据平台提供的上报数据的方式中,也只有Filebeat加Logstash之类,这两种工具在给运维增加部署复杂度的同时,也难以自定义其对错误的处理方式。
并且,Filebeat此类,当初始启动的时候,会瞬间读取大量的日志数据,同时传给Logstash解析,这种方式会在某一瞬间增加服务器的压力。而我们的服务器场景,是对Cpu以及网络及其敏感的,如果部署到正式服务器,可能会对线上情况造成不稳定性。
另一方面,使用第三方工具难以对流程控制,比如,某些数据格式解析错误,导致数据平台返回异常,这种情况,难以直接控制停止解析,或者做其他兼容。
综合上面提到的点,自己实现一套反而来的比较直接与简单。

工程路径

[github地址](https://github.com/yourwafer/logbeat) https://github.com/yourwafer/logbeat

日志解析与断点重传 LogTask

File file = new File(path);
        if (!file.exists()) {
            // 找不到文件,则认定今天日志已经消费完成
            logPosition.updateTime(lastExecute);
            logPosition.setPosition(-1);
            save.accept(logPosition);

            return path;
        }

        try {
            // 使用只读的方式打开文件
            this.randomAccessFile = new RandomAccessFile(path, "r");
            this.filePath = path;
        } catch (FileNotFoundException e) {
            // 文件暂时不存在,添加一个-1的标记,用于标识由于日志不存在而没有继续解析
            log.debug("日志[{}]不存在", path);
            logPosition.updateTime(lastExecute);
            logPosition.setPosition(-1);
            save.accept(logPosition);

            return path;
        }
        // 更新为是第一次读取文件,标记日志读取位置为0
        LocalDate pre = logPosition.getLastExecute();
        if (!pre.equals(lastExecute)) {
            logPosition.updateTime(lastExecute);
        }
        if (logPosition.getPosition() < 0) {
            logPosition.setPosition(0);
        }

开始解析处理

// 拿到日志上次解析的时间
LocalDate lastExecute = logPosition.getLastExecute();
        LocalDate now = LocalDate.now();
        // 如果是前一天
        for (; !lastExecute.isAfter(now); lastExecute = lastExecute.plusDays(1)) {
            // 最外层控制当前任务运行状态
            if (!running) {
                log.info("任务终止[" + this + "]");
                return;
            }
            // 初始化日志路径以及文件句柄
            String path = initAndClosePreFile(lastExecute);
            if (path != null) {
                log.info("日志文件不存在,忽视[{}]", path);
                continue;
            }
            // 拿到上次解析的文件位置
            long position = logPosition.getPosition();
            if (position < 0) {
                log.error("异常逻辑代码,文件[{}][{}]", this.filePath, position);
                break;
            }
            int read = -1;
            try {
                // 重置文件解析位置,这里是核心
                randomAccessFile.seek(position);
                log.info("重置位置[{}],开始解析处理文件[{}],", filePath, position);
            } catch (IOException e) {
                log.error("文件位置异常[{}]", position, e);
                break;
            }
            // 这里可以理解为网络的粘包和拆包,由于一次性读取的数据可能不是一个完整的数据行,此时需要将数据缓存起来
            //与下一次读取的数据组合起来,构成一个完整的数据行
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE * 2);
            do {
                if (!running) {
                    log.info("任务终止[" + this + "]");
                    return;
                }
                // 初始化buffer,这一行其实逻辑上是不需要的,但是为了断点方便,看内存数据,可以去掉的
                Arrays.fill(buffer, (byte) 0);
                long curFilePosition = 0;
                long startTime = System.nanoTime();
                try {
                    // 保留文件句柄读取前的位置,并读取数据
                    curFilePosition = randomAccessFile.getFilePointer();
                    read = randomAccessFile.read(buffer);
                } catch (IOException e) {
                    log.error("读取日志数据异常[{}]", filePath, e);
                }
                ReportUtils.readBytes(read, (System.nanoTime() - startTime));
                if (read == -1) {
                    break;
                }
                int start = 0;
                int cur;

                long newPosition = logPosition.getPosition();
                List<String> lines = new ArrayList<>();
                for (int i = 0; i < read; ++i) {
                    byte b = buffer[i];
                    // 判断是否为换行符,如果是,那么代表读取到了一行
                    // \n\r
                    if (b != N && b != R) {
                        continue;
                    }
                    cur = i;

                    int size = cur - start;
                    if (byteBuffer.position() > 0) {
                        size += byteBuffer.position();
                    }
                    if (size == 0) {
                        // 排除当前字节(因为是换行符)
                        start = i + 1;
                        continue;
                    }
                    String line;
                    if (byteBuffer.position() > 0) {
                        // 判断是否有上一个包遗留的字节数据,如果有,跟此次数据合并组合起来
                        byteBuffer.put(buffer, start, (cur - start));
                        byteBuffer.flip();
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes);
                        line = new String(bytes, StandardCharsets.UTF_8);
                        byteBuffer.clear();
                    } else {
                        line = new String(buffer, start, (cur - start), StandardCharsets.UTF_8);
                    }

                    start = i + 1;
                    // 解析到一行
                    lines.add(line);

                    int newPos = start;
                    if ((i + 1) < read) {
                        // 忽视下一个空格,windows下一般\r\n都是组合用的
                        if (buffer[i + 1] == N || buffer[i + 1] == R) {
                            newPos += 1;
                        }
                    }
                    // 记录新的位置
                    newPosition = curFilePosition + newPos;

                    if (!running) {
                        log.info("任务终止[" + this + "]");
                        return;
                    }
                }
                // 交给消费者消费
                lineConsumer.accept(lines);

                logPosition.setPosition(newPosition);
                logPosition.addRow(lines.size());
                log.trace("变更文件位置[{}][{}]", logPosition.getPosition(), filePath);
                save.accept(logPosition);

                if (start < read) {
                    byteBuffer.put(buffer, start, read - start);
                }
            } while (read > 0);
            if (byteBuffer.position() > 0 && !lastExecute.equals(now)) {
                // 如果当前文件日期不是今天,那么代表文件已经读取到最后,此时数据又没有换行符,说明日志记录那边的问题,但是此时,依然要把最后的数据做解析处理
                byteBuffer.flip();
                int remaining = byteBuffer.remaining();
                byte[] remainBytes = new byte[remaining];
                byteBuffer.get(remainBytes);
                String line = new String(remainBytes, StandardCharsets.UTF_8);
                log.info("[{}]日志文件[{}]最后一行[{}]没有换行符", filePath, remaining, line);
                lineConsumer.accept(Collections.singletonList(line));
                logPosition.addRow(1);
                save.accept(logPosition);
            }
        }

数据消费者

使用配置的方式确定激活哪种消费方式
暂时实现的方式有,控制台,文件,按大小拆分的文件,http

@Configuration(proxyBeanMethods = false)
public class PushConfiguration {

    public final static String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "xa.config.pushType.console", havingValue = "true")
    @Order(Ordered.HIGHEST_PRECEDENCE)
    ConsoleEventPush console() {
        return new ConsoleEventPush();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "xa.config.pushType.fileDebug", havingValue = "true")
    @Order(Ordered.HIGHEST_PRECEDENCE)
    FileDebugEventPush fileDebug() {
        return new FileDebugEventPush();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "xa.config.pushType.file", havingValue = "true")
    @Order(Ordered.HIGHEST_PRECEDENCE)
    FileEventPush file() {
        return new FileEventPush();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "xa.config.pushType.http", havingValue = "true")
    @Order(Ordered.HIGHEST_PRECEDENCE)
    HttpEventPush http() {
        return new HttpEventPush();
    }

    @Bean
    @ConditionalOnMissingBean(EventPush.class)
    @Order
    ConsoleEventPush consoleDefault() {
        return new ConsoleEventPush();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335