Kurs
CSV-Dateien sind eine wichtige Säule in der Datenverarbeitung und -analyse. Fast jeder Datenexperte, vom Dateningenieur über den Datenwissenschaftler bis hin zum PySpark-Entwickler, hat irgendwann einmal mit CSV-Dateien zu tun. Wenn die Datenmengen jedoch von Megabyte auf Gigabyte oder sogar auf Terabyte und mehr anwachsen, können einfache Tools wie Pandas oder die Standard-Python-Bibliotheken die Last nicht mehr bewältigen. An dieser Stelle werden Apache Spark und PySpark für die Verwaltung großer CSV-Dateien in verteilten Rechenumgebungen wichtig.
In diesem Artikel erfährst du alles, was du beim Lesen großer CSV-Dateien mit PySpark wissen musst.
Stelle sicher, dass du PySpark installiert hast und mit den Grundlagen vertraut bist, indem du unser Tutorial Erste Schritte mit PySpark befolgst.
Was ist das Lesen von CSV in PySpark?
Mit PySpark kannst du CSV-Dateien in verteilte DataFrames einlesen. DataFrames in PySpark sind den Pandas DataFrames sehr ähnlich und bieten eine vertraute Schnittstelle. Unter der Oberfläche jedoch verteilen die PySpark DataFrames die Berechnungen und den Speicher auf mehrere Knoten und bieten so eine außergewöhnliche Leistung für große Datenmengen.
PySpark bietet überzeugende Vorteile für große CSV-Dateien, wie zum Beispiel:
- Verteiltes Laden von Daten,
- Robuste Handhabung von Nullwerten,
- Flexibilität bei der Definition des Schemas und
- Unkomplizierte Möglichkeiten, mehrere oder komprimierte CSV-Dateien zu verwalten.
Dennoch gibt es gemeinsame Herausforderungen, wie z. B. die effiziente Handhabung von Kopfzeilen, das genaue Ableiten oder Spezifizieren von Schemata und der Umgang mit schlecht geformten oder inkonsistenten Datensätzen. Lies weiter, um zu erfahren, wie du mit all diesen Herausforderungen umgehen kannst.
Das Lesen von CSV-Dateien in PySpark ist eines der Themen, die du verstehen musst, um ein PySpark-Interview zu bestehen. Unsere Top 36 PySpark-Interview-Fragen und -Antworten für 2025 bieten einen umfassenden Leitfaden für PySpark-Interview-Fragen und -Antworten, der Themen von grundlegenden Konzepten bis zu fortgeschrittenen Techniken und Optimierungsstrategien abdeckt.
Grundlagen des Lesens von CSV-Dateien in PySpark
Das Lesen von CSV-Daten ist oft einer der ersten und wichtigsten Schritte in PySpark-Workflows und bildet die Grundlage für nachfolgende Transformationen, explorative Analysen und maschinelle Lernaufgaben. Wenn du diesen Schritt richtig machst, werden die Daten sauberer verarbeitet und die nachgelagerte Leistung verbessert.
Konzeptioneller Rahmen
PySpark liest CSV-Dateien innerhalb des verteilten Modells von Spark. Anstatt die CSV-Daten auf einem einzigen Rechner komplett in den Speicher zu lesen, verteilt Spark große Datenaufgaben auf mehrere Clusterknoten. Der in Spark integrierte Catalyst-Optimierer verbessert die Leistung weiter, indem er die zugrunde liegenden Operationen, die während der CSV-Ingestion erforderlich sind, effizient ausführt.
Kernlesesyntax
Der einfachste Weg, CSV-Dateien zu lesen, sind die eingebauten Funktionen von Spark:
spark.read.csv("file_path", header=True, inferSchema=True)
Oder ausdrücklich:
spark.read.format("csv").option("header", "True").load("file_path")
Die wichtigsten Parameter sind:
file_path
- Speicherort der CSV-Dateien.header
- Spaltennamen aus CSV-Kopfzeilen setzen, wennTrue
.inferSchema
- leitet automatisch die Datentypen der Spalten ab.delimiter
- Zeichen zur Trennung der Spalten; Standard ist das Komma.
In unserem Tutorial PySpark von Grund auf lernen 2025 erfährst du mehr über die Grundlagen von PySpark und wie du es lernen kannst.
CSV-Dateien lesen: Optionen und Konfigurationen
PySpark bietet umfangreiche Optionen, mit denen du den Prozess des CSV-Lesens genau steuern kannst.
Kopfzeile und Schema-Inferenz
Die Einstellung header=True
weist Spark an, die erste CSV-Zeile als Spaltennamen zu verwenden.
inferSchema=True
lässt Spark durch das Scannen deiner Daten automatisch Spaltentypen erraten:
spark.read.csv("customers.csv", header=True, inferSchema=True)
Während die Schema-Inferenz anfangs bequem und effektiv ist, leidet die Leistung bei großen Datensätzen, da Spark die Daten wiederholt überfährt, um die Datentypen zu bestimmen.
Benutzerdefinierte Schema-Spezifikation
Die explizite Definition deines Schemas verbessert die Leistung erheblich, da die wiederholten Datenscans von Spark entfallen. Ein definiertes Schema kommuniziert Spaltennamen und -typen im Voraus.
Hier erfährst du, wie du ein eigenes Schema in PySpark definierst:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
StructField("user_id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("score", DoubleType(), True),
])
df = spark.read.csv("customers.csv", schema=schema, header=True)
Als Nächstes wollen wir uns die Handhabung von Trennzeichen in PySpark beim Lesen von CSV-Dateien ansehen.
Umgang mit Begrenzungszeichen und Sonderzeichen
Viele CSV-Dateien verwenden andere Begrenzungszeichen als Kommas, z. B. Pipes oder Tabs. In PySpark kann das Trennzeichen explizit angegeben werden:
spark.read.csv("customers.csv", header=True, delimiter="|")
Außerdem können Escape- und Anführungszeichen konfiguriert werden, um spezielle Zeichenszenarien zu behandeln:
spark.read.csv("data.csv", header=True, escape='\"', quote='"')
Umgang mit ungültigen und fehlenden Werten
In der realen Welt enthalten CSV-Daten häufig Inkonsistenzen oder unvollständige Datensätze. PySpark macht den Umgang mit Nullwerten einfach, indem es benutzerdefinierte Platzhalter in Nullwerte übersetzt:
spark.read.csv("customers.csv", header=True, schema=schema, nullValue="NA")
Dadurch werden ungültige Werte geklärt, was die manuelle Datenbereinigung später erheblich reduziert.
Lass uns andere Strategien für den Umgang mit Nullwerten erkunden.
Du kannst entscheiden, ob du die Nullwerte herausfiltern willst:
# Filter rows where Age is not null
df_filtered = df.filter(df["Age"].isNotNull())
df_filtered.show()
Damit wird der DataFrame so gefiltert, dass er nur Zeilen enthält, in denen die Spalte Age
nicht null ist. Die Ausgabe sollte wie folgt aussehen:
+---+-----+---+------+
| ID| Name|Age|Salary|
+---+-----+---+------+
| 1| John| 25| 50000|
| 3| Bob| 30| NULL|
| 4|Carol| 28| 55000|
+---+-----+---+------+
Die andere Strategie ist, die Nullwerte zu füllen:
# Replace null values in Age and Salary with default values
df_filled = df.na.fill({"Age": 0, "Salary": 0})
df_filled.show()
Die Ausgabe sieht dann so aus:
+---+-----+---+------+
| ID| Name|Age|Salary|
+---+-----+---+------+
| 1| John| 25| 50000|
| 2|Alice| 0| 60000|
| 3| Bob| 30| 0|
| 4|Carol| 28| 55000|
| 5|David| 0| 48000|
+---+-----+---+------+
Mehrere Dateien und Verzeichnisse lesen
PySpark eignet sich hervorragend für die Verwaltung großer Datensätze, die aus mehreren Dateien bestehen. Anstatt Dateien aus einem Verzeichnis manuell nacheinander zu laden und zusammenzuführen, unterstützt PySpark Wildcard-Muster für schnelles, effizientes Massenladen:
spark.read.csv("/data/sales/*.csv", header=True, schema=schema)
Auf diese Weise werden zahlreiche CSV-Dateien in einem einzigen, rationellen Vorgang zu einem DataFrame zusammengefasst.
Sobald die Daten in PySpark geladen sind, umfassen die nächsten Schritte das Wrangling, das Feature Engineering und die Erstellung von Machine Learning-Modellen. Unser Kurs "Feature Engineering mit PySpark " deckt diese Konzepte in aller Tiefe ab.
Optimierungstechniken für effizientes CSV-Lesen
Beim Umgang mit großen CSV-Daten ist es wichtig, die Optimierungsstrategien von PySpark zu nutzen.
Strategien zur Aufteilung
Die Partitionierung hat einen großen Einfluss auf die Leistung, da die Daten gleichmäßig auf die Clusterknoten verteilt werden. Spark ermöglicht eine explizite Kontrolle über die Größe und Anzahl der Partitionen während des Dateningestions, um die nachfolgenden Operationen zu beschleunigen:
df = spark.read.csv("data.csv", header=True, schema=schema).repartition(20)
.repartition(20)
teilt den DataFrame in 20 Partitionen in deinem Spark-Cluster auf. Da Spark Daten in Chunks verarbeitet, können mehr Partitionen verwendet werden:
- Parallelität verbessern
- Arbeitslast über den Cluster verteilen
- Beschleunigung von Transformationen und Schreibvorgängen
Wenn du auf einem Cluster mit vielen Kernen arbeitest, kannst du sie so optimal nutzen. Aber wenn du es übertreibst (z.B. 1000 Partitionen auf einem kleinen Datensatz), kann es zu Verzögerungen kommen.
Entdecke weitere Spark-Funktionen wie repartition
mit unserem PySpark Cheat Sheet: Spark in Python. Es wird ausführlich auf die Initialisierung von Spark in Python, das Laden von Daten, das Sortieren und die Repartitionierung eingegangen.
Caching und Persistenz
Wenn du in deinem Arbeitsablauf wiederholt auf denselben Datensatz zugreifen musst, kann das Zwischenspeichern deines DataFrame im Speicher oder auf der Festplatte die Leistung erheblich steigern:
df.cache()
Bedenke jedoch, dass das Caching ausreichend Systemressourcen erfordert; wäge immer den Speicherverbrauch gegen die Leistungssteigerung ab.
Faule Auswertung und Auslösen von Aktionen
PySpark setzt auf ein faules Bewertungsmodell: DataFrame-Operationen wandeln Pläne um, anstatt sie sofort auszuführen. Das tatsächliche Lesen der Dateien wird nur bei Bedarf ausgeführt, ausgelöst durch Befehle wie show()
, count()
oder collect()
:
# no reading yet
df = spark.read.csv("data.csv", header=True, schema=schema)
# actual read triggered here
df.show(5)
Erweiterte Anwendungsfälle und Überlegungen
Wir wollen uns nun komplexere Szenarien ansehen, die dir beim Lesen von CSV-Dateien begegnen können:
Komprimierte CSV-Dateien lesen
Spark verwaltet komprimierte CSV-Dateien wie gz
oder .bz2
effizient und transparent ohne zusätzliche Konfigurationen:
spark.read.csv("logs.csv.gz", header=True, schema=schema)
Umgang mit fehlerhaften Datensätzen
CSV-Datensätze können falsch geformte Zeilen enthalten. PySpark bietet mehrere Optionen, die dir helfen, Fehler oder fehlerhafte Datensätze elegant zu verwalten:
mode="PERMISSIVE"
(Standard): schließt fehlerhafte Zeilen mit null gefüllten Spalten ein.mode="DROPMALFORMED
: überspringt missgebildete Datensätze stillschweigend. Dieser Modus wird von den eingebauten CSV-Funktionen nicht unterstützt.mode="FAILFAST"
: löst eine Ausnahme aus, wenn ein fehlerhafter Datensatz gefunden wird.
spark.read.csv("data.csv", header=True, schema=schema, mode="FAILFAST")
Gebietsschema und Kodierungseinstellungen
Manchmal werden für CSV-Daten nicht standardisierte Kodierungen verwendet. PySpark kann über den Parameter encoding leicht mit verschiedenen Kodierungen umgehen:
spark.read.csv("data_utf8.csv", header=True, encoding="UTF-8")
Zu den unterstützten Kodierungen gehören, US-ASCII
ISO-8859-1
, , , , und . UTF-8
UTF-16BE
UTF-16LE
UTF-16
Bewährte Praktiken und häufige Fallstricke
Hier sind einige bewährte Methoden, die du beim Lesen von CSV-Dateien mit Spark beachten solltest:
- Gib das Schema explizit an, wenn die Struktur des Datensatzes bekannt ist.
- Kontrolliere die Partitionierung, um die Arbeitslast effizient zu verteilen.
- Strategischer Cache für häufig genutzte DataFrames.
- Löse bewusst Leseaktionen aus und achte auf eine faule Bewertung.
Achte darauf, dass du sie vermeidest:
- Einstellen von
header=False
für CSVs mit Kopfzeilen. - Verlasse dich ausschließlich auf
inferSchema
in großen oder wiederholt abgerufenen Datensätzen. - Ignorieren kritischer Trennzeichen- oder Kodierungseinstellungen.
Fazit
Die leistungsstarken CSV-Ingestionsfunktionen von PySpark richtig zu verstehen und zu nutzen, ist für eine effektive Big Data-Verarbeitung unerlässlich. Mit einer klaren Schemaspezifikation, dem Umgang mit benutzerdefinierten Nullformaten, einer effizienten Partitionierung und der Verwaltung von komprimierten oder mehreren Dateien wird dein Arbeitsablauf rationalisiert und performant.
Denke daran, dass PySpark zwar enorme Vorteile für große Datenaufgaben bietet, einfachere Tools wie Pandas für kleine Datensätze aber trotzdem ausreichen können. Verwende PySpark, wenn du mit Daten arbeitest, die die Möglichkeiten eines einzelnen Rechners übersteigen, und berücksichtige immer die Vorteile des verteilten Rechnens.
Wenn du mehr über PySpark erfahren möchtest, schau dir unsere ausführlichen Spark-Kurse an, z. B:
PySpark CSV lesen FAQ
Wie kann ich große CSV-Dateien am besten in PySpark lesen?
Am besten gibst du ein eigenes Schema mit StructType an, anstatt dich auf inferSchema zu verlassen. Diese Methode verbessert die Leistung, da wiederholte Datenabfragen vermieden werden.
Kann PySpark CSV-Dateien mit unterschiedlichen Trennzeichen verarbeiten?
Ja. Du kannst ein benutzerdefiniertes Trennzeichen mit der Option delimiter in spark.read.csv()
festlegen. Verwende zum Beispiel delimiter="|" für durch Pipes getrennte Werte.
Wie lese ich mehrere CSV-Dateien gleichzeitig in einem Verzeichnis?
Du kannst einen Platzhalterpfad wie spark.read.csv("/data/*.csv", ...)
verwenden, um mehrere Dateien auf einmal in einen einzigen DataFrame zu laden.
Was bewirkt `mode="DROPMALFORMED"` beim Lesen von CSV-Dateien?
Sie weist Spark an, fehlerhafte Datensätze in der CSV-Datei zu überspringen und zu ignorieren, anstatt sie einzuschließen oder Fehler zu verursachen.
Kann PySpark komprimierte CSV-Dateien wie `.gz` oder `.bz2` lesen?
Ja. PySpark dekomprimiert und liest komprimierte CSV-Dateien automatisch und ohne zusätzliche Konfiguration, wenn du den richtigen Dateipfad angibst.
