使用Golang在数秒内读取16GB的文件

By Ohm Patel

当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,而且只用于分析和故障解决目的。因此,组织倾向于将其存储在文件中,这些文件驻留在本地磁盘存储中。

我们将使用Golang从16 GB的.txt或.log文件中提取数百万行日志。

Lets Code…! 开始编码...!

让我们先打开文件。我们将使用标准的Go os.File用于任何文件IO。

 f, err := os.Open(fileName) if err != nil {
       fmt.Println("cannot able to read the file", err)
       return
}// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦文件被打开,我们有以下两个选项继续进行

  1. 逐行读取文件,这有助于减少对内存的压力,但将花费更多的时间在IO。
  2. 一次将整个文件读入内存并处理该文件,这会消耗更多内存,但会显著增加时间。

当文件太大时,比如16GB,我们无法将整个文件加载到内存中。但是第一个选项对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧…!在将整个文件加载到内存时,我们将使用bufio.NewReader()块加载文件,在Go中可用。

r := bufio.NewReader(f)for {buf := make([]byte,4*1024) //the chunk sizen, err := r.Read(buf) //loading chunk into buffer
       buf = buf[:n]if n == 0 {
       
         if err != nil {
           fmt.Println(err)
           break
         }
         if err == io.EOF {
           break
         }
         return err
      }
}

一旦我们有了数据块,我们将fork一个线程,即Go例程,来与其他数据块并发地处理每个数据块。以上代码将更改为-

//sync pools to reuse the memory and decrease the preassure on //Garbage CollectorlinesPool := sync.Pool{New: func() interface{} {
            lines := make([]byte, 500*1024)
            return lines
    }}stringPool := sync.Pool{New: func() interface{} {
              lines := ""
              return lines
    }}slicePool := sync.Pool{New: func() interface{} {
               lines := make([]string, 100)
               return lines
    }}r := bufio.NewReader(f)var wg sync.WaitGroup //wait group to keep track off all threadsfor {
         
         buf := linesPool.Get().([]byte)
         n, err := r.Read(buf)
         buf = buf[:n]if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
         }nextUntillNewline, err := r.ReadBytes('\n')//read entire line
         
         if err != io.EOF {
             buf = append(buf, nextUntillNewline...)
         }
         
         wg.Add(1)
         go func() { 
          
            //process each chunk concurrently
            //start -> log start time, end -> log end time
            
            ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)wg.Done()
         
         }()
    }wg.Wait()}

上面的代码引入了两个新的优化:-

  1. sync.Pool是一个强大的实例池,可以重用它来减少垃圾收集器的压力。我们将重新使用分配给各个片的内存。它帮助我们减少内存消耗,使我们的工作速度显著加快。
  2. 帮助我们并行处理缓冲区块的 Go Routines ,大大提高了处理速度。

现在让我们实现ProcessChunk函数,它将处理日志行,这些日志行是这种格式的

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志。

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {//another wait group to process every chunk further                             
          var wg2 sync.WaitGrouplogs := stringPool.Get().(string)logs = string(chunk)linesPool.Put(chunk) //put back the chunk in pool//split the string by "\n", so that we have slice of logs
          logsSlice := strings.Split(logs, "\n")stringPool.Put(logs) //put back the string poolchunkSize := 100 //process the bunch of 100 logs in threadn := len(logsSlice)noOfThread := n / chunkSizeif n%chunkSize != 0 { //check for overflow 
             noOfThread++
          }length := len(logsSlice)//traverse the chunk
         for i := 0; i < length; i += chunkSize {
             
             wg2.Add(1)//process each chunk in saperate chunk
             go func(s int, e int) {
                for i:= s; i<e;i++{
                   text := logsSlice[i]if len(text) == 0 {
                      continue
                   }
               
                logParts := strings.SplitN(text, ",", 2)
                logCreationTimeString := logParts[0]
                logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)if err != nil {
                     fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                     return
                }// check if log's timestamp is inbetween our desired period
              if logCreationTime.After(start) && logCreationTime.Before(end) {
              
                fmt.Println(text)
               }
            }
            textSlice = nil
            wg2.Done()
         
         }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
       //passing the indexes for processing}  
       wg2.Wait() //wait for a chunk to finish
       logsSlice = nil}

上面的代码使用16GB的日志文件进行基准测试。

提取日志所需的时间约为25秒。

下面是整个项目的代码.

func main() {
    
        s := time.Now()
        args := os.Args[1:]
        if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
            fmt.Println("Please give proper command line arguments")
            return
        }
        startTimeArg := args[1]
        finishTimeArg := args[3]
        fileName := args[5]
    
        file, err := os.Open(fileName)
        
        if err != nil {
            fmt.Println("cannot able to read the file", err)
            return
        }
        
        defer file.Close() //close after checking err
        
        queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
        if err != nil {
            fmt.Println("Could not able to parse the start time", startTimeArg)
            return
        }
    
        queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
        if err != nil {
            fmt.Println("Could not able to parse the finish time", finishTimeArg)
            return
        }
    
        filestat, err := file.Stat()
        if err != nil {
            fmt.Println("Could not able to get the file stat")
            return
        }
    
        fileSize := filestat.Size()
        offset := fileSize - 1
        lastLineSize := 0
    
        for {
            b := make([]byte, 1)
            n, err := file.ReadAt(b, offset)
            if err != nil {
                fmt.Println("Error reading file ", err)
                break
            }
            char := string(b[0])
            if char == "\n" {
                break
            }
            offset--
            lastLineSize += n
        }
    
        lastLine := make([]byte, lastLineSize)
        _, err = file.ReadAt(lastLine, offset+1)
    
        if err != nil {
            fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
            return
        }
    
        logSlice := strings.SplitN(string(lastLine), ",", 2)
        logCreationTimeString := logSlice[0]
    
        lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
        if err != nil {
            fmt.Println("can not able to parse time : ", err)
        }
    
        if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
            Process(file, queryStartTime, queryFinishTime)
        }
    
        fmt.Println("\nTime taken - ", time.Since(s))
    }
    
    func Process(f *os.File, start time.Time, end time.Time) error {
    
        linesPool := sync.Pool{New: func() interface{} {
            lines := make([]byte, 250*1024)
            return lines
        }}
    
        stringPool := sync.Pool{New: func() interface{} {
            lines := ""
            return lines
        }}
    
        r := bufio.NewReader(f)
    
        var wg sync.WaitGroup
    
        for {
            buf := linesPool.Get().([]byte)
    
            n, err := r.Read(buf)
            buf = buf[:n]
    
            if n == 0 {
                if err != nil {
                    fmt.Println(err)
                    break
                }
                if err == io.EOF {
                    break
                }
                return err
            }
    
            nextUntillNewline, err := r.ReadBytes('\n')
    
            if err != io.EOF {
                buf = append(buf, nextUntillNewline...)
            }
    
            wg.Add(1)
            go func() {
                ProcessChunk(buf, &linesPool, &stringPool, start, end)
                wg.Done()
            }()
    
        }
    
        wg.Wait()
        return nil
    }
    
    func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
    
        var wg2 sync.WaitGroup
    
        logs := stringPool.Get().(string)
        logs = string(chunk)
    
        linesPool.Put(chunk)
    
        logsSlice := strings.Split(logs, "\n")
    
        stringPool.Put(logs)
    
        chunkSize := 300
        n := len(logsSlice)
        noOfThread := n / chunkSize
    
        if n%chunkSize != 0 {
            noOfThread++
        }
    
        for i := 0; i < (noOfThread); i++ {
    
            wg2.Add(1)
            go func(s int, e int) {
                defer wg2.Done() //to avaoid deadlocks
                for i := s; i < e; i++ {
                    text := logsSlice[i]
                    if len(text) == 0 {
                        continue
                    }
                    logSlice := strings.SplitN(text, ",", 2)
                    logCreationTimeString := logSlice[0]
    
                    logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                    if err != nil {
                        fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
                        return
                    }
    
                    if logCreationTime.After(start) && logCreationTime.Before(end) {
                        //fmt.Println(text)
                    }
                }
                
    
            }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
        }
    
        wg2.Wait()
        logsSlice = nil
    }

你可以通过ohm.patel1997@gmail.com联系我。

任何疑问和改进是最受欢迎的。😉

你也可以发表评论下面进一步怀疑和赞扬总是受欢迎的。😁🙈

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容