Ubuntu TechHive
rust-and-data-processing-with-polars.md
使用 Rust 和 Polars 进行数据处理
article.细节

使用 Rust 和 Polars 进行数据处理

reading.进展 7 分钟阅读数

Rust 基础知识快速入门,以及使用 Polars 进行数据处理

使用 Polars 进行 Rust 数据处理

Rust 的独特之处

  • *编译型*且速度快 — 编译为原生机器码,无运行时/垃圾回收 (GC)
  • 内存安全 — 编译器在程序运行*之前*就能阻止整类错误(如空指针错误、数据竞争)
  • 强静态类型 — 每个值在编译时都有确定的类型;编译器会尽早捕获类型不匹配的问题

变量与可变性

变量*默认是不可变的*。你可以使用 mut 关键字来显式声明可变性。

let x = 5;          // 不可变 -- 无法重新赋值
let mut y = 10;     // 可变
y = 20;             // 因为有 `mut`,所以可以修改
// x = 6;           // 编译错误:无法对 `x` 进行二次赋值

const MAX: u32 = 100_000;  // 常量:始终不可变,必须指定类型

这种默认设置颠覆了通常的预期:你必须预先说明哪些内容允许更改,这使得代码逻辑更容易推导。

基本数据类型

标量类型

  • 整数: i32, i64, u32, u64 … (i = 有符号, u = 无符号; 数字 = 位数)。=i32= 是默认类型。
  • 浮点数: f64 (默认), f32
  • 布尔值: bool -> true / false
  • 字符: char -> 单个 Unicode 字符,使用单引号
let count: i64 = 42;
let price: f64 = 19.99;
let is_ready: bool = true;
let letter: char = 'A';

复合类型

  • 元组 (Tuple): 固定大小的混合类型集合
  • 数组 (Array): 固定大小,所有元素类型相同
let person: (i32, f64, char) = (30, 5.9, 'M');
let height = person.1;          // 通过索引访问 -> 5.9

let nums: [i32; 3] = [1, 2, 3]; // 包含 3 个 i32 的数组
let first = nums[0];            // -> 1

字符串:两种类型

  • &str — “字符串切片”,通常是固定的/借用的字符串字面量
  • String — 拥有所有权、可增长的字符串,可以修改
let literal: &str = "hello";          // 固定文本
let mut owned: String = String::from("hello");
owned.push_str(", world");            // 因为拥有所有权,所以可以增长

函数

  • 使用 fn 声明
  • 参数类型是*必须的*;返回类型写在 -> 之后
  • 最后一个表达式(不带分号)即为返回值
fn add(a: i32, b: i32) -> i32 {
    a + b          // 没有分号 = 这是返回值
}

fn greet(name: &str) {   // 没有 `->` 表示不返回任何值
    println!("Hello, {name}!");
}

fn main() {
    let sum = add(2, 3);     // 每个程序都从 main() 开始
    println!("Sum: {sum}");
    greet("Aziz");
}

注意:=println!= 是一个*宏*(末尾的 ! 是标志),而不是函数。

控制流

if / else(它是一个表达式!)

let n = 7;
if n % 2 == 0 {
    println!("even");
} else {
    println!("odd");
}

// 因为 `if` 会返回一个值,所以可以用它进行赋值:
let label = if n > 5 { "big" } else { "small" };

循环

// loop: 无限循环,直到遇到 `break`
let mut i = 0;
loop {
    if i >= 3 { break; }
    i += 1;
}

// while
let mut c = 3;
while c > 0 {
    println!("{c}");
    c -= 1;
}

// for: 最常用 -- 遍历范围或集合
for k in 0..3 {            // 0, 1, 2 (不包含 3)
    println!("k = {k}");
}

所有权:核心理念

Rust 的主打特性。三条规则:

  1. 每个值都有一个*所有者*
  2. 同一时间只能有一个所有者
  3. 当所有者超出作用域时,值会被清理
let s1 = String::from("hi");
let s2 = s1;              // 所有权移动 (MOVE) 到 s2
// println!("{s1}");      // 错误:s1 不再有效

// 若要让其他函数使用值而不获取所有权,
// 可以使用 &(引用)进行“借用”:
fn length(s: &String) -> usize {
    s.len()              // 读取 s,但不拥有它
}
let word = String::from("rust");
let n = length(&word);   // 借用它;`word` 在此之后仍可使用

这就是 Rust 在没有垃圾回收的情况下保证内存安全的原因。这是最需要时间适应的部分。

结构体 (Structs):自定义数据类型

struct Order  {
    id: i64,
    amount: f64,
    shipped: bool,
}

let o = Order { id: 1, amount: 42.5, shipped: true };
println!("Order {} costs {}", o.id, o.amount);

枚举 (Enums) 与模式匹配

枚举允许一个值是多个变体之一;=match= 用于处理每种情况。

enum Status {
    Pending,
    Shipped,
    Cancelled,
}

let s = Status::Shipped;
match s {
    Status::Pending   => println!("waiting"),
    Status::Shipped   => println!("on the way"),
    Status::Cancelled => println!("nope"),
}

match 必须是*穷尽的* — 必须处理所有情况,否则代码无法编译。这是编译器防止遗漏的另一种方式。

Option 和 Result:无空值,无静默错误

Rust 没有 =null=。取而代之的是:

  • Option — 一个值要么是 Some(x)=,要么是 =None
  • Result — 要么是 =Ok(x)=,要么是 =Err(e)=(这是 Polars 示例中所有错误处理的基础)
fn divide(a: f64, b: f64) -> Option<f64> {
    if b == 0.0 { None } else { Some(a / b) }
}

match divide(10.0, 2.0) {
    Some(result) => println!("Got {result}"),
    None         => println!("Can't divide by zero"),
}

? 操作符:错误处理的简写

Result 上,=?= 的意思是: “给我结果值,或者从当前函数返回错误。”

use std::num::ParseIntError;

fn parse_and_double(text: &str) -> Result<i32, ParseIntError> {
    let n = text.parse::<i32>()?;  // 如果解析失败,返回 Err
    Ok(n * 2)                      // 否则继续执行
}

这就是为什么 read_orders(...)? 写起来很简洁:=?= 会静默地传播任何失败,而不是强迫你写一个巨大的 match 块。

常用集合

  • Vec — 可增长列表(类似于 Python 的 list)
  • HashMap — 键值对映射(类似于 Python 的 dict)
let mut v: Vec<i32> = Vec::new();
v.push(1);
v.push(2);
for item in &v { println!("{item}"); }

use std::collections::HashMap;
let mut scores = HashMap::new();
scores.insert("alice", 10);
scores.insert("bob", 7);

Cargo:Rust 的构建工具与包管理器

基本命令:

cargo new my_project   # 创建新项目
cargo build            # 编译
cargo run              # 编译并运行
cargo test             # 运行测试
cargo add polars       # 在 Cargo.toml 中添加依赖

依赖项(称为“crates”)在 Cargo.toml 中声明,并从 crates.io 获取。

注意事项:

  • 所有权/借用&mut 的博弈。刚开始会感到困难,多练习就会豁然开朗。
  • 两种字符串类型 (String vs &str) — 使用 .to_string()String::from(...) 进行转换。
  • 默认不可变 — 忘记加 mut 是初学者最常见的错误。
  • 编译器是你的朋友 — Rust 的错误信息非常出色。请仔细阅读,它们通常会告诉你确切的修复方法。
  • 宏与函数println!, vec!, df!! 结尾,其行为与普通函数略有不同。

什么是 Polars

  • 一个用于处理表格数据(行和列)的 DataFrame 库 — 想象一下代码中的电子表格或数据库表
  • 使用 Rust 编写,基于 *Apache Arrow*(一种列式内存格式)
  • *列式存储*:按列而非按行存储数据 — 这就是列操作和分析速度快的原因
  • *默认多线程*:无需额外配置即可利用所有 CPU 核心
  • 可直接在 Rust 中使用,也可通过绑定在 Python 中使用

两个核心类型

  • Series — 单列数据,所有元素类型相同
  • DataFrame — Series 的集合;即表格本身
use polars::prelude::*;

// Series 是一个有名字的列。
let s = Series::new("amount".into(), &[42.5, 17.0, 9.99]);

// DataFrame 由列构建。df! 宏是最简单的方法。
let df = df!(
    "order_id" => &[1, 2, 3],
    "amount"   => &[42.5, 17.0, 9.99],
)?;
println!("{df}");

注意 df!! 结尾 — 它是一个宏,就像 println!vec! 一样。

一切皆返回 Result

几乎所有的 Polars 操作都可能失败(类型错误、列缺失、文件损坏),因此它们返回 PolarsResult=。这就是为什么你在研讨会中到处看到 =? 的原因 — 它传播错误,而不是让错误静默发生。

fn build() -> PolarsResult<DataFrame> {
    let df = df!("a" => &[1, 2, 3])?;   // ? 解包或返回错误
    Ok(df)
}

这直接关联到 Rust 的 Result?: 坏数据会变成你必须处理的错误,而不是静默的 =NaN=。

读取和写入数据

议程中的四种格式:

// 读取 CSV
let df = CsvReadOptions::default()
    .with_has_header(true)
    .try_into_reader_with_file_path(Some("orders.csv".into()))?
    .finish()?;

// 写入 Parquet
let mut file = std::fs::File::create("orders.parquet")?;
ParquetWriter::new(&mut file).finish(&mut df)?;

// 读取 Parquet
let mut f = std::fs::File::open("orders.parquet")?;
let df = ParquetReader::new(&mut f).finish()?;

核心理念:Parquet 在*文件内部存储了模式 (schema) 和类型*,因此读取时无需猜测。CSV 是文本格式,必须进行推断或显式指定模式。

模式 (Schemas):契约

Schema 预先声明了每一列的名称和类型。将其提供给读取器,坏数据会引发明显的错误,而不是破坏列数据。

let mut schema = Schema::default();
schema.with_column("order_id".into(), DataType::Int64);
schema.with_column("amount".into(), DataType::Float64);

常见的 DataType=:=Int64, Float64, String, Boolean, =Date=。

选择与过滤

你使用*表达式*来描述操作 — col(...) 指代一列,你可以链式调用转换。

let result = df
    .clone()
    .lazy()
    .filter(col("status").eq(lit("shipped")))   // 保留匹配行
    .select([col("order_id"), col("amount")])    // 选择列
    .collect()?;                                  // 执行
  • col("x") — 指代 x 列
  • lit("shipped") — 用于比较的字面量值
  • .eq, .gt, .lt — 表达式上的比较运算符

连接 (Joins):合并表格

根据共享键匹配两个 DataFrame 的行。

let joined = orders.join(
    &customers,
    ["customer_id"],                 // 左表中的键
    ["customer_id"],                 // 右表中的键
    JoinArgs::new(JoinType::Inner),  // Inner / Left / Anti / ...
    None,
)?;

值得了解的连接类型:

  • Inner — 仅保留两表匹配的行
  • Left — 保留所有左表行,不匹配处为 null
  • Anti — 保留左表中*没有*匹配的行(非常适合作为数据质量检查)

Eager (立即执行) vs Lazy (惰性执行):重大区别

  • Eager — 每个操作立即运行 (DataFrame)。简单,适合小数据和探索。
  • Lazy — 你构建一个*查询计划*,在调用 .collect() 之前什么都不会运行。Polars 会*优化*整个计划(下推过滤器,仅读取必要的列)。
// Lazy: scan_* 和 .lazy() 返回 LazyFrame -- 这是一个计划,还不是数据。
let plan = LazyCsvReader::new(PlPath::new("orders.csv"))
    .with_has_header(true)
    .finish()?
    .filter(col("status").eq(lit("shipped")))
    .select([col("order_id"), col("amount")]);

println!("{}", plan.clone().explain(true)?);  // 查看计划
let df = plan.collect()?;                       // 现在才执行

explain(true) 会打印优化后的计划 — 你可以在消耗计算资源前看到引擎决定做什么。

常用操作速查表

df.height();                 // 行数
df.width();                  // 列数
df.column("amount")?;        // 获取一列 (Series)
df.head(Some(5));            // 前 5 行
df.get_column_names();       // 列名
df.column("amount")?.dtype();// 列的数据类型

为什么选择 Polars (对比 pandas / Spark)

  • vs pandas — 快得多,多线程,惰性优化,内存表现更好;类型更严格(静默错误更少)
  • vs Spark — 单机工作负载无需集群;许多“我们需要 Spark”的任务其实是“pandas 在单机上太慢了”
  • Polars 在没有分布式系统开销的情况下,为你提供了*性能与正确性*。

它与 Rust 基础的联系

  • PolarsResult? = Rust 的 Result + ? 操作符
  • 连接中的 &customers = *借用*(读取而不获取所有权)
  • 写入 Parquet 时的 &mut df = 可变借用
  • df!, col! 风格的宏 = ! 宏语法
  • Schema 和 DataType = Rust 的“一切皆有已知类型”理念在表格列上的应用

演示

一个故意不完美的 CSV

使用一个包含混合类型列、null 值和坏行的文件:

order_id,customer_id,amount,status
1,100,42.50,shipped
2,101,,pending
3,102,17.00,shipped
4,bad_id,9.99,shipped

第 4 行有一个非数字的 =customer_id=。在宽松的管道中,这会变成静默的 NaN 或对象列。我们希望它能报错。

使用推断模式进行 Eager 读取(简单但危险的路径)

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let df = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some("orders.csv".into()))?
        .finish()?;

    println!("{df}");
    Ok(())
}

这能运行 — 但推断过程查看了样本并*猜测*了类型。在不同的文件或更多行数下,猜测可能会改变。推断虽然方便但不确定;这种组合在生产环境中会坑你。

显式模式(可靠性课程)

停止猜测。声明契约:

use polars::prelude::*;
use std::sync::Arc;

fn read_orders(path: &str) -> PolarsResult<DataFrame> {
    let mut schema = Schema::default();
    schema.with_column("order_id".into(), DataType::Int64);
    schema.with_column("customer_id".into(), DataType::Int64);
    schema.with_column("amount".into(), DataType::Float64);
    schema.with_column("status".into(), DataType::String);

    CsvReadOptions::default()
        .with_has_header(true)
        .with_schema(Some(Arc::new(schema)))
        .try_into_reader_with_file_path(Some(path.into()))?
        .finish()
}

现在 customer_id 被声明为 Int64=。坏行 (=bad_id) 再也无法作为文本混入 — Polars 返回 =Err=,而不是静默破坏列。失败发生在读取时,且原因明确,而不是在三次转换之后。

这就是 Rust + Polars 的意义

  • 模式即*代码* — 它像任何其他契约一样被版本化、审查和测试
  • finish() 返回 PolarsResult=。你无法意外忽略解析失败 — =? 强制你处理或传播它
  • 对比动态类型管道,其中坏解析会变成 NaN 并静默向下游流动。在这里,*类型系统和错误类型使得静默失败成为不可能。*

将错误处理视为一等公民

展示两种行为,让听众感受差异:

fn main() {
    match read_orders("orders.csv") {
        Ok(df) => println!("Loaded {} rows\n{df}", df.height()),
        Err(e) => eprintln!("CSV failed its contract: {e}"),
    }
}

在管道中,=Err= 意味着任务*在此处*停止,并伴有明确消息 — 而不是在凌晨三点、处理了四千万行数据后才崩溃。

用测试锁定它

可靠性主题的具体化 — 一个断言契约的测试,确保格式错误的上游文件在 CI 中失败,而不是在生产中:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn schema_is_enforced() {
        let df = read_orders("tests/data/orders_good.csv").unwrap();
        assert_eq!(df.height(), 3);
        assert_eq!(
            df.column("amount").unwrap().dtype(),
            &DataType::Float64
        );
    }

    #[test]
    fn bad_types_are_rejected() {
        // 包含 `bad_id` 的文件绝不能静默加载。
        assert!(read_orders("tests/data/orders_bad.csv").is_err());
    }
}

bad_types_are_rejected 是整个哲学的缩影:我们断言坏数据*必须失败*。大多数管道从不编写此类测试,因为在它们的栈中,坏数据不会失败 — 它会扩散。

有意处理 Null 值(而非意外)

第 2 行的空 amount 是一个真正的 null。决定它的含义,而不是让猜测来决定:

use polars::prelude::*;

fn parse_options() -> CsvParseOptions {
    CsvParseOptions::default()
        .with_null_values(Some(NullValues::AllColumns(
            vec!["".into(), "NA".into(), "null".into()].into(),
        )))
}

操作清晰度:null 是代码中记录在案的决策,而不是解析器随心所欲的结果。

本节要点

  • CSV 默认是无类型且不安全的 — 将每次读取视为必须验证的边界
  • 显式模式将“希望它能解析”转变为“要么解析成功,要么报错” — 确定性优于便利性
  • PolarsResult 使得忽略失败在编译时成为不可能
  • 一个测试 (bad_types_are_rejected) 展示了整个可靠性论点
  • Rust + Polars 在此处的意义不在于速度,而在于它使静默数据损坏在结构上变得困难