Kurs
PySpark-DataFrames sind echt wichtig, wenn du skalierbare Pipelines in Spark aufbaust. Wichtig ist, dass man sich merken sollte, dass DataFrames unveränderlich sind. Das heißt, wenn du mal einen hast, kannst du ihn nicht direkt ändern; du bekommst immer einen neuen DataFrame, wenn du was änderst. Hier kommt PySpark- withColumn()
, ins Spiel. Damit kannst du Spalten hinzufügen, aktualisieren oder ändern, aber da sich DataFrame nicht an Ort und Stelle ändern, musst du das Ergebnis immer einer neuen Variablen zuweisen.
In diesem Tutorial zeige ich dir, wie du mit „ withColumn()
“ deine Datensätze bearbeiten und optimieren kannst, egal ob du Funktionen erstellst, Typen bereinigst oder Logik hinzufügst.
Was macht die Funktion „withColumn()“ in PySpark?
Kurz gesagt, „ withColumn()
“ gibt einen neuen DataFrame mit einer hinzugefügten oder ersetzten Spalte zurück. Da DataFrame nicht mutiert, musst du den Rückgabewert „ df = df.withColumn(...)
“ zuweisen.
Im Hintergrund fügt jeder Aufruf von „ withColumn()
“ eine Projektion zum logischen Plan hinzu. Das ist okay, wenn du nur ein oder zwei machst, aber wenn du viele hintereinander machst, kann das den Plan überladen und Spark deutlich langsamer machen.
Bist du neu bei PySpark? In unserem Kurs „Einführung in PySpark“ lernst du, wie du riesige Datensätze verarbeitest, abfragst und optimierst, um leistungsstarke Analysen durchzuführen. So kannst du die Grundlagen für den Umgang mit Big Data ganz einfach meistern.
Die wichtigsten Anwendungen von PySpark mit column()
Schauen wir uns mal die wichtigsten Funktionen von „ withColumn()
“ an:
Eine Konstante Spalte hinzufügen
Angenommen, du möchtest einen Zeitstempel oder ein Flag hinzufügen. Benutz einfach lit()
oder typedLit()
von pyspark.sql.functions
. Zum Beispiel:
from pyspark.sql.functions import lit
df = df.withColumn("ingest_date", lit("2025-07-29"))
Eine Spalte aus vorhandenen Daten erstellen
Vielleicht willst du einen abgeleiteten Wert, Zeichenfolgen kombinieren oder eine Summe berechnen. Du kannst Folgendes tun:
from pyspark.sql.functions import col, expr
df = df.withColumn("full_name", col("first_name") + expr(" ' ' + last_name"))
Arithmetische oder ausdrucksbasierte Transformationen passen auch hier rein.
Eine vorhandene Spalte überschreiben
Wenn du schon eine Spalte hast und sie ändern willst, ersetzt „ withColumn()
“ sie einfach:
df = df.withColumn("age", col("age").cast("integer"))
Du musst nichts löschen und neu hinzufügen.
Datentypen umwandeln
Den Typ einer Spalte zu ändern ist echt einfach:
df = df.withColumn("price", col("price").cast("decimal(10,2)"))
Ich finde das besonders praktisch, wenn ich lose JSON- oder CSV-Dateien lese, bei denen die Typen als Zeichenfolgen kommen.
Wenn du mehr Beispiele dafür suchst, was PySpark ist und wie du es nutzen kannst, schau dir doch mal unser Tutorial „Erste Schritte mit PySpark” an.
Bedingte Logik und when()-Ausdrücke
Manchmal musst du mehr als nur einfache Rechenaufgaben machen. Vielleicht erstellst du eine Statusspalte, die auf einer Punktzahl basiert. Oder Einträge anhand einer Kombination von Regeln markieren. Hier kommt „ when()
” von pyspark.sql.functions
ins Spiel. Stell dir das wie eine „ IF
“-Anweisung vor. Du kannst es mit „ otherwise()
“ kombinieren, um mehrere Pfade abzudecken.
So sieht es aus:
from pyspark.sql.functions import when
df = df.withColumn(
"grade",
when(col("score") >= 90, "A")
.when(col("score") >= 80, "B")
.when(col("score") >= 70, "C")
.otherwise("F")
)
Es liest sich fast wie normales Englisch: Wenn die Punktzahl mindestens 90 ist, dann A. Wenn sie 80 ist, dann B. Mach weiter... und wenn nichts davon passt, gib eine 6. Es ist echt ausdrucksstark, und Spark macht aus dieser Logik hinter den Kulissen einen effizienten Ausdruck. Keine verschachtelten Schleifen, keine „ apply()
“-Aufrufe, nur saubere DAGs und klare Ausführungspläne.
Das ist praktisch, wenn du nicht auf SQL umsteigen oder deinen Code mit UDFs überladen willst.
In unserem Tutorial „Einführung in Spark SQL in Python“ kannst du lernen, wie du Datenbearbeitung und Feature-Sets für maschinelles Lernen in Spark mit SQL in Python erstellen kannst.
Spalten mit integrierten und benutzerdefinierten Funktionen umwandeln
Oft musst du Spalten formatieren oder umstrukturieren, Zeichenfolgen in Großbuchstaben umwandeln, sie in Teile aufteilen, Werte verknüpfen und so weiter. PySpark hat eine umfangreiche Bibliothek mit integrierten Funktionen, die direkt in Python laufen. withColumn()
.
Hier ein Beispiel:
from pyspark.sql.functions import upper, concat_ws, split
df = df.withColumn("full_caps", upper(col("name")))
df = df.withColumn("city_state", concat_ws(", ", col("city"), col("state")))
df = df.withColumn("first_word", split(col("description"), " ").getItem(0))
Eingebaute Funktionen sind echt super. Schnell, nativ und optimiert. Aber manchmal gibt's halt Ausnahmen, die in keine Schublade passen. Da kommen benutzerdefinierte Funktionen (UDFs) ins Spiel.
Mit einer UDF arbeiten
Angenommen, du willst die Länge einer Zeichenfolge berechnen und sie kennzeichnen:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def label_length(x):
return "short" if len(x) < 5 else "long"
label_udf = udf(label_length, StringType())
df = df.withColumn("name_length_label", label_udf(col("name")))
Ganz einfach. Aber pass auf, UDFs bringen einen gewissen Aufwand mit sich. Sie holen Daten aus der optimierten Engine, lassen sie in Python laufen und packen sie dann wieder ein. Das ist okay, wenn's nötig ist, aber für Aufgaben mit vielen Daten solltest du lieber integrierte Funktionen oder SQL-Ausdrücke nehmen, wenn das geht.
Lerne in unserem Tutorial „Effektive Verwendung von PySpark-UDFs und Pandas-UDFs“, wie du PySpark-UDFs, einschließlich Pandas-UDFs, erstellen, optimieren und verwenden kannst, um benutzerdefinierte Datenumwandlungen effizient zu verarbeiten und die Spark-Leistung zu verbessern.
Leistungsaspekte und fortgeschrittene Praktiken
Irgendwann stößt jeder PySpark-Nutzer auf dieses Problem: Man stapelt immer mehr „ withColumn()
“-Aufrufe und die Pipeline wird immer langsamer. Der Grund? Jeder Aufruf fügt dem logischen Plan eine neue Ebene hinzu, die Spark analysieren, optimieren und ausführen muss.
Wenn du nur ein oder zwei Spalten hinzufügst, ist das okay. Aber wenn du fünf, sechs oder mehr hintereinander machst, solltest du anfangen, anders zu denken.
Benutzen select()
wenn du viele Spalten hinzufügst
Anstatt immer wieder „ withColumn()
“ aufzurufen, erstelle lieber eine neue Spaltenliste mit „ select()
“:
df = df.select(
"*",
(col("salary") * 0.1).alias("bonus"),
(col("age") + 5).alias("age_plus_five")
)
Dieser Ansatz erstellt den logischen Plan in einem Schritt.
Mehr über „ select()
” und andere Methoden erfährst du in unserem PySpark-Spickzettel „ ”: Spark in Python – Tutorial zur Datenverarbeitung.
Was ist mit withColumns()
?
Mit der in Spark 3.3 eingeführten Funktion „ withColumns()
” kannst du mehrere Spalten in einem Durchgang hinzufügen. Keine wiederholten Anrufe mehr. Es ist eine Methode im Wörterbuchstil:
df = df.withColumns({
"bonus": col("salary") * 0.1,
"age_plus_five": col("age") + 5
})
Noch hat nicht jeder Spark 3.3+ installiert, aber wenn du es schon hast, dann probier's aus. Es ist sauberer, schneller und vermeidet das Problem „Tod durch Verkettung“.
Vollständiges PySpark-Beispiel mit Column()
Angenommen, du arbeitest an Daten zur Benutzeraktivität für eine Plattform, die auf Abonnements basiert. Dein roher DataFrame sieht ungefähr so aus:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("withColumn-demo").getOrCreate()
data = [
("Alice", "NY", 24, 129.99),
("Bob", "CA", 31, 199.95),
("Charlie", "TX", 45, 0.0),
("Diana", "WA", 17, 19.99)
]
columns = ["name", "state", "age", "purchase_amount"]
df = spark.createDataFrame(data, columns)
Du hast Namen, Bundesstaaten, Alter und wie viel sie ausgegeben haben. Ziemlich einfach. Aber in der echten Welt reicht das nie aus. Also, hier ist, was wir als Nächstes machen werden:
- Füge eine feste Spalte für das Erfassungsdatum hinzu.
- Mach eine neue Spalte, die zeigt, ob der Nutzer volljährig ist.
- Formatiere „
purchase_amount
“ mit zwei Dezimalstellen. - Kategorisiere die Nutzer nach ihrem Ausgabeverhalten.
- Wende eine benutzerdefinierte Funktion an, um Nutzer zu kennzeichnen.
- Benutz „
withColumns()
“, um zusätzliche Werte übersichtlicher zu verpacken.
Bereitest du dich auf dein nächstes Vorstellungsgespräch vor? Der Artikel „ Die 36 wichtigsten PySpark-Interviewfragen und -antworten für 2025” ist ein super Leitfaden für PySpark-Interviewfragen und -antworten, der alles von den Grundlagen bis hin zu fortgeschrittenen Techniken und Optimierungsstrategien abdeckt.
Schritt 1: Füge ein festes Erfassungsdatum hinzu
Es ist eine gute Idee, einen Lernpfad zu verfolgen, wann die Daten in dein System eingegeben werden.
from pyspark.sql.functions import lit
df = df.withColumn("ingest_date", lit("2025-07-29"))
Schritt 2: Erwachsene vs. Minderjährige markieren
Du hättest einfach col("age") >= 18
verwenden können, aber wenn du es mit when()
umschreibst, hast du die volle Kontrolle, falls die Logik mal komplizierter wird.
from pyspark.sql.functions import when, col
df = df.withColumn(
"is_adult",
when(col("age") >= 18, True).otherwise(False)
)
Schritt 3: Format Kaufbetrag
Das Umwandeln von Datentypen ist eine der häufigsten Aufgaben, die du erledigen musst, vor allem beim Lesen von CSV- oder JSON-Dateien.
df = df.withColumn("purchase_amount", col("purchase_amount").cast("decimal(10,2)"))
Schritt 4: Ausgabenhöhe kategorisieren
Nehmen wir mal an, du willst drei Gruppen: „keine“, „niedrig“ und „hoch“.
df = df.withColumn(
"spend_category",
when(col("purchase_amount") == 0, "none")
.when(col("purchase_amount") < 100, "low")
.otherwise("high")
)
So kannst du Nutzer segmentieren, ohne eine separate Abfrage laufen zu lassen.
Schritt 5: Benutzer mit einer benutzerdefinierten Funktion kennzeichnen
Jetzt zu einer erfundenen Regel. Nehmen wir mal an, du stempelst jemanden wegen der Länge seines Namens ab.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def user_label(name):
return "simple" if len(name) <= 4 else "complex"
label_udf = udf(user_label, StringType())
df = df.withColumn("label", label_udf(col("name")))
Schritt 6: Füge mehrere zusätzliche Spalten auf einmal hinzu
Vielleicht möchtest du noch ein paar mehr, das Alter in Monaten und eine Willkommensnachricht.
df = df.withColumns({
"age_in_months": col("age") * 12,
"welcome_msg": col("name") + lit(", welcome aboard!")
})
Viel sauberer, als zweimal „ withColumn()
“ aufzurufen.
So sieht das endgültige DataFrame aus, wenn du es anzeigst:
df.show(truncate=False)
Name |
Staat |
Alter |
purchase_amount |
ingest_date |
is_adult |
Ausgabenkategorie |
Etikett |
age_in_months |
welcome_msg |
Alice |
NY |
24 |
129,99 |
2025-07-29 |
Richtig |
hoch |
komplex |
288 |
Alice, willkommen an Bord! |
Bob |
CA |
31 |
199,95 |
2025-07-29 |
Richtig |
hoch |
einfach |
372 |
Bob, willkommen an Bord! |
Charlie |
TX |
45 |
0,00 |
2025-07-29 |
Richtig |
keine |
komplex |
540 |
Hey Charlie, willkommen an Bord! |
Diana |
WA |
17 |
19,99 |
2025-07-29 |
False |
low |
komplex |
204 |
Hey Diana, willkommen an Bord! |
Diese Art von Transformationsstapel ist bei der Feature-Entwicklung, der Berichterstellung oder der Bereinigung von Feeds von Drittanbietern üblich.
Lerne die Grundlagen der Arbeit mit Big Data mit PySpark in unserem Kurs „Big Data Fundamentals with PySpark ”.
withColumn() – Bewährte Vorgehensweisen und Fallstricke
PySpark's „ withColumn()
” sieht vielleicht einfach aus, kann aber selbst erfahrene Entwickler in die Irre führen, wenn man nicht aufpasst. Hier sind ein paar Dinge, die deine Pipeline unbemerkt durcheinanderbringen können, sowie ein paar Gewohnheiten, die dich vor bösen Überraschungen bewahren können.
Weise das Ergebnis immer neu zu
Das ist zwar ganz einfach, überrascht aber trotzdem viele Leute: Mit „ withColumn()
“ ändert man nicht den ursprünglichen DataFrame. Du bekommst ein neues. Wenn du vergisst, es neu zuzuweisen, geht deine Änderung verloren.
df.withColumn("new_col", lit(1)) # This won't do anything
df = df.withColumn("new_col", lit(1)) # This works
Pass auf, dass du nicht aus Versehen was überschreibst.
Nehmen wir mal an, dein DataFrame hat eine Spalte namens „status“. Du machst das so:
df = df.withColumn("Status", lit("Active"))
Sieht harmlos aus, oder? Aber Spark behandelt Spaltennamen standardmäßig als groß-/kleinschreibungsunabhängig. Das heißt, du hast gerade deine ursprüngliche Statusspalte überschrieben. Ohne es zu merken.
Eine Lösung ist, immer vorher und nachher df.columns
zu checken. Oder, wenn deine Pipeline das unterstützt, schalte die Groß-/Kleinschreibung ein mit:
spark.conf.set("spark.sql.caseSensitive", "true")
Verwende keine Python-Literale in Ausdrücken.
Das vergisst man leicht. Vermeide beim Hinzufügen von Konstanten rohe Python-Werte. Pack sie immer mit lit()
ein.
df = df.withColumn("region", "US") # Bad
df = df.withColumn("region", lit("US")) # Good
Warum? Weil „ withColumn()
“ einen Spaltenausdruck erwartet, keinen Rohwert. Wenn du einen Fehler machst, kann Spark eine wenig hilfreiche Fehlermeldung ausgeben oder, schlimmer noch, die nachgelagerte Logik stillschweigend unterbrechen.
Behandle Ausnahmen außerhalb von withColumn()
Manchmal werden Leute kreativ und packen ganze „ withColumn()
“-Blöcke in „try/except“-Blöcke. Es ist besser, riskante Teile (wie UDFs oder Datenlesungen) zu isolieren und Ausnahmen dort abzufangen. Halte deine Transformationsebene übersichtlich und vorhersehbar.
try:
def risky_udf(x):
if not x:
raise ValueError("Empty input")
return x.lower()
except Exception as e:
print("Error in UDF definition:", e)
Lass Spark frühzeitig scheitern, versteck es nicht hinter verschachtelten Try-Blöcken.
Mehr über Ausnahmen in Python erfährst du in unserem Tutorial „Ausnahme- und Fehlerbehandlung in Python ”.
Verwende lieber Einbauten als UDFs.
Klar, UDFs geben dir Power. Aber sie haben auch Nachteile: langsamere Leistung, schwierigeres Debugging und weniger Optimierung. Wenn es eine eingebaute Funktion gibt, die das erledigt, dann benutze sie.
Das hier:
df = df.withColumn("upper_name", upper(col("name")))
Ist viel schneller als das hier:
df = df.withColumn("upper_name", udf(lambda x: x.upper(), StringType())(col("name")))
Wann man withColumn() nicht verwenden sollte
Auch wenn withColumn()
echt flexibel ist, gibt's Momente, wo es einfach nicht das richtige Tool für die Aufgabe ist.
Du gestaltest gerade viele Spalten auf einmal um.
Wenn du dich dabei erwischst, wie du zehnmal hintereinander „ withColumn()
“ sagst, ist es Zeit, einen Gang zurückzuschalten. Benutz stattdessen „ select()
“ und schreib deine Transformationen als Teil einer neuen Projektion.
df = df.select(
col("name"),
col("age"),
(col("salary") * 0.15).alias("bonus"),
(col("score") + 10).alias("adjusted_score")
)
Es ist übersichtlicher, läuft besser und sorgt dafür, dass der Optimierer von Spark für dich arbeitet und nicht gegen dich.
Du willst Logik im SQL-Stil schreiben.
Wenn dein Team viel mit SQL arbeitet und du das DataFrame schon als temporäre Ansicht gespeichert hast, ist es oft einfacher, einfach eine SQL-Abfrage zu machen.
df.createOrReplaceTempView("users")
df2 = spark.sql("""
SELECT name, age,
CASE WHEN age >= 18 THEN true ELSE false END AS is_adult
FROM users
""")
Das kann für SQL-versierte Analysten oder Teams, die sowohl mit Spark als auch mit herkömmlichen Datenbanken arbeiten, einfacher sein.
Verbessere deine SQL-Kenntnisse mit interaktiven Kursen, Lernpfaden und Projekten, die von echten Experten mithilfe unserer SQL-Kurse zusammengestellt wurden .
Du bist schon auf Spark 3.3+
Wenn du Spark 3.3 oder neuer benutzt und mehrere Spalten hinzufügen musst, ist „ withColumns()
” genau das Richtige für dich. Das ist nicht nur praktisch, sondern kann auch schneller sein, weil es nur ein einziges logisches Plan-Update braucht.
Lerne in unserem Kurs „Grundlagen von PySpark”, wie du mit dem PySpark-Paket verteiltes Datenmanagement und maschinelles Lernen in Spark umsetzt.
Fazit
Die Funktion „ withColumn()
” von PySpark ist eines der vielseitigsten Tools in deinem Datenumwandlungsarsenal. Mit ihr kannst du direkt in einem DataFrame-zentrierten Arbeitsablauf Features hinzufügen, ändern und entwickeln. Von Casting-Typen und Injektionskonstanten bis hin zur Einbettung komplexer Logik mit Bedingungen und UDFs hilft dir „ withColumn()
“ dabei, unübersichtliche Daten in produktionsreife Pipelines umzuwandeln.
Aber mit dieser Macht kommt auch Verantwortung. Wenn du „ withColumn()
“ in langen Ketten zu oft benutzt, kann das die Leistung unbemerkt beeinträchtigen, weil es den logischen Plan aufbläht und es schwieriger macht, deine Spark-Jobs zu optimieren und zu debuggen. Deshalb kann es echt wichtig sein, zu wissen, wann man auf select()
, withColumns()
oder sogar SQL zurückgreifen sollte, um einen Job zu erledigen, der entweder nur so vor sich hin kriecht oder richtig skaliert.
Da Spark ständig weiterentwickelt wird, vor allem mit Funktionen wie „ withColumns()
” in Spark 3.3+, ist es wichtig, die internen Abläufe und die Kompromisse bei der Leistung der einzelnen Methoden zu verstehen, um saubereren, schnelleren und besser wartbaren Code zu schreiben.
Lerne in unserem Kurs „Feature Engineering mit PySpark” die Techniken hinter groß angelegten Spaltenumwandlungen, vermeide die Fallstricke der Planinflation und finde heraus, wie Profis Feature-Pipelines optimieren.
PySpark withColumn() – Häufig gestellte Fragen
Warum läuft mein Spark-Job langsam, wenn ich withColumn() oft benutze?
Wenn du mehrere „ withColumn()
“-Aufrufe aneinanderhängst, fügt Spark jeden einzelnen als separaten Schritt in den logischen Ausführungsplan ein. Mit der Zeit kann das zu einem aufgeblähten Plan führen, der schwieriger zu optimieren ist und langsamer läuft. Anstatt zehn „ withColumn()
“-Aufrufe zu stapeln, versuch doch mal, deine neuen Spalten innerhalb eines einzigen „ select()
“ zu erstellen, oder benutze „ withColumns()
“, um mehrere Spalten auf einmal hinzuzufügen.
Kann ich mit column() eine Spalte löschen?
Nein, mit „ withColumn()
“ kannst du nur Spalten hinzufügen oder ersetzen, aber nicht löschen. Wenn du eine Spalte löschen willst, nimm stattdessen „ drop()
“. Du kannst auch „ select()
“ verwenden, um nur die Spalten zu behalten, die du brauchst.
Warum kriege ich 'ne Fehlermeldung, wenn ich versuche, 'ne Zeichenfolge oder 'ne Zahl in withColumn() zu benutzen?
Das passiert meistens, wenn du einen rohen Python-Wert übergibst, anstatt ihn mit ` lit()
` zu umschließen. `withColumn()
` erwartet einen Spark-Spaltenausdruck. So geht's richtig:
from pyspark.sql.functions import lit
df = df.withColumn("new_col", lit(42))
