记录处理spark列空值的方法

2023/10/22 spark

以下是上述 Python 代码对应的 Scala 版本,完整实现 Spark DataFrame 的 null 值处理逻辑:

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, isnull, when, count, coalesce, lit, mean}
import org.apache.spark.ml.feature.Imputer

object NullHandling {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NullHandling")
      .getOrCreate()
    import spark.implicits._

    // 假设 df 是已有的 DataFrame(示例数据)
    val df = spark.createDataFrame(Seq(
      (1, "Alice", 30, 50000.0),
      (2, null, null, null),
      (3, "Bob", null, 60000.0),
      (4, null, 45, null)
    ).toDF("id", "name", "age", "salary")

    // 1. 检测各列 null 数量
    println("原始数据各列 null 数量:")
    df.select(df.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show()

    // 2. 删除指定列的 null 行(示例:删除 name 列为 null 的行)
    val cleanedDf = df.na.drop(Seq("name"))

    // 3. 填充数值列 null 为均值(示例:填充 salary 列)
    val meanSalary = df.select(mean("salary")).head().getDouble(0)
    val filledDf = cleanedDf.na.fill(meanSalary, Seq("salary"))

    // 4. 使用 Imputer 填充多列(示例:填充 age 和 salary)
    val imputer = new Imputer()
      .setInputCols(Array("age", "salary"))
      .setOutputCols(Array("age_filled", "salary_filled"))
      .setStrategy("mean") // 可选 "median" 或 "mode"

    val model = imputer.fit(filledDf)
    val imputedDf = model.transform(filledDf)

    // 5. 替换字符串列的 null(示例:将 name 列的 null 替换为 "Unknown")
    val finalDf = imputedDf.withColumn("name",
      coalesce(col("name"), lit("Unknown"))
    )

    // 6. 验证处理结果
    println("处理后的数据各列 null 数量:")
    finalDf.select(finalDf.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show(truncate = false)

    spark.stop()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 关键代码解析:

  1. 检测 Null 值

    df.select(df.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show()
    
    1

    使用 map 遍历所有列,统计每列的 null 数量。

  2. 删除 Null 行

    df.na.drop(Seq("name")) // 删除 name 列为 null 的行
    
    1

    指定列名列表(Seq),仅删除这些列的 null 行。

  3. 填充 Null 值

    • 单列填充
      df.na.fill(0.0, Seq("salary")) // 用 0 填充 salary 列
      
      1
    • 多列填充
      df.na.fill(Map("age" -> 30, "name" -> "Unknown")) // 按列指定填充值
      
      1
  4. Imputer 自动填充数值列

    val imputer = new Imputer()
      .setInputCols(Array("age", "salary"))
      .setOutputCols(Array("age_filled", "salary_filled"))
      .setStrategy("mean") // 或 "median"/"mode"
    
    1
    2
    3
    4

    自动计算均值/中位数/众数填充数值列,生成新列。

  5. 条件替换 Null

    df.withColumn("name", coalesce(col("name"), lit("Unknown")))
    
    1

    使用 coalescewhen 逻辑替换 null 值。

# 运行结果示例:

原始数据各列 null 数量:
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
|  0|   2|  2|     2|
+---+----+---+------+

处理后的数据各列 null 数量:
+---+----+----+------+----------+-------------+
|id |name|age |salary|age_filled|salary_filled|
+---+----+----+------+----------+-------------+
|0  |0   |0   |0     |0         |0            |
+---+----+----+------+----------+-------------+
1
2
3
4
5
6
7
8
9
10
11
12
13

# 注意事项:

  1. 类型一致性:填充数值列时需确保填充值类型与列类型一致(如 Double 填充 Double 列)。
  2. 性能优化
    • 使用 cache() 缓存中间结果:
      val cachedDf = df.cache()
      
      1
    • 调整分区数:
      val repartitionedDf = df.repartition(4)
      
      1
  3. 错误处理:若列名不存在或类型不匹配,会抛出 AnalysisException,需提前校验 Schema。

通过以上 Scala 代码,可以高效处理 Spark DataFrame 中的 null 值,适用于大规模数据集。