以下是上述 Python 代码对应的 Scala 版本,完整实现 Spark DataFrame 的 null 值处理逻辑:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, isnull, when, count, coalesce, lit, mean}
import org.apache.spark.ml.feature.Imputer
object NullHandling {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("NullHandling")
.getOrCreate()
import spark.implicits._
// 假设 df 是已有的 DataFrame(示例数据)
val df = spark.createDataFrame(Seq(
(1, "Alice", 30, 50000.0),
(2, null, null, null),
(3, "Bob", null, 60000.0),
(4, null, 45, null)
).toDF("id", "name", "age", "salary")
// 1. 检测各列 null 数量
println("原始数据各列 null 数量:")
df.select(df.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show()
// 2. 删除指定列的 null 行(示例:删除 name 列为 null 的行)
val cleanedDf = df.na.drop(Seq("name"))
// 3. 填充数值列 null 为均值(示例:填充 salary 列)
val meanSalary = df.select(mean("salary")).head().getDouble(0)
val filledDf = cleanedDf.na.fill(meanSalary, Seq("salary"))
// 4. 使用 Imputer 填充多列(示例:填充 age 和 salary)
val imputer = new Imputer()
.setInputCols(Array("age", "salary"))
.setOutputCols(Array("age_filled", "salary_filled"))
.setStrategy("mean") // 可选 "median" 或 "mode"
val model = imputer.fit(filledDf)
val imputedDf = model.transform(filledDf)
// 5. 替换字符串列的 null(示例:将 name 列的 null 替换为 "Unknown")
val finalDf = imputedDf.withColumn("name",
coalesce(col("name"), lit("Unknown"))
)
// 6. 验证处理结果
println("处理后的数据各列 null 数量:")
finalDf.select(finalDf.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show(truncate = false)
spark.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 关键代码解析:
检测 Null 值:
df.select(df.columns.map(c => count(when(isnull(col(c)), c).alias(c)): _*).show()
1使用
map
遍历所有列,统计每列的 null 数量。删除 Null 行:
df.na.drop(Seq("name")) // 删除 name 列为 null 的行
1指定列名列表(
Seq
),仅删除这些列的 null 行。填充 Null 值:
- 单列填充:
df.na.fill(0.0, Seq("salary")) // 用 0 填充 salary 列
1 - 多列填充:
df.na.fill(Map("age" -> 30, "name" -> "Unknown")) // 按列指定填充值
1
- 单列填充:
Imputer 自动填充数值列:
val imputer = new Imputer() .setInputCols(Array("age", "salary")) .setOutputCols(Array("age_filled", "salary_filled")) .setStrategy("mean") // 或 "median"/"mode"
1
2
3
4自动计算均值/中位数/众数填充数值列,生成新列。
条件替换 Null:
df.withColumn("name", coalesce(col("name"), lit("Unknown")))
1使用
coalesce
或when
逻辑替换 null 值。
# 运行结果示例:
原始数据各列 null 数量:
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
| 0| 2| 2| 2|
+---+----+---+------+
处理后的数据各列 null 数量:
+---+----+----+------+----------+-------------+
|id |name|age |salary|age_filled|salary_filled|
+---+----+----+------+----------+-------------+
|0 |0 |0 |0 |0 |0 |
+---+----+----+------+----------+-------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 注意事项:
- 类型一致性:填充数值列时需确保填充值类型与列类型一致(如
Double
填充Double
列)。 - 性能优化:
- 使用
cache()
缓存中间结果:val cachedDf = df.cache()
1 - 调整分区数:
val repartitionedDf = df.repartition(4)
1
- 使用
- 错误处理:若列名不存在或类型不匹配,会抛出
AnalysisException
,需提前校验 Schema。
通过以上 Scala 代码,可以高效处理 Spark DataFrame 中的 null 值,适用于大规模数据集。