Cours
PySpark est l'API Python pour Apache Spark, conçue pour le traitement de données parallèles et distribuées sur des clusters. Le choix de l'opération d'assemblage la mieux adaptée à vos besoins influe sur la vitesse de travail, la consommation de ressources et la réussite globale.
Travailler avec de vastes ensembles de données implique souvent d'intégrer des données provenant de plusieurs sources. Il est essentiel de combiner efficacement ces informations pour obtenir une analyse précise et des informations utiles. Les jointures PySpark offrent des moyens puissants de combiner des ensembles de données distincts sur la base de clés partagées.
Cependant, des jointures inefficaces peuvent avoir un impact important sur la vitesse et la fiabilité du traitement de vos données lorsque vous traitez de vastes ensembles de données contenant des millions, voire des milliards d'enregistrements sur des clusters distribués.
Avez-vous fait l'expérience de jointures qui échouent en cas d'analyse lente ou même de données à grande échelle ? Si c'est le cas, ce guide est fait pour vous. Il aidera les data scientists, les développeurs et les ingénieurs de niveau intermédiaire à acquérir la maîtrise et la confiance nécessaires pour réaliser des jointures PySpark.
Si vous cherchez des exercices pratiques pour vous familiariser avec PySpark, consultez notre parcours de compétences Le cursus Big Data avec PySpark.
Qu'est-ce que PySpark Joins ?
Comme vous le découvrirez dans notre Apprendre PySpark à partir de rien l'une des principales fonctionnalités de PySpark est la possibilité de fusionner de grands ensembles de données. Les opérations de jointure de PySpark sont essentielles pour combiner de grands ensembles de données basés sur des colonnes partagées, ce qui permet une intégration, une comparaison et une analyse efficaces des données à grande échelle.
Ils jouent un rôle essentiel dans le traitement des données volumineuses en aidant à découvrir les relations et à extraire des informations significatives à partir de sources de données distribuées.
PySpark propose plusieurs types de jointures, telles que les jointures internes, les jointures externes, les jointures gauche et droite, les jointures semi, et anti jointureschacune servant des objectifs analytiques différents.
Cependant, l'utilisation de jointures sur de grands ensembles de données peut poser des problèmes tels que des données asymétriques, des frais généraux de brassage et des contraintes de mémoire, ce qui rend crucial le choix de la bonne stratégie de jointure en termes de performances et de précision.
Principes de base des jonctions PySpark
Comprendre les opérations internes de PySpark est essentiel pour joindre efficacement de grands ensembles de données, comme nous le montrons dans notre cours Introduction à PySpark. Cette section présente le cadre conceptuel de l'exécution des jointures de PySpark et le rôle de son optimiseur.
Cadre conceptuel
PySpark effectue des jointures dans un environnement informatique distribué, ce qui signifie qu'il divise de grands ensembles de données en partitions plus petites et les traite en parallèle sur une grappe de machines. Lorsqu'unejointure est déclenchée, PySpark répartit le travail entre ces nœuds, en visant un calcul équilibré afin d'accélérer l'exécution et d'éviter les goulets d'étranglement.
Au cœur de ce processus se trouve Catalyst, le puissant moteur d'optimisation des requêtes de Spark. Catalyst détermine comment les opérations de jointure sont exécutées. Il décide des stratégies de partitionnement, détecte les cas où le brassage des données est nécessaire et applique des optimisations de performances telles que le pushdown des prédicats et les jointures de diffusion. Cette optimisation automatisée garantit une exécution efficace sans nécessiter de réglage manuel de la part du développeur.
Syntaxe de jonction de base
Pour effectuer des jointures dans PySpark, vous suivez une syntaxe simple qui consiste à spécifier le paramètre DataFrame, a condition de jointureet le type de jointure à exécuter.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("PySpark Joins Example").getOrCreate()
# Perform a join
joined_df = df1.join(df2, on="key", how="inner")
Dans cet exemple :
df1
etdf2
sont les DataFrames à joindre.df1.key
==df2.key
est la condition de jointure, qui détermine comment les lignes de chaque DataFrame doivent être mises en correspondance.inner' specifies the join type, which could also be
left,
right,,
outer,
semi, or
anti` en fonction du résultat souhaité.
Suivez notre tutoriel "Getting Started with PySpark " pour savoir comment installer PySpark et exécuter les jointures ci-dessus sans erreur.
Types de jointures et applications de PySpark
PySpark comprend différents types de jointures, chacun adapté à des tâches de données distinctes. Comprendre ces éléments vous permet de faire un choix judicieux.
Joints intérieurs
La jointure interne combine les lignes de deux DataFrame lorsque des clés correspondantes existent dans les deux. Les lignes sans clés correspondantes seront exclues. C'est le type de jointure le plus courant et par défaut dans PySpark.
Les scénarios d'utilisation typiques sont les suivants :
- Faire correspondre les commandes des clients aux profils des clients afin d'établir des rapports analytiques précis.
- Comparer précisément des ensembles de données provenant de sources différentes.
Joints extérieurs
PySpark prend en charge les jointures externes à gauche, à droite et complètes.
La jointure externe gauche renvoie toutes les lignes du DataFrame gauche et les lignes correspondantes du DataFrame droit. Lorsqu'aucune clé correspondante n'apparaît dans le DataFrame de droite, des valeurs nulles remplissent les colonnes de droite.
Cas d'utilisation :
- Trouver des clients qui n'ont pas fait d'achats.
- Identifier les informations manquantes dans les dossiers.
La jointure externe droite reflète la logique de la jointure gauche. Il renvoie toutes les lignes du DataFrame de droite et les lignes correspondantes du DataFrame de gauche, en remplissant les correspondances manquantes des colonnes de gauche par des zéros.
Exemple de scénario :
- Vérifier l'affectation des catégories de produits ou les erreurs d'étiquetage.
La jointure externe complète renvoie toutes les lignes disponibles des deux DataFrame, en remplissant les entrées non appariées par des zéros. Il permet de repérer les divergences ou les lacunes entre deux ensembles de données.
Joints spécialisés : Semi et anti joints
Les semi-joints et les anti-joints remplissent des fonctions spécialisées :
Une jointure partielle renvoie les lignes du DataFrame de gauche lorsque les clés correspondantes existent dans le DataFrame de droite. Il n'ajoute pas les colonnes du bon DataFrame et évite les doublons, ce qui le rend idéal pour les contrôles d'existence.
Exemple de cas d'utilisation :
- Vérification rapide de l'historique des connexions des utilisateurs sans détails supplémentaires.
Un anti Join renvoie les lignes d'un DataFrame qui n'ont pas de clé correspondante dans le second DataFrame.
Exemple de cas d'utilisation :
- Recherche d'enregistrements orphelins ou validation de l'intégrité des données.
Les jointures croisées créent des données en associant chaque ligne d'un DataFrame à chaque ligne d'un autre DataFrame. Sauf si vous le souhaitez expressément, évitez les jointures croisées, car elles élargissent considérablement les ensembles de données.
Utilisez-les avec précaution pour des scénarios spécifiques tels que
- Test de combinaisons de paramètres.
- Analyser toutes les combinaisons possibles dans un cadre expérimental.
Techniques d'optimisation pour des jointures PySpark efficaces
Les jonctions à grande échelle exigent une optimisation minutieuse. Considérez les techniques suivantes :
Stratégie d'adhésion à la radiodiffusion
Les jointures de diffusion améliorent les performances en répliquant des DataFrame plus petits sur tous les nœuds, ce qui élimine les mélanges coûteux.
Exemple de syntaxe :
from pyspark.sql.functions import broadcast
joined_df = large_df.join(broadcast(small_df), "id", "inner")
Tenez compte des limites de mémoire de votre système ; la diffusion de DataFrame trop volumineux ne fonctionnera pas efficacement.
Techniques d'atténuation des biais
L'asymétrie des données signifie que les données ne sont pas réparties uniformément entre les partitions, ce qui crée des goulets d'étranglement. Les techniques permettant de réduire l'asymétrie sont les suivantes :
- Salage des clés : attachement de préfixes aléatoires à des clés de jointure fortement asymétriques afin d'assurer une distribution uniforme.
- Partitionnement tenant compte de l'obliquité : Spark équilibre automatiquement les données entre les partitions, en gérant proactivement le skew dès le départ.
Stratégies de partitionnement
L'efficacité des jointures dans PySpark dépend fortement de la minimisation de la surcharge de mélange, qui se produit lorsque les données doivent être déplacées d'un nœud à l'autre pour satisfaire les conditions de jointure. L'un des moyens les plus efficaces de réduire cette surcharge consiste à aligner les partitions des DataFrames à joindre.
Meilleures pratiques pour le cloisonnement
Gardez à l'esprit ces bonnes pratiques :
- Pour minimiser les mouvements de données inutiles, partitionnez les deux DataFrame en utilisant la ou les mêmes colonnes que celles utilisées dans la condition de jointure.
- Évitez les colonnes asymétriques : N'effectuez pas de partitionnement sur des colonnes comportant quelques valeurs dominantes, car cela entraîne une répartition déséquilibrée de la charge de travail entre les nœuds.
- Utilisez
repartition()
oubroadcast()
à bon escient: Utilisezrepartition()
pour les grands ensembles de données où une distribution uniforme est possible, etbroadcast()
pour les tableaux beaucoup plus petits afin d'éviter complètement le brassage. - Contrôlez la taille et la distribution des données: La compréhension des caractéristiques sous-jacentes de vos données (par exemple, la cardinalité et la distribution de fréquence des clés de jointure) vous aide à choisir la stratégie de partitionnement la plus équilibrée.
Stratégies et cas d'utilisation avancés de PySpark Join
PySpark comprend des mécanismes spécialisés pour les cas les plus exigeants, permettant une efficacité encore plus grande :
Gamme jointe
Les jointures de plages sont utilisées pour faire correspondre des lignes sur la base de conditions numériques ou temporelles, plutôt que sur la base d'une égalité exacte. Ces jointures sont courantes dans les scénarios impliquant des données de séries temporelles, comme l'analyse de journaux, le cursus d'événements ou les flux de capteurs IoT.
Les meilleures pratiques pour une jonction efficace des rangs sont les suivantes :
- Minimiser la charge de travail liée au brassage: Un bon partitionnement est essentiel pour éviter les déplacements de données coûteux. Lorsque vous travaillez avec des jointures de plages, envisagez de partitionner les données sur la base de fenêtres temporelles ou de plages de valeurs pertinentes pour votre cas d'utilisation.
- Trier les données dans les partitions: Le tri des données en fonction de la clé d'intervalle (par exemple, l'horodatage) avant d'effectuer la jointure peut améliorer les performances, en particulier lors de l'utilisation d'opérations telles que
rangeBetween
oubetween
. - Évitez les balayages à grande échelle: Les plages de jointure doivent être aussi étroites que possible afin de réduire le nombre de comparaisons et d'améliorer le temps d'exécution.
- Exploiter les fonctions de seau ou de fenêtre: Elles permettent de réduire les opérations de mélange et d'assurer un meilleur alignement lorsque vous traitez de grands ensembles de données et des plages qui se chevauchent.
Voici quelques exemples de cas d'utilisation :
- Aligner les données des campagnes de marketing sur les interactions avec les clients à travers des fenêtres temporelles.
- Fusionner des données IoT ou de capteurs capturées à intervalles séquentiels pour la détection d'anomalies ou l'analyse de tendances.
La diffusion en continu rejoint
L'assemblage de données en continu présente des défis uniques en raison de leur nature continue et souvent imprévisible. Les principaux problèmes sont les événements tardifs, les données non ordonnées et la nécessité de gérer efficacement l'état au fil du temps.
PySpark relève ces défis en utilisant les filigranes, qui définissent un seuil pour la durée pendant laquelle le système doit attendre les données en retard avant de les rejeter. En plaçant un filigrane sur les colonnes d'événements, PySpark peut gérer efficacement les données tardives et optimiser l'utilisation des ressources.
Les filigranes sont particulièrement utiles lorsque vous joignez deux sources de flux ou un flux avec un ensemble de données statiques, ce qui permet de maintenir à la fois les performances et la cohérence dans les pipelines de données en temps réel.
Joints de fonctionnalités d'apprentissage automatique (ML)
Les jointures jouent un rôle crucial dans la préparation des données pour les modèles d'apprentissage automatique. construction de modèles avec Spark MLlib. La consolidation de ces caractéristiques dans un ensemble de données unique et unifié est essentielle pour une formation et une évaluation précises des modèles.
Les stratégies de jonction efficace des caractéristiques dans les systèmes en temps réel sont les suivantes :
- Utilisation de jointures de diffusion pour les tableaux de caractéristiques statiques de petite taille afin de minimiser le brassage et d'accélérer l'enrichissement des données en temps réel.
- Pré-agrégation des caractéristiques utiliser des fonctions de fenêtre ou des opérations de groupe avant la jonction pour réduire le volume de données en mouvement.
- Exploiter les filigranes et l'alignement sur l'heure de l'événement lors de l'intégration de flux en temps réel afin de garantir des mises à jour de fonctionnalités pertinentes et en temps voulu.
- Persistance des jointures intermédiaires pour les caractéristiques fréquemment réutilisées afin d'éviter les calculs redondants dans les tâches en aval.
En gérant efficacement ces jointures, PySpark permet une ingénierie des fonctionnalités en temps réel, en prenant en charge à la fois les systèmes d'apprentissage en ligne et les pipelines d'apprentissage par lots actualisés.
Vous pouvez apprendre à faire des prédictions à partir de données avec Apache Spark, en utilisant des arbres de décision, la régression logistique, la régression linéaire, des ensembles et des pipelines avec notre... Apprentissage automatique avec PySpark.
Meilleures pratiques et défis communs
Les opérations de jonction dans l'informatique distribuée requièrent une attention régulière afin d'éviter les problèmes potentiels.
Des opérations de jonction efficaces peuvent améliorer considérablement la vitesse globale du pipeline et l'utilisation des ressources. Voici les principales pratiques à suivre :
- Filtrage avant jonction: Supprimez les lignes non pertinentes et supprimez les colonnes inutilisées avant d'effectuer des jointures. Cela permet de réduire la quantité de données mélangées dans le cluster.
- Commande de jointure intelligente: Commencez toujours par joindre des tableaux plus petits ou diffusables afin de minimiser la taille des données intermédiaires et d'éviter les calculs inutiles.
- Gestion de la mémoire et des ressources: Surveillez l'exécution des tâches via l'interface utilisateur de Spark ou les journaux pour détecter les asymétries ou les goulets d'étranglement. Veillez à ce que les stratégies de partitionnement s'alignent sur les ressources de la grappe afin d'éviter de surcharger les nœuds individuels.
- Répartir judicieusement: Utilisez
repartition()
oucoalesce()
pour ajuster la taille des partitions afin d'obtenir un traitement parallèle plus équilibré.
Les anti-modèles à éviter
Même de petites erreurs dans la logique de jointure peuvent entraîner des problèmes majeurs en termes de performances ou de précision des données. Voici les pièges les plus courants à éviter :
- Conditions de jonction incorrectes ou ambiguës: Des conditions de jointure mal définies peuvent entraîner des correspondances incorrectes ou des lignes en double. Vérifiez toujours que vos clés de jointure sont exactes, uniques si nécessaire, et que le type de données correspond bien.
- Joints croisés involontaires: Les jointures croisées multiplient chaque ligne d'un DataFrame avec chaque ligne d'un autre DataFrame, ce qui entraîne souvent des explosions massives de données. Ne les utilisez que lorsque cela est explicitement requis et que vous avez une bonne compréhension de la taille de la sortie.
- Forcer les jointures de diffusion sur les DataFrames de grande taille: La diffusion de tableaux volumineux peut saturer la mémoire et ralentir les performances. Laissez PySpark décider, ou ne diffusez manuellement que lorsque le DataFrame est petit et tient confortablement en mémoire.
- Ignorer l'asymétrie des données: Si une ou plusieurs clés de jointure sont fortement asymétriques, quelques partitions peuvent être surchargées. Détectez l'asymétrie à un stade précoce et envisagez des techniques de salage ou des stratégies alternatives.
Même les jointures bien structurées peuvent rencontrer des problèmes de performance ou d'exécution.
Voici les problèmes les plus courants et la manière de les résoudre :
- Lenteur d'exécution: Souvent causée par une asymétrie des données ou imbalanced partitions. Vérifiez la présence de valeurs clés de jointure très répétitives et envisagez de repartitionner ou d'appliquer des techniques de salage pour répartir les données de manière plus homogène.
- Défauts de jonction ou erreurs inattendues: Généralement en raison de la non-concordance des types de données ou de valeurs nulles dans les clés de jointure. Assurez-vous que les deux parties de la jointure utilisent les mêmes types de données et traitez les données nulles de manière appropriée.
- Erreurs hors mémoire: Cela peut se produire lorsque de grands ensembles de données sont joints sans filtrage ou lorsque des jointures de diffusion sont forcées sur de grands tableaux. Filtrez tôt et ne diffusez que des DataFrames statiques et de petite taille.
Conclusion
Les jointures PySpark sont la pierre angulaire du traitement des données à grande échelle, permettant des analyses puissantes et efficaces à travers des ensembles de données massifs. Lorsqu'ils sont utilisés correctement, ils révèlent des relations significatives qui permettent d'obtenir des informations plus approfondies et de prendre des décisions plus judicieuses.
Bien que l'optimisation des performances puisse présenter des défis, tels que la gestion de l'asymétrie des données ou l'optimisation du partitionnement, la maîtrise de ces aspects est essentielle pour construire des pipelines de données robustes.
Qu'il s'agisse de joindre des flux en temps réel ou de consolider des fonctionnalités pour l'apprentissage automatique, l'affinement de vos stratégies de jointure peut considérablement améliorer la vitesse d'exécution, l'efficacité des ressources et la précision de l'analyse.
Pour aller plus loin, explorez le traitement des big data avec PySpark et devenez un expert en ingénierie des données grâce à nos Fondamentaux du Big Data avec PySpark le cursus de compétences.
PySpark rejoint la FAQ
Quel est le type de jointure le plus efficace à utiliser dans PySpark ?
Le type de jointure le plus efficace dépend de vos données et de vos objectifs. Les jointures internes sont rapides lorsque des données correspondantes existent de part et d'autre. Pour le traitement de tableaux de référence de petite taille, les jointures de diffusion sont souvent les plus performantes.
Quand dois-je utiliser les jointures de diffusion dans PySpark ?
Utilisez les jointures par diffusion lorsque vous joignez un DataFrame de grande taille à un autre beaucoup plus petit (généralement moins de 10 Mo à 100 Mo en fonction de votre cluster). La diffusion du tableau le plus petit permet d'éviter les mélanges coûteux.
Quelles sont les causes de l'asymétrie des données dans les jointures et comment y remédier ?
L'asymétrie des données se produit lorsque certaines clés de jointure ont des enregistrements disproportionnés, ce qui entraîne une surcharge de certaines partitions. Le salage, qui consiste à ajouter des valeurs aléatoires pour répartir les données de manière plus homogène, est une technique efficace pour atténuer ce problème.
Le repartitionnement est-il toujours nécessaire avant une jointure ?
Ce n'est pas toujours le cas, mais le repartitionnement par la clé de jointure peut réduire considérablement le brassage et améliorer les performances lorsque les deux DataFrame proviennent de sources différentes ou sont partitionnés différemment.
PySpark peut-il gérer des jointures de données en streaming en temps réel ?
Oui. L'API Structured Streaming de PySpark prend en charge les jointures en temps réel en utilisant des filigranes pour traiter les données arrivant tardivement et gérer l'état de manière efficace.
