Cours
Les fonctions définies par l'utilisateur (User-Defined Functions, UDF) dans Python offrent aux développeurs Python un moyen puissant de gérer des tâches uniques que les fonctions intégrées de Spark ne peuvent tout simplement pas gérer. Si vous êtes un ingénieur de données, un analyste ou un scientifique maîtrisant Python, la compréhension des concepts UDF peut vous permettre de relever efficacement des défis complexes liés aux données du monde réel.
Ce tutoriel vous guide à travers les concepts PySpark UDF, les implémentations pratiques et les meilleures pratiques en matière d'optimisation, de test, de débogage et de modèles d'utilisation avancés. À la fin, vous serez en mesure d'écrire, d'optimiser et de déployer en toute confiance des UDF efficaces à grande échelle.
Si vous ne connaissez pas PySpark, je vous recommande d'abord de consulter notre tutoriel Tutoriel Premiers pas avec PySpark car nous y abordons des concepts avancés de Spark.
Que sont les UDF PySpark ?
Les UDF PySpark sont des fonctions Python personnalisées intégrées au cadre distribué de Spark pour opérer sur les données stockées dans les DataFrames Spark. Contrairement aux fonctions Spark intégrées, les UDF permettent aux développeurs d'appliquer une logique complexe et personnalisée au niveau de la ligne ou de la colonne.
Notre PySpark Cheat Sheet couvre tout ce que vous devez savoir sur les DataFrame Spark, ce qui vous permet de comprendre encore plus facilement les UDF Spark.
Quand devriez-vous utiliser les UDF de PySpark ?
Utilisez les UDF lorsque:
- Vous avez besoin d'une logique qui ne peut pas être exprimée à l'aide des fonctions intégrées de Spark.
- Votre transformation implique des opérations complexes natives de Python (par exemple, des manipulations de regex, une logique NLP personnalisée).
- Vous êtes prêt à sacrifier les performances au profit de la flexibilité, en particulier lors du prototypage ou pour des ensembles de données de petite ou moyenne taille.
Évitez les UDF lorsque:
- Des fonctionnalités équivalentes existent sur
pyspark.sql.functions
, les fonctions natives de Spark sont plus rapides, optimisées et peuvent être poussées vers le bas du moteur d'exécution. - Vous travaillez avec de grands ensembles de données pour lesquels la performance est essentielle. Les UDF introduisent une surcharge de sérialisation et rompent la capacité de Spark à optimiser les plans d'exécution.
- Vous pouvez exprimer votre logique à l'aide d'expressions SQL, de modules intégrés Spark SQL ou d'UDF Pandas (pour les opérations vectorisées).
Applications stratégiques de l'ingénierie des données
Voici les principaux cas d'utilisation des UDF de PySpark :
- Transformations complexes de données, telles que l'analyse avancée de texte, l'extraction de données ou la manipulation de chaînes de caractères.
- Intégration avec des bibliothèques Python tierces, notamment des frameworks d'apprentissage automatique populaires comme TensorFlow et XGBoost.
- Faire le lien entre les systèmes existants et prendre en charge l'évolution transparente des schémas au fur et à mesure que les structures de données changent.
Les UDF simplifient les tâches d'ingénierie des données dans le monde réel, en permettant aux équipes de gérer des exigences diverses de manière flexible et efficace.
Découvrons maintenant comment vous pouvez mettre en œuvre les UDF de PySpark.
Implémentation des UDF PySpark
Cette section explique comment définir et mettre en œuvre de manière pratique les UDF PySpark.
Méthodes standard de déclaration des UDF
Il existe trois approches courantes pour déclarer les UDF dans PySpark :
1. UDF basés sur Lambda : Rapide à définir directement dans les requêtes DataFrame ; idéal pour les opérations simples.
L'UDF basé sur Lambda (UDF de base en Python) est le meilleur pour les transformations rapides et simples. Évitez-les pour les emplois à grande échelle où les performances sont importantes.
En voici un exemple :
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
uppercase = udf(lambda x: x.upper() if x else None, StringType())
df = spark.createDataFrame([("Ada",), (None,)], ["name"])
df.withColumn("upper_name", uppercase("name")).show()
2. Fonctions Python décorées: Annoté explicitement à l'aide de @pyspark.sql.functions.udf
, ce qui favorise la réutilisation et la lisibilité.
Par exemple :
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
@F.udf(returnType=IntegerType())
def str_length(s):
return len(s) if s else 0
df.withColumn("name_length", str_length("name")).show()
3. UDF enregistrés dans SQL: Enregistrés directement dans les contextes SQL de Spark, ce qui permet de les utiliser dans des requêtes SQL.
from pyspark.sql.types import StringType
def reverse_string(s):
return s[::-1] if s else ""
spark.udf.register("reverse_udf", reverse_string, StringType())
df.createOrReplaceTempView("people")
spark.sql("""SELECT name, reverse_udf(name) AS reversed FROM people""").show()
Chaque méthode comporte des compromis : les UDF lambda sont concises mais limitées, tandis que les annotations de fonctions favorisent la lisibilité, la maintenabilité et les meilleures pratiques.
Les UDF Pandas permettent d'effectuer des opérations vectorisées sur les lots Arrow. Ils sont souvent plus rapides que les UDF classiques et s'intègrent mieux au moteur d'exécution de Spark.
Scalar Pandas UDF (par élément, comme map)
Ils conviennent mieux aux transformations rapides, par ligne, sur de grands ensembles de données. Par exemple :
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd
@pandas_udf(IntegerType())
def pandas_strlen(s: pd.Series) -> pd.Series:
return s.str.len()
df.withColumn("name_len", pandas_strlen("name")).show()
Carte groupée Pandas UDF
Il est préférable de les utiliser pour la logique personnalisée par groupe, comme dans le cas de groupby().apply()
dans Pandas.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
schema = StructType([
StructField("group", StringType()),
StructField("avg_val", DoubleType())
])
@pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def group_avg(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({
"group": [pdf["group"].iloc[0]],
"avg_val": [pdf["value"].mean()]
})
df.groupBy("group").apply(group_avg).show()
Pandas Aggregate UDF
Celui-ci effectue des agrégations sur les groupes, plus rapidement que les cartes groupées. Par exemple :
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
@pandas_udf(DoubleType(), functionType="grouped_agg")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.groupBy("category").agg(mean_udf("value").alias("mean_value")).show()
Pandas Iterator UDF
L'UDF Pandas Iterator est idéal pour les grands ensembles de données nécessitant un traitement en mémoire réduite (par lots). Par exemple :
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from typing import Iterator
import pandas as pd
@pandas_udf(IntegerType(), functionType="iterator")
def batch_sum(it: Iterator[pd.Series]) -> Iterator[pd.Series]:
for batch in it:
yield batch + 1
df.withColumn("incremented", batch_sum("id")).show()
Manipulation des types et sécurité nulle
Les types et les valeurs nulles constituent des défis fréquents pour les UDF de PySpark. PySpark applique une vérification stricte des types, ce qui entraîne souvent des conversions de types implicites ou des problèmes d'exécution. De plus, Spark transmet les valeurs nulles directement aux UDF, ce qui crée des risques de plantage si elles ne sont pas gérées de manière explicite.
Ces stratégies permettent d'assurer la solidité des UDF :
- Spécifiez explicitement les types de retour.
- Incorporez des contrôles de nullité (par exemple, des instructions conditionnelles) dans vos fonctions Python.
- Adoptez des pratiques de codage défensives ; de simples contrôles de nullité permettent d'éviter des exceptions frustrantes au moment de l'exécution.
Optimiser les performances de l'UDF
La performance est souvent le talon d'Achille des UDF standard en raison de leur modèle d'exécution ligne par ligne. Tirer parti des UDF vectorisés et des outils d'optimisation de Spark permettra d'améliorer considérablement les temps d'exécution.
UDF vectorisés avec intégration de Pandas
Les UDF Pandas introduisent une approche vectorielle des UDF dans PySpark en transmettant des lots de données en tant que séries Pandas aux fonctions Python. Cette conception améliore considérablement les performances en réduisant la surcharge de sérialisation par rapport aux UDF standard basés sur les lignes.
Soutenus par Apache Arrow pour un transfert de données sans copie entre la JVM et les processus Python, les UDF Pandas permettent une exécution efficace des opérations à l'échelle. Ils sont particulièrement efficaces pour les calculs intensifs et les manipulations de chaînes complexes portant sur des millions d'enregistrements.
Nous couvrons plus de détails sur la manipulation de données avec PySpark dans notre cours Nettoyage de données avec PySpark.
En outre, les UDF Pandas permettent une intégration transparente avec l'écosystème plus large de la science des données de Python, en tirant parti d'outils et de flux de travail familiers.
Type d'UDF |
Style d'exécution |
Vitesse |
Optimisations de Spark |
Meilleur pour |
Notes |
Standard UDF |
Ligne par ligne (Python) |
Lenteur |
Non optimisé |
Logique simple, petits ensembles de données |
Facile à écrire, mais coûteux |
Pandas Scalar UDF |
Vectorisé (en colonnes) |
Rapide |
Flèche d'arrêt |
Opérations numériques, transformations de chaînes de caractères |
Utiliser des UDF standard dans la mesure du possible |
Pandas Grouped Map UDF |
Par groupe (DataFrame Pandas) |
Moyennement rapide |
Flèche d'arrêt |
Transformations de groupe |
Le schéma de sortie doit être défini manuellement |
Pandas Aggregate UDF |
Par groupe (entrée série → sortie scalaire) |
Rapide |
Optimisé |
Agrégations comme la moyenne, la somme |
Plus simple que la carte groupée |
Pandas Iterator UDF |
Itérateur de lot (streaming) |
Rapide |
Optimisé |
Traiter des lots importants en toute sécurité |
Une empreinte mémoire plus faible |
Techniques d'optimisation des flèches
Le format de mémoire en colonnes d'Apache Arrow permet un transfert de données efficace sans copie entre la JVM Spark et les processus Python. En activant Arrow (spark.sql.execution.arrow.pyspark.enabled=true
) au sein de vos configurations Spark, les données se déplacent rapidement entre les environnements JVM et Python, ce qui accélère considérablement l'exécution des UDF.
Optimisation du plan d'exécution
L'optimisation des jobs PySpark implique de comprendre comment influencer l'optimiseur Catalyst de Spark. Les stratégies avancées comprennent des techniques telles que le pushdown des prédicats, l'élagage des colonnes et l'utilisation d'indices de jointure de diffusion pour améliorer la planification des requêtes et l'efficacité de l'exécution.
Pour maximiser les performances, il est important de minimiser la portée de l'exécution des UDF et de favoriser les fonctions SQL intégrées de Spark dans la mesure du possible. L'utilisation stratégique de la mise en cache et l'élaboration minutieuse de plans peuvent encore améliorer la vitesse d'exécution des tâches et l'utilisation des ressources.
L'optimisation des performances est l'une des principales questions que vous pouvez rencontrer lors d'un entretien avec PySpark. Découvrez comment répondre à cette question et à d'autres questions sur Spark dans notre Top 36 PySpark Interview Questions and Answers for 2025 article de blog.
Modèles et anti-modèles avancés
Comprendre les modes d'utilisation corrects et incorrects permet de garantir des déploiements d'UDF stables et efficaces.
Implémentations d'UDF avec état
Les UDF avec état et non déterministes présentent des défis uniques dans PySpark. Ces fonctions produisent des résultats qui dépendent d'un état externe ou de conditions changeantes, telles que des variables d'environnement, l'heure du système ou le contexte de la session.
Si les UDF non déterministes sont parfois nécessaires - par exemple, pour générer des horodatages, suivre les sessions des utilisateurs ou introduire un caractère aléatoire - ils peuvent compliquer le débogage, la reproductibilité et l'optimisation.
La mise en œuvre d'UDF avec état nécessite des modèles de conception minutieux : documenter clairement le comportement, isoler les effets secondaires et ajouter une journalisation complète pour faciliter le dépannage et assurer la cohérence entre les exécutions de tâches.
Lorsqu'ils sont utilisés de manière réfléchie, ils peuvent débloquer des capacités puissantes, mais le maintien de pipelines de données fiables nécessite une gestion disciplinée. Notre cours Fondamentaux du Big Data avec PySpark va plus en détail sur la façon de gérer les big data dans PySpark.
Antimodèles courants
Les anti-modèles courants dans l'utilisation des UDF peuvent dégrader de manière significative les performances de PySpark :
- Traitement par rangée au lieu du traitement par lots: L'application d'UDF à des lignes individuelles plutôt que l'utilisation d'approches vectorielles comme les UDF Pandas entraîne des ralentissements d'exécution importants.
- Opérations DataFrame imbriquées dans les UDF: L'intégration de requêtes DataFrame dans des UDF entraîne des calculs excessifs et entrave la capacité de Spark à optimiser les plans d'exécution.
- Enregistrement répété d'UDF en ligne: Définir et enregistrer les UDF plusieurs fois dans les requêtes ajoute une surcharge inutile ; il est préférable de déclarer les UDF une seule fois et de les réutiliser dans tous les travaux.
- Surutilisation de la logique Python personnalisée pour des opérations simples: Les tâches telles que le filtrage de base, l'arithmétique ou les transformations directes devraient favoriser les fonctions intégrées hautement optimisées de Spark plutôt que les UDF personnalisées.
Éviter ces pièges garantit de meilleures performances, une optimisation plus facile par Catalyst et un code PySpark plus facile à maintenir.
Débogage et test des UDF PySpark
Le test et le débogage des UDF garantissent leur fiabilité et leur robustesse dans les scénarios de production.
Modèles de gestion des exceptions
L'implémentation d'une capture d'erreur structurée dans les UDF est essentielle pour construire des pipelines PySpark résistants et faciles à maintenir. Utilisez des blocs try-except à l'intérieur des UDF pour gérer de manière élégante les surprises d'exécution, telles que les valeurs nulles, les erreurs de type ou les erreurs de division par zéro.
La gestion robuste des exceptions stabilise les pipelines face aux données inattendues et simplifie le débogage en faisant apparaître des messages d'erreur clairs et exploitables. Des exceptions correctement capturées et enregistrées rendent le comportement de l'UDF plus transparent, ce qui accélère la résolution des problèmes et améliore la fiabilité globale du pipeline.
Cadres de tests unitaires
Utilisez la classe de base de test intégrée à PySpark, pyspark.testing.utils.ReusedPySparkTestCase
, ainsi que des frameworks comme pytest
, pour écrire des tests unitaires fiables pour vos UDF. La structuration de tests clairs et ciblés garantit l'exactitude, la stabilité et la maintenabilité de votre logique UDF au fil du temps.
Les meilleures pratiques pour tester les UDF consistent à couvrir à la fois les cas typiques et les cas extrêmes, à valider les résultats par rapport à des résultats connus et à isoler le comportement de l'UDF des dépendances externes. Des tests bien conçus protègent non seulement contre les régressions, mais simplifient également les efforts de développement et de remaniement futurs.
Évolution et orientations futures
L'écosystème PySpark continue d'évoluer rapidement, introduisant de nouvelles capacités qui améliorent encore les UDF.
Intégration du catalogue Unity
Des développements récents ont intégré l'enregistrement de l'UDF dans Unity Catalogsimplifiant ainsi la gestion, la découverte et la gouvernance des UDF à grande échelle. Unity Catalog permet un contrôle centralisé de la gestion du cycle de vie des UDF, y compris l'enregistrement, la gestion des versions et le contrôle d'accès, autant d'éléments essentiels pour les environnements d'entreprise.
Cette intégration renforce la gouvernance, applique des politiques de sécurité cohérentes et améliore la découvrabilité au sein des équipes, ce qui facilite la réutilisation, l'audit et la gestion des UDF au sein d'écosystèmes de données vastes et complexes.
UDF accélérés par le GPU
Des frameworks tels que RAPIDS Accelerator permettent de décharger le GPU pour les tâches UDF à forte intensité de calcul dans PySpark, offrant ainsi des améliorations de performance transformatrices. En transférant les opérations lourdes, telles que l'analyse numérique, l'inférence d'apprentissage profond et la modélisation de données à grande échelle, vers les GPU, RAPIDS peut réduire les temps d'exécution de plusieurs heures à quelques minutes dans les charges de travail appropriées.
L'accélération GPU est particulièrement bénéfique pour les scénarios impliquant des ensembles de données massifs, des calculs vectoriels complexes et des pipelines d'apprentissage automatique, augmentant considérablement les performances et l'évolutivité de PySpark pour les tâches modernes d'ingénierie des données. Notre cours Apprentissage automatique avec PySpark approfondit ces concepts.
Conclusion
Les UDF PySpark sont un outil puissant pour étendre les capacités de Spark, permettant aux équipes de s'attaquer à des tâches de traitement de données complexes et personnalisées qui vont au-delà des fonctions intégrées. Lorsqu'ils sont appliqués correctement, ils permettent de débloquer la flexibilité et l'innovation dans les pipelines de données à grande échelle.
Cependant, l'optimisation des performances des UDF nécessite une attention particulière, en évitant les pièges courants tels que les opérations sur les lignes, en gérant les exceptions avec élégance et en exploitant des techniques telles que la vectorisation UDF de Pandas avec l'intégration d'Arrow.
De nouvelles avancées, telles que l'accélération par le GPU grâce à des frameworks comme RAPIDS, élargissent encore les possibilités offertes par les flux de travail pilotés par UDF. Que vous transformiez des données désordonnées du monde réel ou que vous intégriez des analyses avancées dans des systèmes de production, la maîtrise des meilleures pratiques UDF est essentielle pour créer des pipelines de données rapides, efficaces et fiables.
Apprenez les détails concrets sur lesquels les data scientists passent 70-80% de leur temps, le data wrangling et le feature engineering, grâce à notre cours Ingénierie des fonctionnalités avec PySpark.
PySpark UDF FAQs
Quand dois-je utiliser une UDF PySpark au lieu d'une fonction intégrée ?
Vous ne devez utiliser un UDF PySpark que lorsque votre transformation ne peut pas être réalisée à l'aide des fonctions intégrées de Spark. Les fonctions intégrées sont optimisées et s'exécutent plus rapidement que les UDF car elles fonctionnent nativement dans la JVM sans frais de sérialisation.
Pourquoi les UDF Pandas sont-ils plus rapides que les UDF Python classiques dans PySpark ?
Les UDF Pandas (UDF vectorisés) sont Ils sont plus rapides car ils utilisent Apache Arrow pour une sérialisation efficace des données et traitent les données par lots plutôt que ligne par ligne, ce qui réduit la surcharge liée au déplacement des données entre la JVM et l'interpréteur Python.
Dois-je toujours spécifier un type de retour pour un UDF dans PySpark ?
Oui, PySpark exige un type de données de retour explicite lors de la définition des UDF. Cette exigence garantit une sérialisation correcte entre Java et Python et permet d'éviter les erreurs d'exécution.
Comment activer Apache Arrow dans mon application PySpark ?
Vous pouvez activer Apache Arrow en définissant la configuration suivante avant d'exécuter des UDF :
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Quelle est la meilleure façon de gérer les valeurs nulles dans un UDF PySpark ?
Pour éviter les exceptions, prévoyez toujours une vérification conditionnelle des valeurs None (null) dans votre UDF. Par exemple : si le nom du produit est None : retournez None.
