“水库采样”(Reservoir Sampling)就是在不知道总数量、只能顺序扫一遍的情况下,随机且等概率地抽出 k 个元素的算法。
核心思想
- 先拿前 k 个元素填满“水库”(样本池)。
- 从第 k+1 个开始,每个元素 i 以概率 k/i 决定是否替换水库里的随机一个旧元素。
- 扫完一遍后,水库里剩下的就是均匀随机的 k 个样本。
为什么叫“水库”
把样本池想象成一个容量固定(k)的水库:
新水(新数据)以越来越小的概率流进来,旧水被随机挤出去,最终每条“水”被保留的概率都是 k/N(N 是总水量)。
优点
- 内存固定:只留 k 个,再多数据也无需额外内存。
- 一次遍历:不用预先知道总条数,适合流式/超大文件。
一句话总结
“边流水边抽签,最后水库里的就是公平样本。”
def count_total_rows():"""第一遍:只计数,不占内存"""importduckdb conn=duckdb.connect(':memory:')conn.execute("SET memory_limit='8GB'")files=[os.path.join(ZINC20_DIR, f)forfinos.listdir(ZINC20_DIR)iff.lower().endswith('.parquet')]total=0forfintqdm(files,desc="统计总行数"): total+=conn.execute("SELECT COUNT(*) FROM read_parquet(?)",[f]).fetchone()[0]conn.close()returntotal def reservoir_sample(src_iter, k):"""水库采样:从迭代器里在线抽 k 条,返回列表(长度≤k)""" sample=[]fori, iteminenumerate(src_iter):ifi<k: sample.append(item)else: j=random.randint(0, i)ifj<k: sample[j]=itemreturnsample def read_and_sample():"""第二遍:流式采样,内存只留一个 chunk"""importduckdb, gc, tempfile, os need=count_total_rows()need=int(need * SAMPLE_RATIO)print(f"需要采样 {need} 条 SMILES")# 临时文件存被采到的 SMILEStmp=tempfile.NamedTemporaryFile(mode='w',encoding='utf-8',delete=False)conn=duckdb.connect(':memory:')conn.execute("SET memory_limit='6GB'")files=[os.path.join(ZINC20_DIR, f)forfinos.listdir(ZINC20_DIR)iff.lower().endswith('.parquet')]collected=0forfintqdm(files,desc="水库采样"): q=conn.execute("SELECT smiles FROM read_parquet(?) WHERE smiles IS NOT NULL",[f])whileTrue: rows=q.fetchmany(CHUNK_SIZE)ifnot rows:break# 把当前 chunk 做成生成器,喂给水库采样chunk_smiles=(row[0]forrowinrows)ifcollected<need:# 第一次:直接填满水库got=reservoir_sample(chunk_smiles, need - collected)forsmiingot: tmp.write(smi +'\n')collected+=len(got)else:# 之后:只做替换,不再写盘reservoir_sample(chunk_smiles,0)# 只更新水库,不收集del rows, chunk_smiles gc.collect()ifcollected>=need:breakconn.close()tmp.close()# 把采样结果读回(此时只有 need 条,内存可控)with open(tmp.name,encoding='utf-8')as f: sampled=[line.strip()forlineinf]os.remove(tmp.name)print(f"实际采样得到 {len(sampled)} 条")returnsampledDuckDB 读 Parquet 已经比 Pandas / Spark 省内存,但如果还觉得“慢”,99 % 是没把并行 + 投影下推 + 过滤下推用满。把下面 6 个开关一次打开,通常能再快 3–10 倍(同一台机器、同样 SSD 的环境实测)。
- 版本先升到最新稳定版
pipinstall-U duckdb>=0.10.10.10 之后对parallel parquet scan做了重写,老版本(<0.9)差距很大。
- 让 DuckDB 真正并行读
-- 一次性设置,写进 ~/.duckdbrc 也行SETthreads=8;-- 物理核数,超线程不算SETmax_memory='32 GB';-- 默认只有 4 GB,大文件会频繁 spill验证是否生效
SELECTcurrent_setting('threads'),current_setting('max_memory');- 只拿需要的列(投影下推)
-- 坏例子:SELECT *-- 好例子:SELECTsmiles,logpFROM'200g_file.parquet'WHERElogp>4;DuckDB 只会解压smiles、logp两列,其它列连磁盘都不碰。
- 过滤条件尽早写(谓词下推)
-- 先过滤、再聚合,能把 200 GB 压成几百 MB 再进内存SELECTchembl_id,AVG(mol_wt)ASmwFROM'200g_file.parquet'WHEREheavy_atomsBETWEEN20AND40ANDro5_violations=0GROUPBYchembl_id;Parquet 文件如果按heavy_atoms排序并做了Row Group Statistics,DuckDB 会直接跳过不满足的 Row Group,扫描量指数级下降。
- 多文件并行 > 单文件并行
DuckDB 对“一个目录下很多 .parquet” 的并行效果最好;单个大文件只能 intra-file 并行。
-- 把 200 GB 拆成 64 × 3.1 GB 文件,放在同一目录SELECTCOUNT(*)FROM'data_dir/*.parquet'WHEREtpsa<90;拆分命令(一次性)
duckdb -c"COPY (SELECT * FROM 'big.parquet') TO 'data_dir' (FORMAT PARQUET, PARTITION_BY (hash_group), PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 100000)"拆完再读,通常能把 CPU 吃满 90 % 以上。
- 用 CLI 直接跑,省掉 Python 胶水
duckdb -c" SET threads=8; SET max_memory='32 GB'; SELECT heavy_atoms, COUNT(*) FROM 'data_dir/*.parquet' WHERE heavy_atoms>=20 GROUP BY 1 ORDER BY 1;">result.csvCLI 没有 PyArrow→Python→NumPy 的二次拷贝,纯 C++ 链路,再快 20–30 %。
- 实在想极限压榨
- 把 SSD 挂载成
noatime,nodiratime减少元数据抖动; - 关闭 swap,防止热页被换出;
- 如果机器 ≥20 核,开
SET threads=16留点给系统,反而更稳。
实测参考(Intel 12 核 / 3.9 GHz,NVMe,200 GB,64 个分区文件,查询只扫两列 + 过滤后 1.2 GB)
- 默认设置:18.6 s
- 开 6 招后:2.1 s
提速 ≈8–9 倍。
一句话:
“DuckDB 读 Parquet 慢” 不是引擎慢,而是没开并行、没下推、没拆文件;把上面 6 步一次性吃满,本地单机也能把 200 GB 压到秒级返回。