Publish AI, ML & data-science insights to a global community of data professionals.

PySpark for Beginners: Beyond the BasicsPySpark 入门:超越基础

Take the next step to building real workflows with Spark on your laptop 迈出下一步,在你的笔记本电脑上用 Spark 构建真实工作流

Image by AIAI 生成图片

If you’ve read my first article如果你读过我的第一篇文章 in this series, PySpark for Beginners: Mastering the Basics then you already understand the heart of Spark: distributed data, DataFrames, and lazy execution. You’ve installed PySpark, stood up a SparkSession, read a CSV and performed simple manipulations of the data in a Dataframe. I’ll leave a link to that story at the end of this one.如果你读过本系列的第一篇文章《PySpark 入门:掌握基础》,那么你已经了解了 Spark 的核心:分布式数据、DataFrame 和惰性执行。你已经安装了 PySpark,创建了 SparkSession,读取了 CSV 文件,并对 DataFrame 中的数据进行了简单操作。我会在本文末尾留下那篇文章的链接。

One thing worth repeating from that original article is that I often use the terms PySpark and Spark interchangeably, but strictly speaking, Spark is the overarching distributed computing framework (written in Scala), and PySpark is a dedicated Python API to Spark.从第一篇文章中值得重复的一点是,我经常交替使用 PySpark 和 Spark 这两个术语,但严格来说,Spark 是底层的分布式计算框架(用 Scala 编写),而 PySpark 是 Spark 的专用 Python API。

Beyond the basics超越基础

Now, something interesting happens when you get past that beginner stage. You quickly realise your second PySpark project requires a slightly different mindset:现在,当你跨过初学者的阶段后,有趣的事情发生了。你很快会意识到,你的第二个 PySpark 项目需要一种稍微不同的思维方式:

  • You want to read/write data in a safer, faster, more predictable way.你希望以更安全、更快速、更可预测的方式读写数据。
  • You want to combine datasets without feeling uncertain about joins.你希望合并数据集时不再对连接操作感到不确定。
  • You want to understand why Spark is behaving the way it does — and how to nudge it gently in the right direction.你希望理解 Spark 为什么表现出某种行为——以及如何温和地引导它走向正确的方向。

This article takes you through those next steps. It’s deliberately slow‑paced and practical. No deep internals. No cluster tuning. No complicated Spark optimisations. Just the things real beginners need to know when they move from toy examples to small, real-world work.本文将带你走过这些进阶步骤。它刻意保持慢节奏和实用性。不涉及深层内部原理。不涉及集群调优。不涉及复杂的 Spark 优化。只讲真正的初学者从玩具示例转向小型实际工作时需要知道的内容。

We’re using open‑source Spark, running locally, just like before.和之前一样,我们使用在本地运行的开源 Spark。

1. Taking the next step: reading data properly1. 迈出下一步:正确地读取数据

In my first article, we used the simplest possible CSV loader:在我的第一篇文章中,我们使用了最简单的 CSV 加载方式:

df = spark.read.csv("sales.csv", header=True, inferSchema=True)

It works - and it’s fine for early experiments - but it hides a subtle problem.它能用——对于早期实验来说没问题——但它隐藏了一个微妙的问题。

Spark is guessing your data typesSpark 正在猜测你的数据类型

When you use the inferSchema=True directive, Spark looks at a small sample of your file and uses that information to guess whether a column is an integer, string, boolean, or double. That means:当你使用 inferSchema=True 指令时,Spark 会查看文件的一小部分样本,并据此猜测某一列是整数、字符串、布尔值还是双精度浮点数。这意味着:

  • If 99 rows appear to be numeric and the 100th row is blank, Spark might interpret the column as a string.如果 99 行看起来是数字,而第 100 行是空白的,Spark 可能会将该列解释为字符串。
  • If someone edits the file next week and accidentally adds £23.50 instead of 23.50, Spark might treat the entire column differently.如果有人下周编辑文件时不小心添加了 £23.50 而不是 23.50,Spark 可能会以不同的方式处理整列。
  • If your file is large, the sample Spark uses won’t represent the whole dataset.如果你的文件很大,Spark 使用的样本将无法代表整个数据集。

This can lead to mysterious behaviour later , the kind of bugs beginners find hardest to diagnose.这可能导致后续出现神秘的行为——那种初学者最难诊断的 bug。

A better beginner habit: define a schema for your data更好的初学者习惯:为你的数据定义 schema

Think of a schema as Spark’s version of a blueprint for reading data. Before building anything, you tell Spark things like:把 schema 想象成 Spark 版本的读取数据蓝图。在开始构建任何东西之前,你告诉 Spark 类似这样的信息:

The names of the columns
What data type they should be
Whether or not a column value is optional.
列的名称它们应该是什么数据类型某一列的值是否可选。

Here’s what it looks like for our sales data example. Recall that the data looked like this:对于我们的销售数据示例,它看起来是这样的。回想一下,数据看起来像这样:

transaction_id,customer_name,net_amount,tax_amount, is_member
101,Alice,250.50,25.05,true
102,Bob,120.00,6.00, false
103,Charlie,450.75,25.07,true
104,David,89.99,5.73,false

To specify the types of the above fields in Spark, we define our schema using code like the following.要在 Spark 中指定上述字段的类型,我们使用如下代码定义 schema。

from pyspark.sql import types as T

schema = T.StructType([
T.StructField("transaction_id", T.IntegerType(), False),
T.StructField("customer_name", T.StringType(), False),
T.StructField("net_amount", T.DoubleType(), True),
T.StructField("tax_amount", T.DoubleType(), True),
T.StructField("is_member", T.BooleanType(), True),
])

The column names and type parameters are self-explanatory. The True[False] parameter indicates that there may [not] be NULL values in the column. Note, the True/False nullability flag is mostly schema metadata and optimisation info. It’s not always strictly enforced for every data source the way a database NOT NULL constraint is.列名和类型参数不言自明。True[False] 参数表示该列中可能[不]存在 NULL 值。注意,True/False 的可空性标志主要是 schema 元数据和优化信息。它并不总是像数据库的 NOT NULL 约束那样对每个数据源都严格强制执行。

More useful options when reading CSV data读取 CSV 数据时更有用的选项

There’s a bunch of handy CSV read options you can combine with the schema directive that make loading CSV data even more reliable.有一系列实用的 CSV 读取选项,你可以与 schema 指令结合使用,使 CSV 数据的加载更加可靠。

The more common options include:更常见的选项包括:

  • mode=”PERMISSIVE”: keeps bad rows as much as possiblemode="PERMISSIVE":尽可能保留不良行
  • mode=”DROPMALFORMED”: drops malformed rowsmode="DROPMALFORMED":丢弃格式错误的行
  • mode=”FAILFAST”: errors immediatelymode="FAILFAST":立即报错
  • header= True[False]: Does the file contain [or not] a header recordheader= True[False]:文件是否[不]包含标题记录
  • nullValue: what text should replace null values in the inputnullValue:用什么文本替换输入中的空值
  • dateFormat / timestampFormatdateFormat / timestampFormat

Now we can load the sales_data into a Dataframe like this:现在我们可以像这样将 sales_data 加载到 DataFrame 中:

df = (
    spark.read
    .option("header", True)
    # Other modes: "PERMISSIVE" and "DROPMALFORMED".
    .option("mode", "FAILFAST")
    .option("nullValue", "N/A")
    .schema(schema)
    .csv("sales_data.csv")
)

Why is this important for beginners?为什么这对初学者很重要?

  1. You know what the data types are before you start working.你在开始工作之前就知道数据类型是什么。
  2. If specified, Spark will reject weird rows instead of silently interpreting them.如果指定了,Spark 会拒绝异常行,而不是静默地解释它们。
  3. Your transformations become more predictable.你的转换变得更加可预测。
  4. If you join two datasets later, type mismatches won’t surprise you.如果你稍后连接两个数据集,类型不匹配不会让你措手不及。

2. Understanding data transformations 2. 理解数据转换

Recall, in my previous article, in our first steps with manipulating dataframes with PySpark, we added an extra, derived column to our Dataframe using code like this:回想一下,在我之前的文章中,在我们用 PySpark 操作 DataFrame 的初步步骤中,我们使用如下代码向 DataFrame 添加了一个额外的派生列:

df2 = df.withColumn("gross_amount", df.net_amount + df.tax_amount)

I explained that this line doesn’t calculate anything yet. It simply adds a step to Spark’s internal plan:我解释过,这一行还不会计算任何东西。它只是向 Spark 的内部计划添加了一个步骤:

1. Read the CSV  
2. Add a new column (gross_amount = net + tax)

Then you might add more steps like this:然后你可能会添加更多步骤,像这样:

df3 = df2.withColumn("tax_percentage", df2.tax_amount / df2.gross_amount * 100)

Still, no computation has happened. Only when you perform an action like …仍然没有发生任何计算。只有当你执行一个动作,比如……

df3.show()

… does Spark say:……Spark 才会说:

“Okay, now I need to actually run all these steps.”“好的,现在我需要实际运行所有这些步骤。”

This is what “lazy execution” means, but the important bit for beginners isn’t the name. It’s the effect, and it means,这就是“惰性执行”的含义,但对初学者来说重要的不是这个名字。而是它的效果,这意味着:

  • You can chain many transformations without “paying” for them until you need the result.你可以链式组合多个转换,直到需要结果时才“付费”。
  • Spark can rearrange the order internally to run things efficiently.Spark 可以在内部重新排列顺序以高效运行。
  • You don’t waste time doing intermediate steps on data you might filter out later.你不会浪费时间对之后可能过滤掉的数据执行中间步骤。

Think of it like you would an everyday task, like making a sandwich:把它想象成日常任务,比如做三明治:

  • You gather all the ingredients.你收集所有食材。
  • You assemble it in your mind.你在脑海中组装它。
  • You only actually start cutting and preparing once you know precisely what you’re making.只有当你确切知道要做什么时,你才真正开始切割和准备。

3. Cleaning data before it causes problems3. 在数据造成问题之前清理它

Real data is usually messy and often contains missing values, blank strings, duplicate records, or placeholder values like “N/A” and “unknown”.真实数据通常是混乱的,经常包含缺失值、空字符串、重复记录,或像“N/A”和“unknown”这样的占位值。

In PySpark, the goal is to catch and deal with obvious problems early so the rest of your workflow behaves predictably. PySpark has a number of useful functions that enable you to do this.在 PySpark 中,目标是尽早发现并处理明显的问题,以便工作流的其余部分可预测地运行。PySpark 有许多有用的函数可以帮助你做到这一点。

Dropping rows with missing values删除包含缺失值的行

The simplest cleaning function is dropna().最简单的清理函数是 dropna()。

df_clean = df.dropna()

This removes any row that contains a null value in any column. That can be useful, but it is often too aggressive.这会删除任何列中包含 null 值的行。这可能有用,但通常过于激进。

More commonly, you only drop rows where important columns in that particular row are missing:更常见的是,你只删除特定行中重要列缺失的行:

df_clean = df.dropna(subset=["net_amount", "tax_amount"])

This means:这意味着:

Keep the row as long as net_amount and tax_amount are present.只要 net_amount 和 tax_amount 存在,就保留该行。

Other columns may still contain nulls, and that might be fine.其他列可能仍然包含 null,这可能没问题。

Filling missing values填充缺失值

Sometimes you don’t want to remove rows. You just want to replace missing values with something sensible.有时你不想删除行。你只想用合理的值替换缺失值。

That is where fillna() is useful.这就是 fillna() 有用的地方。

df_clean = df.fillna({"city": "Unknown"})

You can also fill numeric columns:你也可以填充数值列:

df_clean = df.fillna({"tax_amount": 0.0})

This is useful when a missing value has a clear meaning. For example, a missing discount amount might reasonably become 0.0. But be careful. Filling missing values can change the meaning of your data if you choose the wrong default.这在缺失值有明确含义时很有用。例如,缺失的折扣金额可以合理地变为 0.0。但要小心。如果选择了错误的默认值,填充缺失值可能会改变数据的含义。

Changing column types with cast()使用 cast() 更改列类型

Sometimes Spark reads a column as the wrong type, especially when working with CSV files. If so, you can convert a column using the cast() operator:有时 Spark 会将列读取为错误的类型,尤其是在处理 CSV 文件时。如果是这样,你可以使用 cast() 运算符转换列:

from pyspark.sql import functions as F 
df_clean = df.withColumn("net_amount",F.col("net_amount").cast("double") )

This is especially common when dates, numbers, or booleans have been read as strings.这在日期、数字或布尔值被读取为字符串时尤其常见。

Removing duplicate rows删除重复行

Duplicate rows can appear when files are exported more than once, joined incorrectly, or combined from multiple sources. You can remove exact duplicates like this:当文件被多次导出、连接错误或从多个来源合并时,可能会出现重复行。你可以像这样删除完全重复的行:

df_clean = df.dropDuplicates()

Or remove duplicates based on one or more selected columns.或者基于一个或多个选定列删除重复项。

df_clean = df.dropDuplicates(["transaction_id"])

That second version is often more useful because it says:第二种版本通常更有用,因为它表示:

Each transaction ID should only appear once.每个交易 ID 应该只出现一次。

A small data cleaning example一个小的数据清理示例

Putting those ideas together:将这些想法组合起来:

from pyspark.sql import functions as F


df_clean = (
    df
    # Remove transactions missing required values.
    .dropna(subset=["transaction_id", "net_amount"])
    # Supply defaults for optional values.
    .fillna(
        {
            "city": "Unknown",
            "tax_amount": 0.0,
        }
    )
    # Apply the expected numeric types.
    .withColumn(
        "net_amount",
        F.col("net_amount").cast("double"),
    )
    .withColumn(
        "tax_amount",
        F.col("tax_amount").cast("double"),
    )
    # Keep one row for each transaction.
    .dropDuplicates(["transaction_id"])
)

4. Joining datasets in PySpark without getting lost4. 在 PySpark 中连接数据集而不迷失方向

If you’ve worked with databases before, you’ve probably written SQL statements that join two or more tables together. Joins in Spark work the same way, but on Dataframes.如果你以前使用过数据库,你可能写过将两个或多个表连接在一起的 SQL 语句。Spark 中的连接方式相同,但作用于 DataFrame。

What is a join?什么是连接?

If the concept of a join is new to you, they are a way to match rows from one DataFrame with related rows from another DataFrame. In other words, it answers a question like:如果连接的概念对你来说是新的,它们是一种将一个 DataFrame 中的行与另一个 DataFrame 中相关行匹配的方法。换句话说,它回答这样的问题:

“Which rows in this DataFrame correspond to rows in that DataFrame?”“这个 DataFrame 中的哪些行与那个 DataFrame 中的行对应?”

That is the main idea behind every join in PySpark. Once that part is clear, the syntax and join types become much easier to understand.这就是 PySpark 中每个连接背后的主要思想。一旦这部分清楚了,语法和连接类型就变得容易理解多了。

If you have two Dataframes like this:如果你有两个像这样的 DataFrame:

sales_data.csvsales_data.csv

transaction_id, customer_name, net_amount, tax_amount
101, Alice,    250.50, 25.05
102, Bob,      120.00, 6.00

customers.csvcustomers.csv

customer_name, city, loyalty_level
Alice, New York, Gold
Bob,   London,   Silver

You can join them on their common customer_name field like this:你可以像这样在它们共同的 customer_name 字段上连接它们:

df_sales = spark.read.csv("sales_data.csv", header=True)
df_customers = spark.read.csv("customers.csv", header=True)
df_joined = df_sales.join(df_customers, on="customer_name", how="inner")
df_joined.show()


# Output
+-------------+--------------+----------+----------+--------+-------------+
|customer_name|transaction_id|net_amount|tax_amount|city    |loyalty_level|
+-------------+--------------+----------+----------+--------+-------------+
|Alice        |101           |250.50    |25.05     |New York|Gold         |
|Bob          |102           |120.00    |6.00      |London  |Silver       |
+-------------+--------------+----------+----------+--------+-------------+

Which join should beginners use?初学者应该使用哪种连接?

There are several different types of joins available in Spark. For 99% of beginner use‑cases, you’ll use one of the following:Spark 中有几种不同类型的连接可用。对于 99% 的初学者用例,你会使用以下之一:

  • inner  — show only matching rowsinner — 只显示匹配的行
  • left  — show everything in the left table, plus matchesleft — 显示左表中的所有内容,加上匹配项
  • outer  — show all rows from both tablesouter — 显示两个表中的所有行

And of these, the inner join will be far and away the most common type of join you’ll use in your day-to-day work在这些连接中,inner join 将是你日常工作中使用最频繁的连接类型

Don’t worry about “broadcast”, “sort‑merge”, “shuffle‑hash”, or any other advanced join strategy yet. As your experience of Spark grows, you can read up on these at your leisure.不要担心“broadcast”、“sort-merge”、“shuffle-hash”或任何其他高级连接策略。随着你 Spark 经验的增长,你可以在闲暇时研究这些。

Just remember:只需记住:

Joins are computaionally more expensive than simple column operations, so use them when necessary — but not casually.连接在计算上比简单的列操作更昂贵,所以必要时才使用——但不要轻易使用。

5. Reading & Writing data out in the “Spark way”: Parquet5. 以“Spark 方式”读写数据:Parquet

Most beginners stick with CSV because it’s familiar. But CSV is slow, rigid, and lacks support for data types, and in real life, Parquet is Spark’s native data format. Parquet is a columnar, compressed data format ideally suited for data analytics, data reporting and read-heavy workloads.大多数初学者坚持使用 CSV,因为它很熟悉。但 CSV 速度慢、僵化,且缺乏对数据类型的支持,而在实际工作中,Parquet 是 Spark 的原生数据格式。Parquet 是一种列式、压缩的数据格式,非常适合数据分析、数据报告和读取密集型工作负载。

When Spark reads a Parquet data set:当 Spark 读取 Parquet 数据集时:

  • It only loads the columns you actually need.它只加载你实际需要的列。
  • It understands every data type.它理解每种数据类型。
  • It loads significantly faster than CSV.它比 CSV 加载速度快得多。

You write out the Dataframe contents in Parquet format files like this:你可以像这样将 DataFrame 内容写入 Parquet 格式文件:

df_joined.write.mode("overwrite").parquet("output/enriched_sales")

Then you can read it back instantly like this,然后你可以像这样立即读回它,

df_fast = spark.read.parquet("output/enriched_sales")
df_fast.show()

NB. Using Parquet for file input and output is the single easiest performance “upgrade” for any Spark beginner.注意:使用 Parquet 进行文件输入和输出是任何 Spark 初学者最简单的性能“升级”。

6. Thinking in PySpark workflows6. 以 PySpark 工作流的方式思考

Once you understand how to read data, clean it, transform it, join it, and write it back out, the next step is learning how to organise those actions into a simple workflow. A beginner PySpark project usually follows this sequence:一旦你理解了如何读取数据、清理数据、转换数据、连接数据以及将其写回,下一步就是学习如何将这些操作组织成简单的工作流。一个初学者 PySpark 项目通常遵循以下顺序:

Read data
  -> check and clean it
  -> add useful columns
  -> combine with other data
  -> write the result

That may sound obvious, but it is an important shift. You are no longer just experimenting with one DataFrame at a time. You are building a repeatable process.这听起来可能很明显,但这是一个重要的转变。你不再只是逐个 DataFrame 地实验。你正在构建一个可重复的过程。

Keep each stage simple保持每个阶段简单

A useful beginner habit is to give each stage of your workflow a clear purpose. For example:一个有用的初学者习惯是给你的工作流的每个阶段一个明确的目的。例如:

df_raw = spark.read.schema(schema).csv("sales_data.csv", header=True)

df_clean = df_raw.dropna(subset=["net_amount", "tax_amount"])

df_enriched = df_clean.withColumn(
    "gross_amount",
    F.col("net_amount") + F.col("tax_amount")
)

df_final = df_enriched.join(df_customers, on="customer_name", how="left")

df_final.write.mode("overwrite").parquet("output/final_dataset")

This style is slightly more verbose than chaining everything into one long expression, but it is much easier to read when you are learning.这种风格比把所有内容链式组合成一个长表达式稍微冗长一些,但在学习时更容易阅读。

Each DataFrame name tells you where you are in the workflow:每个 DataFrame 名称告诉你工作流中的位置:

df_raw       -> the data as it arrived
df_clean     -> the data after basic cleaning
df_enriched  -> the data after adding new meaning
df_final     -> the dataset ready to save

Why this matters为什么这很重要

When something goes wrong, this structure makes debugging much easier.当出现问题时,这种结构使调试容易得多。

You can inspect each stage by looking at the data:你可以通过查看数据来检查每个阶段:

df_raw.show()
df_clean.show()
df_enriched.show()

You can check row counts:你可以检查行数:

df_raw.count()
df_clean.count()
df_final.count()

This helps to answer useful questions like:这有助于回答有用的问题,比如:

Did rows disappear unexpectedly during cleaning?
Did the join create more rows than expected?
Did a calculated column produce nulls?

The simple mental model of: Inputs → preparation → combination → output will take you surprisingly far in your PySpark journey.输入 → 准备 → 组合 → 输出的简单心智模型会在你的 PySpark 之旅中带你走得很远。

7. A gentle introduction to the Spark UI7. Spark UI 的温和介绍

Spark has a nice little web UI that switches on when you run an action like .count() or .write(). When your Spark job is running locally, visit:Spark 有一个不错的小网页 UI,当你运行 .count() 或 .write() 等动作时会自动开启。当 Spark 作业在本地运行时,访问:

http://localhost:4040

You should see something like this displayed.你应该会看到类似这样的显示。

It looks a bit overwhelming, but you don’t need to understand every tab. At this stage, you only need to know that the UI exists and why it’s useful. And it’s useful because it helps you see which Spark jobs have run or are currently running. 它看起来有点让人不知所措,但你不需要理解每个标签页。在这个阶段,你只需要知道 UI 存在以及它为什么有用。它有用是因为能帮助你查看哪些 Spark 作业已经运行或正在运行。

And, as your experience in Spark grows, the UI can help you understand why jobs failed or are taking longer to run than expected. But that comes much later. For now, treat the Spark UI like the dashboard in your car — you don’t need to understand the engine to notice when something looks odd.而且,随着你在 Spark 方面经验的增长,UI 可以帮助你理解作业为什么失败或运行时间比预期长。但那是以后的事了。现在,把 Spark UI 想象成你车上的仪表盘——你不需要理解发动机就能注意到某些地方看起来不对劲。

Summary: You’re now ready for your first real PySpark project总结:你现在准备好开始你的第一个真正的 PySpark 项目了

At this point, you’ve moved beyond “I can run Spark” into “I can build a clean, simple Spark pipeline.”在这一点上,你已经从“我能运行 Spark”进阶到“我能构建一个干净、简单的 Spark 管道。”

You now know how to:你现在知道如何:

  • read data safely,安全地读取数据,
  • clean and prepare it,清理和准备数据,
  • enrich it with new columns,用新列丰富数据,
  • combine multiple datasets,组合多个数据集,
  • save the result efficiently,高效地保存结果,
  • and observe Spark just enough to stay confident.以及恰到好处地观察 Spark 以保持信心。

Nothing in this article required a cluster. Nothing required advanced tuning. This is exactly how many real PySpark projects begin.本文中的内容都不需要集群。都不需要高级调优。这正是许多真实 PySpark 项目的开始方式。

When you’re more experienced, you may want to build on your knowledge by researching some of these topics.当你更有经验时,你可能想通过研究以下一些主题来扩展你的知识。

  • reading execution plans阅读执行计划
  • understanding shuffles理解 shuffle
  • managing partitions管理分区
  • other join types其他连接类型
  • simple performance tuning简单的性能调优

These are some of the topics I hope to cover in a future article, but for now, you’ve mastered your next major milestone, and you can build something meaningful and useful with PySpark. 这些是我希望在未来文章中涵盖的一些主题,但现在,你已经掌握了下一个重要的里程碑,你可以用 PySpark 构建有意义且有用的东西了。


BTW, here is that link to the first article in this series,
PySpark for Beginners: Mastering the Basics, which I mentioned at the start.
顺便说一下,这是本系列第一篇文章《PySpark 入门:掌握基础》的链接,我在开头提到过。


Written By作者

Share This Article分享这篇文章

Towards Data Science is a community publication. Submit your insights to reach our global audience and earn through the TDS Author Payment Program.Towards Data Science 是一个社区出版物。提交你的见解以触达我们的全球受众,并通过 TDS 作者付费计划获得收益。

Write for TDS

Related Articles相关文章