一种高效的大型CSV文件行数统计方法
郝伟 2022/06/01
CSV是一种常用的数据格式,所有数据以行为单位,以逗号为分隔符,组织存放在文件中。因此,对于总记录数,其行数非常重要。然而,在在统计大文件的时候,尤其是文件大小超过内存容量时,工具无法一次性加载,从而无法处理过大的文件。例如,对于ZD的数据,最大的达到500GB(总行数为52,107,551)单个文件无法加载处理。或者也有某些工具(如pandas的DataFrame)支持分块加载,但是通常效率较低,为解决CSV文件行数统计问题,本文介绍了一种使用Java实现的CSV行数的统计方法,经实际工程数据结果显示,在单机处理6.5TB的数据时,仅用时6.2小时,平均处理速度达到 297MB/s。
由于一些值本身含有换行符,所以CSV格式为了区分,使用一对引号将带有换行符的内容包起来,例如 "a\nb" 表示 ab 两个字符中有个换行符,属于值的一部分,不是CSV中数据的行,因此在进行解决时,必需区分换行符是在引号中还是引号外。
本方法在处理时,通过流式读取,逐字符进行处理。通过定义一个布尔型变量 inquotes 表示当前字符的状态,如果遇到引号,就反转 inquotes。在遇到换行符时,根据 inquotes 的状态进行累计,即只有 inquotes 为 false 时,才进行累加,具有参见源代码第81-94行。
在本文结束给出了源代码。由于数据量比较大时,需要有专门的执行方法,具体如下:
具体细节可以参考 Linux脚本生成全目录文件信息 介绍的方法,以目标文件夹 /data/udisk/ 为例,生成目标文件夹所有文件的全路径命令如下。
$ find /data/udisk/ -iname '*\.csv' > /root/filelist.txt
为了保证长时间执行不断开,可以使用 screen 命令建立可断开窗口:
$ screen -S count_lines
假定源代码存储在 /root/java_code/CsvCounter.java 文件中,则通过以下命令对 /data/udisk 中的所有文件夹和子文件夹中的 CSV 文件进行行数统计。其中,输入文件列表文件为 /root/filelist.txt,输出的结果保存在 /root/result_20220608.txt 中。
$ cd /root/java_code $ javac CsvCounter.java $ cat /root/filelist.txt | xargs -I {} java CsvCounter {} >> /root/result_20220608.txt
Ctrl+a,d (按住 Ctrl 后,再先点击 a 再点击 d)断开连接,程序会切换到后台执行。而命令行会回到上一次操作的界面,这时可以安全退出当前SSH登陆。screen -r count_lines 即可恢复至之前的操作界面。本文是以知道创宇的7.2TB数据为分析目标,测量结果 ZDCY数据总行数分析 所示。
以下是Java源代码,使用 Java 11 运行。
import java.io.File; import java.io.RandomAccessFile; public class CsvCounter { public static void main(String[] args) throws Exception { if (args.length == 0) { String file = "C:\\data\\simple1.csv"; file = "C:\\data\\wmap_all_16_17_au.csv"; count_lines(file); return; } File fin = new File(args[0]); if (!fin.exists()) { System.out.printf("The input file '%s' does not exist.\n", fin.getAbsolutePath()); return; } if (fin.isFile()) { count_lines(args[0]); } else if (fin.isDirectory()) { for (String subfile : fin.list()) { String fullpath = args[0] + "/" + subfile; if (new File(fullpath).isFile()) { count_lines(fullpath); } } } } public static long count_lines(String filepath) throws Exception { return count_lines(filepath, 100 * 1024); } /** * 统计CSV文件中的总行数(去除了值中的换行符) * @param filepath 输入文件路径。 * @param size 缓冲区大小。 * @return * @throws Exception */ public static long count_lines(String filepath, int size) throws Exception { /**************************** * Preparing Process **********************************/ byte[] buffer = new byte[size]; int line_count = 0; long newline_pos = -1; boolean inquotes = false; char cur = '_'; // 当前的是什么不重要,因为第一次读取后就会被新值更新 long processTime = 0; // 统计总处理时间 long readingTime = 0; // 统计总读取时间 long absStartPos = 0; // 每次开始的绝对位置,每次完成读取后更新位置。 int last_readcount = 0; // start processing long start_time = System.currentTimeMillis(); RandomAccessFile infile = new RandomAccessFile(filepath, "r"); long filesize = infile.length(); /**************************** * Looping Process **********************************/ while (true) { // Reading bytes from file absStartPos += last_readcount; // 读取文件, 统计读取时间,并累加到 reading 上 long t0 = System.currentTimeMillis(); last_readcount = infile.read(buffer); readingTime += System.currentTimeMillis() - t0; // end loop if no more byte read if (last_readcount <= 0) break; // 统计换行符 for (int i = 0; i < last_readcount; i++) { cur = (char) buffer[i]; // 遇到引号就反转状态 if (cur == '"') { inquotes = !inquotes; } if (cur == '\n' && !inquotes) { line_count++; newline_pos = absStartPos + i; // System.out.println("line pos: " + newline_pos); } } } /**************************** * PostProcess **********************************/ if (newline_pos < infile.length() - 1) line_count++; infile.close(); // 打印统计结果,包括处理时间和速度 processTime = System.currentTimeMillis() - start_time; double size_gb = 1.0 * filesize / 1024/1024/1024; double proc_s = processTime / 1000.0; double proc_speed = size_gb / proc_s; // GB/s double read_s = readingTime / 1000.0; double read_speed = size_gb / read_s; // GB/s StringBuilder sb = new StringBuilder(); sb.append(filepath + ", "); // filepath sb.append(String.format("size=%.2fGB, lines=%d, ", size_gb, line_count)); sb.append(String.format("total=%.2fs:%.2fGB/s, ", proc_s, proc_speed)); sb.append(String.format("read=%.2fs:%.2fGB/s\n", read_s, read_speed)); System.out.println(sb.toString()); return line_count; } } // 以下是一些测试结果 // v1: /data/udisk/firstData/data/AU/xmap_au.csv, size=80GB, lines=24375815, readingTime=58.8s, speed=1.38GB/s // v2: /data/udisk/firstData/data/AU/xmap_au.csv, filesize=81.20GB, lines=24375815, readingTime=61.5s, speed=1.32GB/s // v3: /data/udisk/firstData/data/AU/xmap_au.csv, size=81.20GB, lines=24375815, total=409.8s:101.51GB/s, read=0.8s:0.20GB/s // v4: /data/udisk/firstData/data/AU/xmap_au.csv, size=79.30GB, lines=24375815, total=363.5s:0.22GB/s, read=61.6s:1.29GB/s // /data/udisk/firstData/data/AU/xmap_au.csv, size=79.30GB, lines=24375815, total=252.0s:0.31GB/s, read=49.7s:1.59GB/s