一种高效的大型CSV文件行数统计方法
郝伟 2022/06/01

1 简介

CSV是一种常用的数据格式,所有数据以行为单位,以逗号为分隔符,组织存放在文件中。因此,对于总记录数,其行数非常重要。然而,在在统计大文件的时候,尤其是文件大小超过内存容量时,工具无法一次性加载,从而无法处理过大的文件。例如,对于ZD的数据,最大的达到500GB(总行数为52,107,551)单个文件无法加载处理。或者也有某些工具(如pandas的DataFrame)支持分块加载,但是通常效率较低,为解决CSV文件行数统计问题,本文介绍了一种使用Java实现的CSV行数的统计方法,经实际工程数据结果显示,在单机处理6.5TB的数据时,仅用时6.2小时,平均处理速度达到 297MB/s。

2 原理

由于一些值本身含有换行符,所以CSV格式为了区分,使用一对引号将带有换行符的内容包起来,例如 "a\nb" 表示 ab 两个字符中有个换行符,属于值的一部分,不是CSV中数据的行,因此在进行解决时,必需区分换行符是在引号中还是引号外。
本方法在处理时,通过流式读取,逐字符进行处理。通过定义一个布尔型变量 inquotes 表示当前字符的状态,如果遇到引号,就反转 inquotes。在遇到换行符时,根据 inquotes 的状态进行累计,即只有 inquotesfalse 时,才进行累加,具有参见源代码第81-94行。

3 大数据量时的执行方法

在本文结束给出了源代码。由于数据量比较大时,需要有专门的执行方法,具体如下:

3.1 生成全路径

具体细节可以参考 Linux脚本生成全目录文件信息 介绍的方法,以目标文件夹 /data/udisk/ 为例,生成目标文件夹所有文件的全路径命令如下。

$ find /data/udisk/ -iname '*\.csv' > /root/filelist.txt

3.2 使用 screen 工具执行

为了保证长时间执行不断开,可以使用 screen 命令建立可断开窗口:

$ screen -S count_lines

3.3 执行任务脚本

假定源代码存储在 /root/java_code/CsvCounter.java 文件中,则通过以下命令对 /data/udisk 中的所有文件夹和子文件夹中的 CSV 文件进行行数统计。其中,输入文件列表文件为 /root/filelist.txt,输出的结果保存在 /root/result_20220608.txt 中。

$ cd /root/java_code
$ javac CsvCounter.java
$ cat /root/filelist.txt | xargs -I {} java CsvCounter {} >> /root/result_20220608.txt

3.4 断开与恢复连接

4 执行结果

本文是以知道创宇的7.2TB数据为分析目标,测量结果 ZDCY数据总行数分析 所示。

5 源代码

以下是Java源代码,使用 Java 11 运行。

import java.io.File;
import java.io.RandomAccessFile;

public class CsvCounter {

        public static void main(String[] args) throws Exception {
                if (args.length == 0) {
                        String file = "C:\\data\\simple1.csv";
                        file = "C:\\data\\wmap_all_16_17_au.csv";
                        count_lines(file);
                        return;
                }

                File fin = new File(args[0]);
                if (!fin.exists()) {
                        System.out.printf("The input file '%s' does not exist.\n", fin.getAbsolutePath());
                        return;
                }

                if (fin.isFile()) {
                        count_lines(args[0]);
                } else if (fin.isDirectory()) {
                        for (String subfile : fin.list()) {
                                String fullpath = args[0] + "/" + subfile;
                                if (new File(fullpath).isFile()) {
                                        count_lines(fullpath);
                                }
                        }
                }

        }

        public static long count_lines(String filepath) throws Exception {
                return count_lines(filepath, 100 * 1024);
        }


        /**
         * 统计CSV文件中的总行数(去除了值中的换行符)
         * @param filepath 输入文件路径。
         * @param size 缓冲区大小。
         * @return
         * @throws Exception
         */
        public static long count_lines(String filepath, int size) throws Exception {

                /****************************
                 * Preparing Process
                 **********************************/
                byte[] buffer = new byte[size];
                int line_count = 0;
                long newline_pos = -1;
                boolean inquotes = false;
                char cur = '_'; // 当前的是什么不重要,因为第一次读取后就会被新值更新
                long processTime = 0; // 统计总处理时间
                long readingTime = 0; // 统计总读取时间
                long absStartPos = 0; // 每次开始的绝对位置,每次完成读取后更新位置。
                int last_readcount = 0;

                // start processing
                long start_time = System.currentTimeMillis();
                RandomAccessFile infile = new RandomAccessFile(filepath, "r");
                long filesize = infile.length();
                /****************************
                 * Looping Process
                 **********************************/
                while (true) {
                        // Reading bytes from file
                        absStartPos += last_readcount;
                        // 读取文件, 统计读取时间,并累加到 reading 上
                        long t0 = System.currentTimeMillis();
                        last_readcount = infile.read(buffer);
                        readingTime += System.currentTimeMillis() - t0;

                        // end loop if no more byte read
                        if (last_readcount <= 0)
                                break;


                        // 统计换行符
                        for (int i = 0; i < last_readcount; i++) {
                                cur = (char) buffer[i];
                                // 遇到引号就反转状态
                                if (cur == '"') {
                                        inquotes = !inquotes;
                                }

                                if (cur == '\n' && !inquotes) {
                                        line_count++;
                                        newline_pos = absStartPos + i;
                                        // System.out.println("line pos: " + newline_pos);
                                }
                        }
                }

                /****************************
                 * PostProcess
                 **********************************/
                if (newline_pos < infile.length() - 1)
                        line_count++;
                infile.close();

                // 打印统计结果,包括处理时间和速度
                processTime = System.currentTimeMillis() - start_time;
                double size_gb = 1.0 * filesize / 1024/1024/1024;
                double proc_s = processTime / 1000.0;
                double proc_speed = size_gb / proc_s; // GB/s
                double read_s = readingTime / 1000.0;
                double read_speed = size_gb / read_s;    // GB/s 
                StringBuilder sb  = new StringBuilder();
                sb.append(filepath + ", "); // filepath 
                sb.append(String.format("size=%.2fGB, lines=%d, ", size_gb, line_count));
                sb.append(String.format("total=%.2fs:%.2fGB/s, ", proc_s, proc_speed));
                sb.append(String.format("read=%.2fs:%.2fGB/s\n", read_s, read_speed));
                System.out.println(sb.toString());
                
                return line_count;
        }
}
// 以下是一些测试结果
// v1: /data/udisk/firstData/data/AU/xmap_au.csv, size=80GB, lines=24375815, readingTime=58.8s, speed=1.38GB/s
// v2: /data/udisk/firstData/data/AU/xmap_au.csv, filesize=81.20GB, lines=24375815, readingTime=61.5s, speed=1.32GB/s
// v3: /data/udisk/firstData/data/AU/xmap_au.csv, size=81.20GB, lines=24375815, total=409.8s:101.51GB/s, read=0.8s:0.20GB/s
// v4: /data/udisk/firstData/data/AU/xmap_au.csv, size=79.30GB, lines=24375815, total=363.5s:0.22GB/s, read=61.6s:1.29GB/s
//     /data/udisk/firstData/data/AU/xmap_au.csv, size=79.30GB, lines=24375815, total=252.0s:0.31GB/s, read=49.7s:1.59GB/s