Kurs
Hast du schon einmal versucht, einen Spark-Job zu debuggen, der plötzlich fehlgeschlagen ist, und festgestellt, dass du völlig verloren bist, weil der Spark-Kaninchenbau so tief geht?
Als ich zum ersten Mal mit Apache Spark gearbeitet habe, dachte ich, ich müsste nur ein paar PySpark-Transformationen schreiben und Spark würde auf "magische Weise" über den Cluster skalieren. Ich habe mich geirrt. Die Leistung von Spark hängt ganz davon ab, dass du verstehst, was hinter den Kulissen passiert.
Dieser Leitfaden ist für alle, die Spark nicht wie eine Blackbox behandeln wollen. Wir gehen die Architektur von Spark durch, vom Master-Worker-Modell und dem Ausführungsworkflow bis hin zur Speicherverwaltung und den Fehlertoleranzmechanismen.
Wenn du schnelle, fehlertolerante und effiziente Big-Data-Anwendungen erstellen willst, bist du hier genau richtig!
Grundlegende Architektur von Apache Spark
Bevor du deine erste Zeile in PySpark schreibst, hat Spark bereits einige Architekturentscheidungen für dich getroffen. Spark ist nicht nur wegen des In-Memory-Computings schnell, sondern auch, weil es auf einer Master-Worker-Architektur basiert, die skalierbar ist und Chaos wie Knotenabstürzehes, Probleme mit der Java Virtual Machine (JVM) und inkonsistente Datenmengen übersteht.
Schauen wir uns die Kernarchitektur von Spark an und warum es in modernen Big-Data-Workflows immer noch so leistungsstark und präsent ist.
Das Paradigma des Meisterarbeiters
Das Herzstück von Spark ist das Master-Worker-Modell . Stell dir das folgendermaßen vor:
- Treiber (Master): Das ist das Gehirn von Spark. Sie führt deine
main()
Funktion aus, erstellt den Spark-Kontext, kümmert sich um die DAG-Planung und sagt dem Cluster, was er tun soll. - Vollstrecker (Arbeiter): Das sind die Muskeln. Sie führen deine Aufgaben aus, halten Daten im Speicher und melden sich beim Fahrer zurück.
So kannst du dich auf die Definition der Transformationen konzentrieren, und Spark entscheidet, wo und wie sie parallel auf den Executors ausgeführt werden.
Was mir an diesem Design gefällt, ist, dass es einsatzunabhängig ist. Derselbe Code läuft, unabhängig davon, ob du ihn auf deinem lokalen Rechner, in Kubernetes oder Mesos einsetzt. Das macht es einfach, sie lokal zu entwickeln und zu testen und dann auf Cluster zu skalieren, ohne deinen Code neu zu schreiben.
Und hier zeigt sich ein weiterer großer Vorteil der Trennung von Fahrer und Arbeiter bei Spark: Sie verbessert die Fehlerisolierung. Wenn ein Worker-Knoten während der Ausführung einer Aufgabe stirbt, kann Spark diese Aufgabe einem anderen Worker zuweisen, ohne dass deine Anwendung abstürzt.
Kernkomponenten
Schauen wir uns an, was innerhalb des Treibers und der Knotenpunkte passiert.
Spark-Architektur. Bild vom Autor.
Spark-Kontext
Wenn du SparkContext()
aufrufst oder SparkSession.builder.getOrCreate()
verwendest, öffnest du das Tor zu allen internen Zaubereien von Spark.
Der Spark-Kontext:
- Verbindet sich mit deinem Clustermanager
- Weist Vollstrecker zu
- Behält den Lernpfad über den Auftragsstatus und die Ausführungspläne
Spark erstellt im Hintergrund einen gerichteten azyklischen Graphen (DAG) von Transformationen. Diese DAG wird in Phasen und Aufgaben unterteilt und dann parallel ausgeführt.
Der DAG-Scheduler findet heraus, welche Aufgaben zusammen ausgeführt werden können, und der Task-Scheduler weist sie den Ausführenden zu. In der Zwischenzeit sorgt der Blockmanager dafür, dass die Daten je nach Bedarf zwischengespeichert, umgeschichtet oder neu geladen werden.
Dieses mehrschichtige Design macht Spark unglaublich flexibel, da du Arbeitsspeicher, Speicher und Rechenleistung unabhängig voneinander feinabstimmen kannst.
Wenn du mit Spark-Transformationen oder Feature Engineering arbeitest, solltest du dir den Kurs Feature Engineering mit PySpark ansehen, um diese Architektur in Aktion zu erleben.
Executor-Laufzeit
Die Ausführenden sind der Ort, an dem die Arbeit erledigt wird.
Jeder Executor läuft:
- Eine oder mehrere Aufgaben (threaded)
- Ein Teil des Speichers für das Zwischenspeichern von Daten und das Mischen von Ausgaben
- Eigene JVM-Instanz, isoliert von anderen
Du kannst konfigurieren, wie viel Arbeitsspeicher jeder Executor bekommt, wie viele Kerne er verwendet und ob er auf die Festplatte schreiben soll, wenn der Speicher knapp wird.
Aber sei vorsichtig: Wenn du nicht genug Arbeitsspeicher zuweist, bekommst du ständig Out-of-Memory-Fehler. Allerdings solltest du auch vermeiden, zu viel Speicher zuzuweisen, da dies Ressourcen verschwendet. Überwachung und Optimierung sind hier unerlässlich.
Ausführungsworkflow: Vom Code zum Cluster
Das Schreiben von PySpark-Code fühlt sich ganz einfach an. Du filterst einen DataFrame, führst einen Join durch, aggregierst etwas und drückst auf "Run". Aber hinter dieser sauberen API entwickelt Spark im Stillen eine Ausführungsmaschine, die die Arbeit auf mehrere Knotenpunkte verteilen kann.
Schauen wir uns an, was hinter den Kulissen passiert.
Umwandlung von logischen in physische Pläne
Was die meisten Spark-Nutzer zunächst nicht wissen, ist Folgendes: Wenn du PySpark-Code schreibst, führst du nicht sofort etwas aus. Du erstellst einen Plan, und der Catalyst Optimizer von Spark wandelt diesen Plan in eine effiziente Ausführungsstrategie um.
Es funktioniert in vier Phasen:
- Analyse: Spark löst Spaltennamen, Datentypen und Tabellenreferenzen auf und stellt sicher, dass alles gültig ist.
- Logische Optimierung: Hier wendet Spark Regeln wie Predicate Pushdown und Constant Folding an. Sie optimiert die Filter und kombiniert die Projektionen.
- Physische Planung: Spark berücksichtigt mehrere Ausführungsstrategien und wählt die effizienteste aus (je nach Datengröße, Partitionierung usw.).
- Codegenerierung: Zum Schluss wird mit Hilfe von Whole-Stage Code Generation JVM Bytecode erzeugt.
Spark's Catalyst Optimizer. Bild von Databricks.
Die Kette aus .select()
, .join()
und .groupBy()
läuft also nicht einfach Zeile für Zeile ab. Die Daten werden analysiert, optimiert und zu etwas kompiliert, das auf einem Cluster schnell läuft.
Schau dir das PySpark Cheat Sheet an, wenn du einen Spickzettel mit den nützlichsten PySpark-Befehlen brauchst.
DAG Scheduler & Bühnenerstellung
Wenn der Plan fertig ist, übernimmt der DAG-Scheduler.
Es unterteilt den Job in Phasen, die auf Shuffle Boundaries basieren, wobei Spark entscheidet, was sequentiell und was parallel ausgeführt werden kann.
Es gibt zwei Haupttypen von Stufen:
- ShuffleMapStage: Dabei handelt es sich um einen Shuffle, der in der Regel durch breite Transformationen wie
groupBy()
oderjoin()
verursacht wird. Die Daten werden dann aufgeteilt und über das Netzwerk gesendet. Dieser Stufentyp wird benötigt, um die ResultStage zu berechnen. - ResultStage: Diese Stufen erzeugen Ausgaben, wie das Schreiben auf die Festplatte oder die Rückgabe von Ergebnissen an den Treiber.
Eine wichtige Sache, die ich gelernt habe, ist, so wenig wie möglich zu mischen. Ein Shuffle muss stattfinden, bevor eine Etappe beendet ist und ist teuer. Du musst verstehen, wo sie in deiner DAG vorkommen und ob du deinen Code weiter optimieren kannst, um die Anzahl der Shuffles zu reduzieren.
Lebenszyklus der Aufgabenausführung
Sobald der DAG-Scheduler alle Phasen erstellt hat, können sie auf den verschiedenen Executors ausgeführt werden.
Der Lebenszyklus der Aufgabenausführung sieht in etwa so aus:
- Serialisierung von Aufgaben: Der Treiber serialisiert die Aufgabenanweisungen und sendet sie an die Executors.
- Die Schreibphase mischen: Spark schreibt die partitionierte Ausgabe auf die lokale Festplatte.
- Abholphase: Die Executors auf der nächsten Stufe holen sich die relevanten Shuffle-Dateien von anderen im Cluster.
- Deserialisierung und Ausführung: Executors deserialisieren die Daten, führen deine Logik aus und können die Ergebnisse zwischenspeichern oder schreiben.
- Müllabfuhr: Die JVM gibt automatisch den Speicher zurück, der von Spark-Anwendungen nicht mehr genutzt wird. Dieser Schritt ist wichtig, um Speicherlecks zu vermeiden und sicherzustellen, dass Spark-Anwendungen reibungslos laufen.
Ein kleiner Tipp aus eigener Erfahrung: Wenn dein Spark-Job nicht mehr läuft, obwohl er vorher gut lief, liegt das oft an Verzögerungen bei der Garbage Collection oder beim Shuffle Fetch. Überprüfe immer deinen Code und stelle sicher, dass du die Architektur von Spark verstehst, damit du diese Themen effektiv optimieren kannst.
Architektur der Speicherverwaltung
Die Speicherverwaltung von Spark ist ein sehr komplexes Thema und kann dich stundenlanges Debugging kosten, wenn du es nicht verstehst.
Schauen wir uns also an, wie Spark den Speicher unter der Haube verwaltet, damit du das weißt und stundenlanges Debuggen von langsamem Code oder Out-of-Memory-Fehlern vermeiden kannst.
Vereinheitlichtes Speichermodell
Vor Spark 1.6 war der Speicher streng zwischen der Ausführung (für Shuffles und Joins) und der Speicherung (für das Caching) aufgeteilt. Das änderte sich mit Spark 1.6, als das Unified Memory Model eingeführt wurde.
Im einheitlichen Speichermodell werden die Daten in drei wichtige Pools aufgeteilt:
- Reservierter Speicher: Ein kleiner Teil des Speichers wird für Spark-Interna und das System verwendet.
- Spark-Speicher: Dieser wird für die Speicherung von Ausführungsdaten und für das Caching verwendet. Sie wird dynamisch geteilt. Wenn dein Job mehr Speicher für Shuffles und weniger für Caching benötigt (oder umgekehrt), passt sich Spark an.
- Benutzerspeicher: Platz für benutzerdefinierte Datenstrukturen, die für die Ausführung von Benutzercode in Spark-Anwendungen benötigt werden.
Der Spark-Speicherpool ist in zwei weitere Pools unterteilt:
- Executor-Speicher: Speichert temporäre Daten, die während der Verarbeitungsphasen benötigt werden (z. B. Shuffles, Joins, Aggregationen, ...).
- Speicherpool: Wird zum Zwischenspeichern von Daten und zum Speichern interner Datenstrukturen verwendet.
Dank dieser Elastizität kann Spark flexibler auf unvorhersehbare Datenmengen reagieren.
Das bedeutet aber auch, dass du ein bisschen die Kontrolle verlierst, wenn du nicht weißt, was vor sich geht. Wenn du zum Beispiel cache()
einen großen DataFrame hast, aber in der gleichen Phase auch teure Aggregationen durchführst, kann es sein, dass Spark deine zwischengespeicherten Daten verdrängt, um Platz für den Shuffle zu schaffen.
Off-Heap & spaltenförmige Speicherung
Bei der Off-Heap- und Columnar-Speicherung von Spark kommt die Tungsten-Engine ins Spiel.
Mit Tungsten wurden mehrere Optimierungen eingeführt, die die Leistung von Spark verbessern:
- Off-Heap-Speicherverwaltung: Spark speichert jetzt einige Daten außerhalb des JVM-Heaps, was den Aufwand für die Garbage Collection reduziert und die Speicherverwaltung vorhersehbarer macht.
- Speicherung im Binärformat: Die Daten werden in einer kompakten, cachefreundlichen Binärform gespeichert, was die CPU-Auslastung verbessert und eine vektorisierte Ausführung ermöglicht.
- Algorithmen, die den Cache nutzen: Spark kann jetzt die CPU-Caches effektiver nutzen und vermeidet so unnötige Lesevorgänge aus dem RAM oder von der Festplatte.
Und wenn du mit DataFrames arbeitest, nutzt du diese Optimierungen bereits unter der Haube. Das ist einer der Gründe, warum ich darauf dränge, DataFrames und SQL-APIs anstelle von rohen RDDs zu verwenden. Du bekommst die volle Leistung von Catalyst und Tungsten ohne zusätzliches Tuning.
Wenn du mit Datenbereinigungspipelines arbeitest, wirst du dies in Cleaning Data with PySpark in Aktion sehen.
Mechanismen zur Fehlertoleranz
Wenn du mit verteilten Systemen arbeitest, weißt du eines ganz genau: Sie scheitern. Knotenpunkte stürzen ab. Netzwerkfehler passieren. Die Executors haben keinen Speicher mehr und werden abgeschaltet.
Aber Spark ist dafür gemacht, mit diesen Problemen umzugehen und stellt sicher, dass deine Jobs trotzdem erfolgreich sind.
Wir wollen uns genauer ansehen, wie Spark sicherstellt, dass deine Jobs auch dann noch erfolgreich sind, wenn einige Instabilitäten auftreten.
RDD-Lernpfad
Resilient Distributed Datasets (RDDs) sind die grundlegende Datenstruktur in Spark. Und sie werden nicht umsonst als widerstandsfähig bezeichnet.
Spark verwendet Lineage, um sicherzustellen, dass jedes RDD im Falle eines Knotenausfalls und Datenverlusts neu berechnet werden kann.
Wenn also ein Knoten ausfällt, berechnet Spark die verlorenen Daten einfach anhand des Abstammungsgraphen neu.
So funktioniert es in der Praxis:
- Enge Abhängigkeiten (wie
map()
oderfilter()
): Spark braucht nur die verlorene Partition zur Neuberechnung. - Breite Abhängigkeiten (wie
groupBy()
oderjoin()
): Spark muss möglicherweise Daten über mehrere Partitionen hinweg abrufen, da es die Ausgabe mehrerer Stufen benötigt.
Lineage vermeidet die Notwendigkeit, Fehler manuell zu behandeln. Wenn dein Abstammungsdiagramm jedoch zu lang wird, da es Hunderte von Transformationen enthalten kann, wird die Neuberechnung der verlorenen Daten teuer. Hier kommt das Checkpointing ins Spiel.
Checkpointing und vorausschauendes Schreiben von Protokollen
Bei komplexen Workflows oder Streaming-Jobs kann sich Spark nicht allein auf die Abstammung verlassen. Hier kommt das Checkpointing ins Spiel.
Du kannst rdd.checkpoint()
aufrufen, um den aktuellen RDD-Zustand an einem zuverlässigen Speicherort (wie HDFS) zu persistieren.
Spark schneidet dann den Stammbaum ab. Wenn ein Fehler auftritt, lädt es die Daten direkt neu, anstatt sie neu zu berechnen.
Beim strukturierten Streaming verwendet Spark außerdem Write-Ahead-Logs (WALs), um sicherzustellen, dass die Daten während der Übertragung nicht verloren gehen.
Das macht sie so stabil:
- Zuverlässige Empfänger: Sie schreiben eingehende Daten vor der Verarbeitung in Protokolle.
- Das Herz des Vollstreckers schlägt: Diese regelmäßigen Signale bestätigen, dass die Vollstrecker lebendig und gesund sind.
- Checkpoint-Verzeichnisse: Bei Streaming-Jobs halten sie Offsets, Metadaten und den Ausgabestatus fest, damit du dort weitermachen kannst, wo du aufgehört hast.
Checkpointing ist optional für Stapelverarbeitungsaufträge, aber erforderlich für Streaming-Pipelines.
Angenommen, ein Spark-Job ist nach 10 Stunden Laufzeit fehlgeschlagen, aber du kannst dank Checkpointing und WALs dort weitermachen, wo du aufgehört hast.
Erweiterte architektonische Merkmale
Inzwischen hast du gesehen, wie Spark Jobs verarbeitet und mit Speicher und Fehlern umgeht.
In diesem Abschnitt gehen wir auf einige der fortschrittlichen Architektur-Upgrades ein, die Spark dynamischer, echtzeitfähiger und anpassungsfähiger machen.
Adaptive Abfrageausführung (AQE)
AQE wurde in Spark 3.0 eingeführt und verbessert die Abfrageleistung, indem es die Ausführungspläne zur Laufzeit auf der Grundlage der während der Ausführung gesammelten Statistiken dynamisch anpasst.
Zu den Merkmalen von AQE gehören:
- Wechsle dynamisch die Verknüpfungsstrategien: Wenn dein Broadcast-Join nicht in den Speicher passt, wechselt AQE zu einem Sort-Merge-Join.
- Zusammenführen von Shuffle-Partitionen: Führe kleine Shuffle-Partitionen zu größeren zusammen, um den Overhead zu reduzieren.
- Umgang mit schiefen Daten: AQE kann schiefe Partitionen aufteilen, um die Ausführungszeit auszugleichen.
Diese Funktion ist ein entscheidender Vorteil, denn sie ermöglicht die Anpassung von Aufgaben in Echtzeit, die bisher manuelles Abstimmen und Ausprobieren erforderten.
Achte nur darauf, dass du es explizit über die Konfiguration aktivierst (spark.sql.adaptive.enabled = true
). Und wenn du mit Spark 3.0+ arbeitest, gibt es keinen Grund, dies nicht zu tun.
Strukturierte Streaming-Architektur
Structured Streaming nutzt die Spark-Engine und erweitert sie auf den Echtzeitbereich, ohne dass du eine neue API lernen musst.
Hinter den Kulissen wird immer noch Micro-Batching angewendet. Aber es funktioniert:
- Offset Management: Spark verfolgt genau, welche Daten aus deiner Quelle (Kafka, Socket, Datei, etc.) gelesen wurden. Bei richtiger Konfiguration bietet dies eine starke Exact-once-Garantie.
- Watermarking: Bei zeitbasierten Aggregationen verwendet Spark Wasserzeichen, um zu entscheiden, wann späte Daten zu spät sind, um noch berücksichtigt zu werden. Dies ist entscheidend für die Verarbeitung von Ereignissen.
- Staatsgeschäfte: Wenn du Windowed Aggregations oder Streaming Joins durchführst, behält Spark den Status über Micro-Batches hinweg bei. Dieser Zustand wird auf der Festplatte gespeichert und mit einem Checkpoint versehen, um Datenverluste zu vermeiden.
Der Clou dabei ist, dass sich Streaming wie Batching anfühlt. Du schreibst eine groupBy()
oder eine filter()
und Spark kümmert sich um alles andere, so dass Streaming-Analysen ohne eine spezielle Toolchain möglich sind.
Sicherheitsarchitektur
Wenn du Spark in der Produktion einsetzt, insbesondere im Finanzwesen, im Gesundheitswesen oder in ähnlichen Geschäftsbereichen, musst du wissen, wie Spark mit Authentifizierung, Verschlüsselung und Auditierbarkeit umgeht.
Lass uns also tiefer in diese Themen eintauchen und herausfinden, wie Spark sich um sie kümmert.
Authentifizierung & Verschlüsselung
Spark hat viele Sicherheitsfunktionen, die du zuerst aktivieren musst. Aber sobald es aktiviert ist, bietet Spark ein solides Instrumentarium für sichere Kommunikation und Authentifizierung:
- Authentifizierung (SASL): Spark verwendet den Simple Authentication and Security Layer (SASL), um sicherzustellen, dass nur autorisierte Benutzer und Dienste Jobs einreichen oder sich mit dem Cluster verbinden können.
- Verschlüsselung bei der Übertragung (AES-GCM, SSL/TLS): Spark verschlüsselt die Kommunikation zwischen den Knotenpunkten mit AES-GCM (authentifizierte Verschlüsselung) oder TLS. Dies schützt die Auftragsdaten vor dem Ausspähen, was besonders in Multi-Tenant- oder Cloud-Umgebungen wichtig ist.
- Kerberos-Integration: Wenn du mit Hadoop/YARN arbeitest, ist Spark für eine sichere Benutzerauthentifizierung mit Kerberos integriert. Dadurch werden deine Spark-Jobs direkt mit den Identitäts- und Zugriffsmanagementsystemen deines Unternehmens verknüpft.
- UI-Zugangskontrolle: Die Spark-Web-UI kann sensible Informationen (wie Logs, Eingabepfade, SQL-Abfragen) preisgeben, daher solltest du
spark.acls.enable=true
undspark.ui.view.acls
undspark.ui.view.acls.groups
einstellen, um sie einzuschränken.
Du kannst alle Sicherheitsfunktionen in der offiziellen Dokumentation von Spark nachlesen. Sieh es dir an und stelle sicher, dass du die Funktionen aktivierst, die du für die Sicherheit deiner Spark-Anwendungen brauchst.
Prüfung & Einhaltung
Die Aufzeichnung, wer was wann getan hat, ist ebenfalls wichtig.
Spark unterstützt:
- Ereignisprotokollierung: Wenn diese Funktion aktiviert ist (
spark.eventLog.enabled=true
), speichert Spark jedes Job-, Stage- und Task-Ereignis auf der Festplatte. Du kannst diese Protokolle verwenden, um den Arbeitsverlauf wiederzugeben oder um Prüfungsanforderungen zu erfüllen. - Rollenbasierte Zugriffskontrolle (RBAC): Spark bietet kein RBAC, aber wenn du Spark über eine Plattform wie Databricks, EMR oder OpenShift verwendest, erhältst du normalerweise RBAC auf der Infrastrukturebene. Spark übermittelt Aufträge mit einer bestimmten Identität, die den Zugriff auf Daten und Rechenressourcen kontrolliert.
- Datenmaskierung und Zugriffskontrolle an der Quelle: Spark liest aus vielen Quellen(Parquet, Delta Lake, Hive, etc.), und deine Zugriffskontrolle sollte dort durchgesetzt werden.
Muster für die Leistungsoptimierung
Spark ist ziemlich leistungsstark und schnell und kann sogar noch schneller werden, wenn du weißt, wo du die nötigen Anpassungen vornehmen musst.
Es gibt mehrere Bereiche, in denen du versuchen kannst, Spark zu optimieren, um das Beste aus ihm herauszuholen. Lass uns also tiefer in jeden Bereich eintauchen.
Shuffle-Optimierung
Wenn Spark einen Schwachpunkt hat, dann ist es der Shuffle. Shuffles entstehen, wenn Daten zwischen Partitionen verschoben werden müssen, typischerweise nach umfangreichen Transformationen wie groupByKey()
, distinct()
oder join()
.
Und wenn ein Shuffles schief geht, kann es zu massiver Festplatten-E/A, langen Pausen bei der Garbage Collection oder verzerrten Aufgaben kommen, die nie beendet werden.
Hier erfährst du, wie du die Shuffles verbessern kannst:
- Ziehe
reduceByKey()
gegenübergroupByKey()
vor:reduceByKey()
aggregiert lokal, bevor es gemischt wird.groupByKey()
sendet alles über das Netzwerk. - Verteile sie geschickt: Verwende
.repartition(n)
, um die Parallelität zu erhöhen, oder.coalesce(n)
, um sie zu verringern. Überlasse es nicht der Standardpartitionierung von Spark. - Verwende Broadcast-Joins (mit Bedacht): Wenn ein Datensatz klein genug ist, verteile ihn an alle Arbeitnehmer/innen. Stelle
spark.sql.autoBroadcastJoinThreshold
ein, um die Größenbegrenzung zu kontrollieren. - Vermeide
collect()
: Vermeide es, wenn möglich, da das Ziehen von Daten zum Treiber die Leistung beeinträchtigt.
Richtlinien für die Speicherkonfiguration
Die Einstellung des Speichers von Spark kann eine Wissenschaft für sich sein, aber mit der folgenden Checkliste kannst du es dir leichter machen:
- Weisen Sie ausreichend Speicherplatz zu: Beginne mit mindestens 6 GB Arbeitsspeicher für den Spark-Cluster und passe ihn an deine spezifischen Bedürfnisse an.
- Betrachte den Spark-Speicheranteil: Standardmäßig liegt der Speicheranteil in Spark bei 60 %. Erhöhe ihn, wenn deine Anwendungen stark auf DataFrame/Dataset-Operationen angewiesen sind oder wenn du mehr Arbeitsspeicher benötigst.
- Verwende die richtige Anzahl von Kernen pro Executor: Normalerweise sind 3-5 optimal. Zu wenige führen zu einer Unterauslastung, während zu viele zu Aufgabenkonflikten führen.
- Aktiviere die dynamische Zuweisung (falls unterstützt): Spark kann Executors je nach Arbeitslast hoch- oder runterskalieren.
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=20
- Passe die Speicherfraktion an: Wenn du mehr Caching benötigst, erhöhe den Wert
spark.memory.storageFraction
. - Überwache die Speichernutzung und erstelle ein Profil: Nutze Tools wie die Spark UI oder VisualVM, um den Speicherverbrauch zu verfolgen und Engpässe zu erkennen.
Die Anpassung der Speicherkonfiguration kann eine große Hilfe sein. Ich habe einmal einen 30-Minuten-Job auf 8 Minuten reduziert, indem ich die Speicherkonfiguration angepasst habe, ohne eine einzige Zeile Code zu ändern.
Formeln für die Clustergröße
Das ist der Teil, den die meisten Teams falsch machen, weil sie die Größe des Clusters schätzen, anstatt sie richtig einzuschätzen.
Aber du kannst es besser machen, wenn du die folgenden Formeln verwendest:
- Bestimme die Anzahl der Partitionen:
- Berechne die Anzahl der benötigten Partitionen anhand der Größe deiner Daten und der gewünschten Partitionsgröße.
- Eine Standardrichtlinie besagt, dass eine Partition pro 128 MB bis 256 MB unkomprimierter Daten eingerichtet werden sollte.
- Formula: Anzahl der Partitionen = Aufrunden (Gesamtdatengröße ÷ Partitionsgröße).
- Berechne die Gesamtzahl der Kerne:
- Die Anzahl der benötigten Kerne sollte ausreichen, um alle Partitionen parallel zu verarbeiten.
- Formula: Kerne insgesamt = Aufrunden (Anzahl der Partitionen ÷ Partitionen pro Kern).
- Bestimme den Speicher pro Executor:
- Berechne die Menge an Speicher, die jeder Executor benötigt, basierend auf seinen Kernen, der Partitionsgröße und dem Overhead.
- Formula: Speicher pro Executor = Basisspeicher × (1 + Overhead-Prozentsatz).
- Berechne die Anzahl der Vollstrecker:
- Bestimme die Anzahl der Executors anhand der Gesamtzahl der Kerne und der Kerne pro Executor.
- Formula: Anzahl der Executors = Aufrunden (Cores insgesamt ÷ Cores pro Executor).
- Berechne den Gesamtspeicher:
- Berechne den benötigten Gesamtspeicher für den Cluster anhand der Anzahl der Executors und des Speichers pro Executor.
- Formula: Gesamtspeicher = Anzahl der Executors × Speicher pro Executor.
Zum Beispiel:
- Eingabe: 500 GB an Daten und eine Partitionsgröße von ~128 MB
- Partitionen: ~4.000 Partitionen
- Kerne: 4.000 Partitionen / 4 Partitionen pro Kern = 1.000
- Speicherplatz pro Executor: Gehe von 8 GB pro Executor und 20% Overhead aus. 8 GB * 1,20 = 9,6 GB
- Vollstrecker: 1.000 Kerne / 4 Kerne pro Executor = 250 Executors
- Gesamtspeicher: 250 Executors * 9,6 GB = 2.400 GB
Aber denk daran: Dies ist nur eine Schätzung. Du kannst sie als Ausgangspunkt verwenden und dann durch Profiling weiter optimieren.
Aufkommende architektonische Trends
Spark gibt es schon seit einem Jahrzehnt, aber es ist immer noch auf dem neuesten Stand. Sie entwickelt sich dank Cloud-nativer Plattformen, GPU-Beschleunigung und engerer ML-Integration schneller als je zuvor.
Wenn du Spark heute noch genauso einsetzt wie vor drei Jahren, lässt du wahrscheinlich die Leistung auf der Tabelle liegen und verpasst tolle neue Funktionen.
Werfen wir einen Blick auf einige der neuesten.
Photon-Engine (Databricks)
Wenn du mit Databricks arbeitest, hast du wahrscheinlich schon mit Photon gearbeitet und davon gehört.
Wenn du mehr über Databricks erfahren möchtest, empfehle ich dir den Kurs Einführung in Databricks.
Photon ist die Engine der nächsten Generation auf der Databricks Lakehouse-Plattform, die eine schnelle Abfrageleistung zu niedrigen Kosten bietet. Sie ist mit den Spark-APIs kompatibel, sodass du deinen Spark-Code nicht anpassen musst, um sie nutzen zu können.
Es hilft dir, deinen SQL- und PySpark-Code deutlich zu verbessern.
Photon umfasst die folgenden Funktionen:
- Vektorisierte Ausführung: Photon verarbeitet Daten in spaltenförmigen Stapeln und nutzt SIMD (Single Instruction, Multiple Data) CPU-Befehle, um Operationen auf mehreren Werten gleichzeitig durchzuführen. Das traditionelle Spark verwendet eine zeilenweise Ausführung und verlässt sich bei der Speicherzuweisung und Garbage Collection stark auf die JVM.
- C++-Laufzeit (kein JVM-Overhead): Keine Java Garbage Collection, die bei großen Spark-Jobs ein Engpass sein kann. In C++ wird der Speicher mit Präzision verwaltet.
- Verbesserte Abfrageoptimierungen: Photon ist eng mit dem Catalyst Optimizer von Spark verbunden, beinhaltet aber auch dessen Optimierungen während der Ausführung (wie Laufzeitfilterung, adaptive Codepfade, Join- und Aggregationsoptimierungen).
- Hardware-Beschleunigung: Unterstützung für moderne Hardware (wie NVIDIA-GPUs, AVX-512-Befehlssätze für Intel-CPUs, Graviton (ARM)-Prozessoren auf AWS).
Serverloses Spark
Serverless ist fantastisch, denn es bedeutet, dass du keine Cluster verwalten und keine Ressourcen im Voraus bereitstellen musst und dass du nur für die Zeit bezahlst, in der Spark läuft.
Und Serverless für Spark ist bereits in Diensten wie Databricks Serverless, AWS Glue und GCP Dataproc Serverless verfügbar.
Und hier ist der Grund, warum es so unglaublich ist:
- Automatische Skalierung: Die Plattform skaliert die Rechenleistung auf der Grundlage der tatsächlichen Anforderungen deines Auftrags, d.h. du musst nicht schätzen, wie viele Knoten du brauchst.
- Kosteneffizienz: Du zahlst nur für das, was du nutzt. Du musst nicht mehr für ungenutzte Server bezahlen.
- Einfachheit: Du musst dich nicht um die Einrichtung, Konfiguration oder Wartung von Clustern kümmern, da dies für dich erledigt wird.
- Leistung: Schnellere Ausführungszeiten sind möglich, da die Konfiguration und Einrichtung für dich optimiert sind.
Serverless Spark ist ideal für interaktive Analysen, Ad-hoc-Jobs oder unvorhersehbare Arbeitslasten.
Aber Vorsicht: Langlaufende, konsistente Pipelines können auf festen Clustern immer noch günstiger sein. Miss immer sowohl die Kosten als auch die Latenzzeit.
MLflow-Integration
Wenn du maschinelles Lernen in großem Maßstab betreibst und Modelle in die Produktion bringen willst, ist Spark allein nicht genug. Du brauchst MLOps-Prinzipien, wie z.B. Lernpfad, Modellversionierung und Reproduzierbarkeit. Hier kommt MLflow ins Spiel.
MLflow lässt sich jetzt in Spark integrieren und bietet einen kompletten MLOPs-Stack für deine Pipelines.
Das kannst du:
- Lernpfad-Experimente: Protokolliere Parameter, Metriken und Artefakte von Spark ML-Jobs mit
mlflow.log_param()
undmlflow.log_metric()
. - Version Modelle: Speichere Modelle von
pyspark.ml
odersklearn
direkt in die Modellregistrierung von MLflow. - Modelle servieren: Setze die trainierten Modelle mithilfe des MLflow Model Serving auf REST-Endpunkten ein.
Du brauchst das Werkzeug nicht zu wechseln. Du nutzt Spark weiterhin für Training, Feature Engineering und Scoring, während du MLflow für MLOPs-Aufgaben verwendest.
Fazit
Wenn du nicht viel über Spark weißt, ist es wie eine riesige Blackbox. Du schreibst etwas PySpark-Code, drückst auf "Ausführen" und hoffst, dass es funktioniert.
Manchmal hat das gut funktioniert, manchmal hat es zu langen Debugging-Sitzungen geführt, um herauszufinden, was falsch gelaufen ist.
Erst als ich anfing, hinter die Kulissen zu schauen, ergab alles einen Sinn für mich. Und es hat eine ganze Weile gedauert, bis ich verstanden habe, was hier los ist.
Hier ist, worauf ich mich konzentrieren würde, wenn ich noch einmal von vorne anfangen würde:
- Lerne, wie Spark deinen Code in Jobs, Stages und Tasks aufteilt.
- Verstehe das Gedächtnis.
- Achte auf Shuffles.
- Fang klein an und lass die Dinge im lokalen Modus laufen. Mach dir die Hände schmutzig.
Das ist genau das, was wir in diesem Artikel gelernt haben.
Wenn du weiter lernen willst, empfehle ich dir hier ein paar anfängerfreundliche Ressourcen:
- Einführung in PySpark: Ein toller praktischer Ausgangspunkt, wenn du dich noch nicht ganz sicher fühlst.
- Daten bereinigen mit PySpark: Lerne, Daten zu bereinigen, denn Daten sind in der realen Welt immer unordentlich.
- Die 20 wichtigsten Spark-Interview-Fragen: Das ist nicht nur für Vorstellungsgespräche, sondern um dein Verständnis zu vertiefen.
- Die 4 wichtigsten Apache Spark-Zertifizierungen im Jahr 2025: Falls du deine Fähigkeiten durch Zertifizierungen anerkennen lassen willst.
PySpark von Grund auf lernen
FAQs
Wie wähle ich den richtigen Clustermanager für meinen Spark-Einsatz?
Spark unterstützt verschiedene Clustermanager (YARN, Mesos, Kubernetes und Standalone). Deine Entscheidung hängt von der vorhandenen Infrastruktur, dem Bedarf an gemeinsamer Nutzung von Ressourcen und dem betrieblichen Know-how ab: YARN lässt sich gut in Hadoop-Cluster integrieren, Kubernetes bietet Portabilität in Containern und Mesos zeichnet sich durch eine mandantenfähige Isolierung aus.
Was ist der externe Zuführungsdienst und wie verbessert er die Leistung?
Der externe Shuffle-Dienst entkoppelt die Bereitstellung von Shuffle-Dateien von den Lebenszyklen der Executors und ermöglicht so eine dynamische Zuweisung und reduziert den Datenverlust bei der Evakuierung der Executors. Die Shuffle-Dateien bleiben auch nach dem Herunterfahren der Executors verfügbar, was die Wiederholungen der Stages beschleunigt und die Festplatten-E/A bei hoher Last schont.
Wie funktionieren Broadcast Joins intern und wann sollte ich sie verwenden?
Bei Broadcast-Joins sendet Spark eine kleine Tabelle an jeden Executor, um zu vermeiden, dass die Daten komplett durcheinander gewürfelt werden. Verwende sie, wenn eine Seite des Joins unterhalb der spark.sql.autoBroadcastJoinThreshold
(Standardwert 10 MB) liegt, da sie die Netzwerk-E/A drastisch reduzieren und die Joins bei schiefen Schlüsselverteilungen beschleunigen.
Was sind die besten Methoden zur Optimierung der JVM-Garbage Collection in Spark?
Überwache GC-Pausen über die Spark-Benutzeroberfläche oder Tools wie VisualVM und bevorzuge den G1GC-Kollektor wegen seiner geringen Pausenzeiten. Weise dem Executor Speicher mit Spielraum für den Overhead zu (spark.executor.memoryOverhead
) und stimme -XX:InitiatingHeapOccupancyPercent
so ab, dass GC früher ausgelöst wird, um lange Stop-the-World-Pausen zu vermeiden.
Wie kann ich die GPU-Beschleunigung nutzen, um Spark-Jobs zu beschleunigen?
Nutze den NVIDIA RAPIDS Accelerator für Apache Spark, um SQL- und DataFrame-Operationen transparent auf GPUs zu verlagern. Sie fügt sich in die Ausführungsengine von Spark ein, ersetzt CPU-basierte Operatoren durch GPU-beschleunigte Äquivalente und bietet eine bis zu 10-fach schnellere Verarbeitung für geeignete Workloads.
Was ist der Unterschied zwischen statischer und dynamischer Ressourcenzuweisung in Spark?
Die statische Zuteilung legt die Anzahl der Executors für die gesamte Lebensdauer des Jobs fest und bietet damit Vorhersehbarkeit auf Kosten potenziell ungenutzter Ressourcen. Mit der dynamischen Zuweisung kann Spark je nach anstehenden Aufgaben und Arbeitslast Executors anfordern oder freigeben und so die Clusterauslastung bei schwankenden Aufgaben verbessern - ideal für gemeinsam genutzte Umgebungen.
Wie sollte ich Spark für eine optimale Leistung auf Cloud-Speichersystemen wie S3 konfigurieren?
Aktiviere die S3-Übertragungsbeschleunigung, stimme spark.hadoop.fs.s3a.connection.maximum
ab und verwende die konsistente Ansicht (S3A v2), um eventuelle Konsistenz zu gewährleisten. Führe kleine Dateien vor dem Schreiben zusammen und berücksichtige die S3A-Committer, um den Overhead bei der Listenbearbeitung zu reduzieren und den Schreibdurchsatz zu verbessern.
Wie kann ich die Spark-Kommunikation mit Kerberos und TLS sichern?
Aktiviere TLS für RPC (spark.ssl.enabled
) und konfiguriere SASL/Kerberos (spark.authenticate and spark.kerberos.keytab
), um eine gegenseitige Authentifizierung zu erzwingen. Speichere deine Zugangsdaten in einer sicheren Keytab, auf die du über HDFS zugreifen kannst, und schränke den Zugriff auf die Spark-Benutzeroberfläche über ACL-Einstellungen ein, um zu verhindern, dass Unbefugte Daten einsehen können.
Was sind Pandas UDFs und wann sind sie effizienter als normale UDFs?
Pandas UDFs (vektorisierte UDFs) nutzen Apache Arrow für den Batch-Austausch von Daten zwischen der JVM und Python, wodurch der Serialisierungs-Overhead drastisch reduziert wird. Sie sind den traditionellen zeilenweisen UDFs bei komplexen numerischen Operationen überlegen, insbesondere bei der Verarbeitung großer spaltenförmiger Stapel in PySpark.
Welche Vorteile bietet die DataSource V2 API gegenüber V1 für benutzerdefinierte Datenquellen?
DataSource V2 bietet eine übersichtlichere, modularere Oberfläche, die Push-Down-Filter, Partition Pruning und Streaming-Quellen nativ unterstützt. Sie ermöglicht eine feinkörnige Lese-/Schreibkontrolle und eine bessere Integration mit dem Catalyst-Optimierer von Spark, was zu einer höheren Leistung und leichteren Wartbarkeit für maßgeschneiderte Konnektoren führt.
Ich bin ein Cloud-Ingenieur mit fundierten Kenntnissen in den Bereichen Elektrotechnik, maschinelles Lernen und Programmierung. Meine Karriere begann im Bereich Computer Vision mit dem Schwerpunkt Bildklassifizierung, bevor ich zu MLOps und DataOps wechselte. Ich bin spezialisiert auf den Aufbau von MLOps-Plattformen, die Unterstützung von Data Scientists und die Bereitstellung von Kubernetes-basierten Lösungen zur Optimierung von Machine Learning-Workflows.