Cours
Les DataFrame PySpark sont essentiels lorsque vous créez des pipelines évolutifs dans Spark. Il est essentiel de garder à l'esprit que les DataFrames sont immuables. Cela signifie qu'une fois que vous en avez un, vous ne pouvez pas le modifier directement ; vous obtenez toujours un nouveau DataFrame chaque fois que vous effectuez une modification. C'est là qu'intervient l'withColumn()
, PySpark. Il vous permet d'ajouter, de mettre à jour ou de modifier des colonnes, mais comme les DataFrame ne changent pas sur place, vous devez toujours réaffecter le résultat à une nouvelle variable.
Dans ce tutoriel, je vais vous expliquer comment utiliser withColumn()
pour façonner et ajuster vos ensembles de données, que vous créiez des fonctionnalités, nettoyiez des types ou ajoutiez de la logique.
Que représente la fonction withColumn() dans PySpark ?
En résumé, la méthode ` withColumn()
` renvoie un nouveau DataFrame avec une colonne ajoutée ou remplacée. Étant donné que les DataFrame ne mutent pas, il est nécessaire d'assigner cette valeur de retour, df = df.withColumn(...)
.
En arrière-plan, chaque appel à withColumn()
ajoute une projection dans le plan logique. Cela ne pose pas de problème si vous n'en effectuez qu'une ou deux, mais en enchaînant plusieurs, vous risquez de surcharger le plan, ce qui ralentira considérablement Spark.
Vous débutez avec PySpark ? Vous pouvez acquérir les bases nécessaires pour gérer facilement le Big Data en apprenant à traiter, interroger et optimiser des ensembles de données volumineux afin de réaliser des analyses puissantes dans notre cours Introduction à PySpark.
Utilisations principales de PySpark avec column()
Examinons les principales utilisations d'withColumn()
:
Ajouter une colonne constante
Supposons que vous souhaitiez ajouter un horodatage ou un indicateur. Veuillez utiliser lit()
ou typedLit()
à partir de pyspark.sql.functions
. Par exemple :
from pyspark.sql.functions import lit
df = df.withColumn("ingest_date", lit("2025-07-29"))
Création d'une colonne à partir de données existantes
Peut-être souhaitez-vous obtenir une valeur dérivée, combiner des chaînes de caractères ou calculer un total. Vous pouvez effectuer les actions suivantes :
from pyspark.sql.functions import col, expr
df = df.withColumn("full_name", col("first_name") + expr(" ' ' + last_name"))
Les transformations arithmétiques ou basées sur des expressions sont également pertinentes dans ce contexte.
Remplacer une colonne existante
Si vous disposez déjà d'une colonne et souhaitez la modifier, withColumn()
la remplace simplement :
df = df.withColumn("age", col("age").cast("integer"))
Il n'est pas nécessaire de supprimer et de rajouter.
Types de données de casting
La modification du type d'une colonne est simple :
df = df.withColumn("price", col("price").cast("decimal(10,2)"))
Je trouve cela particulièrement utile lorsque je lis des fichiers JSON ou CSV où les types sont représentés sous forme de chaînes de caractères.
Si vous souhaitez obtenir davantage d'exemples illustrant ce qu'est PySpark et comment l'utiliser, je vous recommande notre tutoriel « Getting Started with PySpark » (Premiers pas avec PySpark).
Logique conditionnelle et expressions when()
Il arrive parfois que l'on doive effectuer des opérations plus complexes que de simples calculs arithmétiques. Peut-être êtes-vous en train de créer une colonne de statut basée sur un score. Ou signaler les entrées en fonction d'un ensemble de règles. C'est là qu'intervient when()
de pyspark.sql.functions
. Considérez cela comme une déclaration d'IF
. Vous pouvez l'associer à otherwise()
pour couvrir plusieurs chemins.
Voici à quoi cela ressemble :
from pyspark.sql.functions import when
df = df.withColumn(
"grade",
when(col("score") >= 90, "A")
.when(col("score") >= 80, "B")
.when(col("score") >= 70, "C")
.otherwise("F")
)
Cela se lit presque comme de l'anglais courant : Si la note est d'au moins 90, alors A. Sinon, si elle est de 80, alors B. Continuez... et si aucune de ces réponses ne correspond, attribuez un F. C'est expressif, et Spark transforme cette logique en une expression efficace en arrière-plan. Pas de boucles imbriquées, pas d'appels à l'apply()
, seulement des graphes d'arêtes dirigées (DAG) clairs et des plans d'exécution précis.
Cela s'avère utile lorsque vous souhaitez éviter de passer à SQL ou d'encombrer votre code avec des fonctions définies par l'utilisateur.
Vous pouvez apprendre à manipuler des données et à créer des ensembles de fonctionnalités d'apprentissage automatique dans Spark à l'aide de SQL en Python grâce à notre tutoriel Introduction à Spark SQL en Python .
Transformation de colonnes à l'aide de fonctions intégrées et définies par l'utilisateur
Il est fréquent de devoir formater ou restructurer des colonnes, convertir des chaînes en majuscules, les diviser en plusieurs parties, concaténer des valeurs, etc. PySpark dispose d'une bibliothèque complète de fonctions intégrées qui fonctionnent directement dans l'environnement d'exécution de l'analyse de données ( withColumn()
).
Voici un exemple :
from pyspark.sql.functions import upper, concat_ws, split
df = df.withColumn("full_caps", upper(col("name")))
df = df.withColumn("city_state", concat_ws(", ", col("city"), col("state")))
df = df.withColumn("first_word", split(col("description"), " ").getItem(0))
Les éléments encastrés sont très pratiques. Rapide, natif et optimisé. Cependant, il arrive parfois que vous ayez une règle unique qui ne correspond à aucune catégorie. C'est là que les fonctions définies par l'utilisateur (UDF) entrent en jeu.
Utilisation d'une fonction définie par l'utilisateur (UDF)
Supposons que vous souhaitiez calculer la longueur d'une chaîne et la nommer :
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def label_length(x):
return "short" if len(x) < 5 else "long"
label_udf = udf(label_length, StringType())
df = df.withColumn("name_length_label", label_udf(col("name")))
C'est assez simple. Cependant, il est important de noter que les UDF entraînent des frais supplémentaires. Ils extraient les données du moteur optimisé, les exécutent dans Python, puis les réintègrent. Cela convient lorsque nécessaire, mais pour les tâches à volume élevé, il est préférable d'utiliser des expressions intégrées ou SQL si possible.
Découvrez comment créer, optimiser et utiliser les fonctions UDF PySpark, y compris les fonctions UDF Pandas, afin de gérer efficacement les transformations de données personnalisées et d'améliorer les performances de Spark grâce à notre tutoriel intitulé « Comment utiliser efficacement les fonctions UDF PySpark et Pandas ».
Considérations relatives aux performances et pratiques avancées
À un moment donné, chaque utilisateur de PySpark est confronté à cette situation : vous continuez à empiler les appels d'withColumn()
s et votre pipeline ralentit considérablement. La raison ? Chaque appel ajoute une nouvelle couche au plan logique, que Spark doit analyser, optimiser et exécuter.
Si vous n'ajoutez qu'une ou deux colonnes, cela ne pose aucun problème. Cependant, si vous en enchaînez cinq, six ou plus, il serait judicieux de reconsidérer votre approche.
Veuillez utiliser select()
lors de l'ajout de nombreuses colonnes
Au lieu d'appeler withColumn()
à plusieurs reprises, veuillez créer une nouvelle liste de colonnes à l'aide de select()
:
df = df.select(
"*",
(col("salary") * 0.1).alias("bonus"),
(col("age") + 5).alias("age_plus_five")
)
Cette approche permet de construire le plan logique en une seule fois.
Pour en savoir plus sur select()
et d'autres méthodes, veuillez consulter notre fiche pratique PySpark : Tutoriel Spark dans Python.
Et qu'en est-il de withColumns()
?
Introduit dans Spark 3.3, le filtre ` withColumns()
` vous permet d'ajouter plusieurs colonnes en une seule fois. Veuillez noter qu'il n'est plus nécessaire de nous appeler à plusieurs reprises Il s'agit d'une méthode de type dictionnaire :
df = df.withColumns({
"bonus": col("salary") * 0.1,
"age_plus_five": col("age") + 5
})
Tout le monde n'a pas encore adopté Spark 3.3+, mais si vous l'avez déjà fait, veuillez utiliser ceci. C'est plus propre, plus rapide et cela évite le problème de « mort par chaînage ».
Exemple complet d'utilisation de PySpark avec Column()
Supposons que vous travaillez sur les données relatives à l'activité des utilisateurs d'une plateforme par abonnement. Votre DataFrame brut se présente comme suit :
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("withColumn-demo").getOrCreate()
data = [
("Alice", "NY", 24, 129.99),
("Bob", "CA", 31, 199.95),
("Charlie", "TX", 45, 0.0),
("Diana", "WA", 17, 19.99)
]
columns = ["name", "state", "age", "purchase_amount"]
df = spark.createDataFrame(data, columns)
Vous disposez des noms, des États, des âges et des montants dépensés. Assez basique. Cependant, dans la réalité, cela n'est jamais suffisant. Voici donc ce que nous allons faire ensuite :
- Veuillez ajouter une colonne constante pour la date d'ingestion.
- Veuillez créer une nouvelle colonne indiquant si l'utilisateur est un adulte.
- Veuillez formater l'
purchase_amount
e à deux décimales. - Classez les utilisateurs par niveau de dépenses.
- Appliquez une fonction personnalisée pour étiqueter les utilisateurs.
- Veuillez utiliser
withColumns()
pour regrouper les valeurs supplémentaires de manière plus claire.
Vous vous préparez pour votre prochain entretien ? L'article « Les 36 questions et réponses les plus fréquentes lors d'entretiens d'embauche sur PySpark pour 2025 » fournit un guide complet des questions et réponses d'entretien sur PySpark, couvrant des sujets allant des concepts fondamentaux aux techniques avancées et aux stratégies d'optimisation.
Étape 1 : Veuillez ajouter une date d'ingestion constante.
Il est recommandé de suivre le cursus de la date à laquelle les données sont saisies dans votre système.
from pyspark.sql.functions import lit
df = df.withColumn("ingest_date", lit("2025-07-29"))
Étape 2 : Signaler des adultes par rapport à des mineurs
Vous auriez pu utiliser uniquement col("age") >= 18
, mais l'envelopper avec when()
vous donne un contrôle total si la logique venait à se compliquer.
from pyspark.sql.functions import when, col
df = df.withColumn(
"is_adult",
when(col("age") >= 18, True).otherwise(False)
)
Étape 3 : Format du montant de l'achat
Le transtypage est l'une des tâches de nettoyage les plus fréquentes que vous effectuerez, en particulier lors de la lecture de fichiers CSV ou JSON.
df = df.withColumn("purchase_amount", col("purchase_amount").cast("decimal(10,2)"))
Étape 4 : Catégoriser le niveau de dépenses
Supposons que vous souhaitiez trois groupes : « aucun », « faible » et « élevé ».
df = df.withColumn(
"spend_category",
when(col("purchase_amount") == 0, "none")
.when(col("purchase_amount") < 100, "low")
.otherwise("high")
)
Cela vous permet de segmenter les utilisateurs sans avoir à exécuter une requête distincte.
Étape 5 : Identifier les utilisateurs à l'aide d'une fonction définie par l'utilisateur (UDF)
Maintenant, une règle inventée. Supposons que vous étiquetez une personne en fonction de la longueur de son nom.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def user_label(name):
return "simple" if len(name) <= 4 else "complex"
label_udf = udf(user_label, StringType())
df = df.withColumn("label", label_udf(col("name")))
Étape 6 : Ajouter plusieurs colonnes supplémentaires en une seule fois
Peut-être souhaitez-vous en ajouter quelques-uns, l'âge en mois et un message de bienvenue.
df = df.withColumns({
"age_in_months": col("age") * 12,
"welcome_msg": col("name") + lit(", welcome aboard!")
})
Beaucoup plus efficace que d'appeler withColumn()
à deux reprises.
Voici à quoi ressemble le DataFrame final lorsque vous l'affichez :
df.show(truncate=False)
nom |
État |
âge |
purchase_amount |
ingest_date |
is_adult |
catégorie de dépenses |
étiquette |
age_in_months |
welcome_msg |
Alice |
NEW YORK |
24 |
129,99 |
2025-07-29 |
Vrai |
élevé |
complexe |
288 |
Alice, bienvenue à bord ! |
Bob |
CA |
31 |
199,95 |
2025-07-29 |
Vrai |
élevé |
simple |
372 |
Bob, bienvenue à bord ! |
Charlie |
TX |
45 |
0,00 |
2025-07-29 |
Vrai |
aucun |
complexe |
540 |
Monsieur Charlie, bienvenue à bord. |
Diana |
WA |
17 |
19,99 |
2025-07-29 |
Faux |
faible |
complexe |
204 |
Diana, bienvenue à bord ! |
Ce type de pile de transformation est couramment utilisé dans l'ingénierie des fonctionnalités, le reporting ou le nettoyage des flux tiers.
Découvrez les principes fondamentaux du traitement des mégadonnées avec PySpark grâce à notre cours « Principes fondamentaux des mégadonnées avec PySpark ».
Meilleures pratiques et pièges à éviter avec la fonction withColumn()
withColumn()
de PySpark peut sembler simple, mais il peut poser des difficultés même aux ingénieurs expérimentés si l'on n'y prête pas attention. Voici le type de problèmes qui peuvent discrètement perturber votre pipeline, ainsi que quelques habitudes qui peuvent vous éviter de mauvaises surprises.
Veuillez toujours réaffecter le résultat.
Cette opération est élémentaire, mais elle surprend souvent les utilisateurs : withColumn()
ne modifie pas votre DataFrame d'origine. Il vous en fournit un nouveau. Si vous omettez de le réattribuer, votre modification sera perdue.
df.withColumn("new_col", lit(1)) # This won't do anything
df = df.withColumn("new_col", lit(1)) # This works
Veuillez faire attention aux écrasements accidentels.
Supposons que votre DataFrame comporte une colonne intitulée « status ». Veuillez exécuter ceci :
df = df.withColumn("Status", lit("Active"))
Cela semble inoffensif, n'est-ce pas ? Cependant, Spark traite les noms de colonnes comme insensibles à la casse par défaut. Cela signifie que vous venez de remplacer votre colonne d'état d'origine. Sans s'en rendre compte.
Une solution consiste à toujours vérifier df.columns
avant et après. Ou, si votre pipeline le permet, activez la distinction entre majuscules et minuscules à l'aide de :
spark.conf.set("spark.sql.caseSensitive", "true")
Veuillez éviter d'utiliser des littéraux Python dans les expressions.
Celui-ci est facile à oublier. Lorsque vous ajoutez des constantes, veuillez éviter d'utiliser des valeurs brutes de Python. Veuillez toujours les emballer avec lit()
.
df = df.withColumn("region", "US") # Bad
df = df.withColumn("region", lit("US")) # Good
Pourquoi ? Étant donné que withColumn()
attend une expression Column, et non une valeur brute. Si vous commettez une erreur, Spark peut générer une erreur peu utile, ou pire, interrompre silencieusement la logique en aval.
Traiter les exceptions en dehors de withColumn()
Il arrive que les personnes fassent preuve de créativité et placent des blocs d'withColumn()
s entiers à l'intérieur de try/except. Il est préférable d'isoler les parties à risque (telles que les UDF ou les lectures de données) et d'y intercepter les exceptions. Veuillez maintenir votre couche de transformation claire et prévisible.
try:
def risky_udf(x):
if not x:
raise ValueError("Empty input")
return x.lower()
except Exception as e:
print("Error in UDF definition:", e)
Permettez à Spark d'échouer rapidement, ne le dissimulez pas derrière des blocs try imbriqués.
Pour en savoir plus sur les exceptions en Python, veuillez consulter notre tutoriel Gestion des exceptions et des erreurs en Python.
Privilégiez les fonctions intégrées plutôt que les fonctions définies par l'utilisateur.
Bien sûr, les UDF vous offrent une grande puissance. Cependant, cela implique certains compromis : performances réduites, débogage plus complexe et optimisation moindre. Si une fonction intégrée permet d'accomplir cette tâche, veuillez l'utiliser.
Ceci :
df = df.withColumn("upper_name", upper(col("name")))
Est beaucoup plus rapide que ceci :
df = df.withColumn("upper_name", udf(lambda x: x.upper(), StringType())(col("name")))
Quand ne pas utiliser withColumn()
Malgré la flexibilité d'withColumn()
, il existe des situations où cet outil n'est pas le plus approprié pour accomplir une tâche.
Vous êtes en train de remodeler un grand nombre de colonnes simultanément.
Si vous vous surprenez à appeler withColumn()
dix fois de suite, il est temps de changer de stratégie. Veuillez utiliser select()
à la place et inscrire vos transformations dans le cadre d'une nouvelle projection.
df = df.select(
col("name"),
col("age"),
(col("salary") * 0.15).alias("bonus"),
(col("score") + 10).alias("adjusted_score")
)
Il est plus clair, plus performant et permet à l'optimiseur de Spark de travailler en votre faveur plutôt que contre vous.
Vous souhaitez écrire une logique de type SQL.
Si votre équipe s'appuie fortement sur SQL et que vous avez déjà enregistré le DataFrame en tant que vue temporaire, il est souvent plus simple d'exécuter une requête SQL.
df.createOrReplaceTempView("users")
df2 = spark.sql("""
SELECT name, age,
CASE WHEN age >= 18 THEN true ELSE false END AS is_adult
FROM users
""")
Cela peut être plus facile pour les analystes ou les équipes familiarisés avec SQL qui travaillent à la fois avec Spark et les bases de données traditionnelles.
Développez vos compétences en SQL grâce à des cours interactifs, des cursus et des projets conçus par des experts du domaine à l'aide de nos cours SQL.
Vous utilisez déjà Spark 3.3+
Si vous utilisez Spark 3.3 ou une version plus récente et que vous avez besoin d'ajouter plusieurs colonnes, la méthode ` withColumns()
` est votre alliée. Ce n'est pas seulement pratique, cela peut également être plus rapide en arrière-plan en créant une seule mise à jour logique du plan.
Apprenez à mettre en œuvre la gestion distribuée des données et l'apprentissage automatique dans Spark à l'aide du package PySpark grâce à notre cours « Fondements de PySpark ».
Conclusion
La fonction d'withColumn()
ion de PySpark constitue l'un des outils les plus polyvalents de votre arsenal de transformation de données, vous permettant d'ajouter, de modifier et de concevoir des fonctionnalités directement dans un flux de travail centré sur DataFrame. De la conversion des types de données et des constantes d'injection à l'intégration de logiques complexes avec des conditions et des fonctions définies par l'utilisateur, withColumn()
vous assiste dans la transformation de données désorganisées en pipelines prêts à être mis en production.
Cependant, ce pouvoir implique des responsabilités. L'utilisation excessive d'withColumn()
s dans de longues chaînes peut nuire aux performances de manière imperceptible en alourdissant le plan logique, ce qui rend vos tâches Spark plus difficiles à optimiser et à déboguer. C'est pourquoi savoir quand utiliser select()
, withColumns()
ou même SQL peut faire la différence entre un travail qui avance lentement et un travail qui évolue rapidement.
Spark étant en constante évolution, notamment avec des fonctionnalités telles que l' withColumns()
e dans Spark 3.3+, il est essentiel de comprendre le fonctionnement interne et les compromis en termes de performances de chaque méthode afin de rédiger un code plus propre, plus rapide et plus facile à maintenir.
Maîtrisez les techniques sous-jacentes aux transformations de colonnes à grande échelle, évitez les pièges liés à l'inflation des plans et découvrez comment les professionnels rationalisent les pipelines de fonctionnalités dans notre cours Ingénierie des fonctionnalités avec PySpark .
PySpark withColumn() - Questions fréquentes
Pourquoi mon travail Spark est-il lent lorsque j'utilise fréquemment withColumn() ?
Lorsque vous enchaînez plusieurs appels à la méthode ` withColumn()
`, Spark ajoute chacun d'entre eux comme une étape distincte dans le plan d'exécution logique. Au fil du temps, cela peut se transformer en un plan trop lourd, plus difficile à optimiser et plus lent à exécuter. Au lieu d'empiler dix appels withColumn()
, essayez de créer vos nouvelles colonnes à l'intérieur d'un seul select()
ou utilisez withColumns()
pour ajouter plusieurs colonnes en une seule fois.
Puis-je utiliser withColumn() pour supprimer une colonne ?
Non, withColumn()
permet uniquement d'ajouter ou de remplacer des colonnes, mais ne les supprime pas. Si vous souhaitez supprimer une colonne, veuillez utiliser drop()
à la place. Vous pouvez également utiliser select()
pour ne conserver que les colonnes dont vous avez besoin.
Pourquoi une erreur se produit-elle lorsque j'essaie d'utiliser une chaîne ou un nombre dans withColumn() ?
Cela se produit généralement lorsque vous transmettez une valeur brute de Python au lieu de l'encapsuler dans lit()
.withColumn()
attend une expression Spark Column. Voici la méthode appropriée :
from pyspark.sql.functions import lit
df = df.withColumn("new_col", lit(42))
