Curso
As funções definidas pelo usuário (UDFs) no PySpark oferecem aos desenvolvedores Python uma maneira poderosa de lidar com tarefas exclusivas que as funções integradas do Spark simplesmente não conseguem gerenciar. Se você é um engenheiro de dados, analista ou cientista proficiente em Python, entender os conceitos de UDF pode permitir que você enfrente com eficácia desafios complexos de dados do mundo real.
Este tutorial guia você pelos conceitos de UDF do PySpark, implementações práticas e práticas recomendadas de otimização, teste, depuração e padrões de uso avançados. Ao final, você será capaz de escrever, otimizar e implementar UDFs eficientes em escala com confiança.
Se você não tem experiência com o PySpark, recomendo que primeiro dê uma olhada em nosso Tutorial de introdução ao PySpark pois o que abordamos aqui são conceitos avançados do Spark.
O que são UDFs do PySpark?
As UDFs do PySpark são funções personalizadas do Python integradas à estrutura distribuída do Spark para operar em dados armazenados nos DataFrames do Spark. Ao contrário das funções integradas do Spark, as UDFs permitem que os desenvolvedores apliquem uma lógica complexa e personalizada no nível da linha ou da coluna.
Nossa Folha de dicas do PySpark cobre tudo o que você precisa saber sobre DataFrames do Spark, facilitando ainda mais a compreensão das UDFs do Spark.
Quando você deve usar os UDFs do PySpark?
Use UDFs quando:
- Você precisa de uma lógica que não pode ser expressa usando as funções integradas do Spark.
- Sua transformação envolve operações complexas nativas do Python (por exemplo, manipulações de regex, lógica NLP personalizada).
- Você não se importa em trocar desempenho por flexibilidade, especialmente durante a criação de protótipos ou para conjuntos de dados pequenos e médios.
Evite UDFs quando:
- Existe uma funcionalidade equivalente em
pyspark.sql.functions
, mas as funções nativas do Spark são mais rápidas, otimizadas e podem ser transferidas para o mecanismo de execução. - Você está trabalhando com grandes conjuntos de dados em que o desempenho é fundamental. Os UDFs introduzem uma sobrecarga de serialização e prejudicam a capacidade do Spark de otimizar os planos de execução.
- Você pode expressar sua lógica usando expressões SQL, suplementos incorporados do Spark SQL ou UDFs do Pandas (para operações vetorizadas).
Aplicativos estratégicos em engenharia de dados
Aqui estão os principais casos de uso das UDFs do PySpark:
- Transformações complexas de dados, como análise avançada de texto, extração de dados ou manipulação de strings.
- Integração com bibliotecas Python de terceiros, incluindo estruturas populares de machine learning, como TensorFlow e XGBoost.
- Conectar sistemas legados e oferecer suporte à evolução contínua do esquema à medida que as estruturas de dados mudam.
Os UDFs simplificam as tarefas complicadas de engenharia de dados do mundo real, capacitando as equipes a lidar com diversos requisitos de forma flexível e eficaz.
Vamos agora descobrir como você pode implementar UDFs do PySpark.
Implementação de UDFs do PySpark
Esta seção descreve como definir e implementar UDFs do PySpark na prática.
Métodos de declaração de UDF padrão
Há três abordagens comuns para declarar UDFs no PySpark:
1. UDFs baseadas em Lambda: Você pode definir rapidamente diretamente nas consultas de DataFrame; melhor para operações simples.
O UDF baseado em Lambda (Basic Python UDF) é melhor para transformações rápidas e simples. Evite-os em trabalhos de grande porte em que o desempenho seja importante.
Aqui está um exemplo:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
uppercase = udf(lambda x: x.upper() if x else None, StringType())
df = spark.createDataFrame([("Ada",), (None,)], ["name"])
df.withColumn("upper_name", uppercase("name")).show()
2. Funções Python decoradas: Anotação explícita usando @pyspark.sql.functions.udf
, dando suporte à reutilização e à legibilidade.
Por exemplo:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
@F.udf(returnType=IntegerType())
def str_length(s):
return len(s) if s else 0
df.withColumn("name_length", str_length("name")).show()
3. UDFs registrados no SQL: Registrados diretamente nos contextos do Spark SQL, permitindo que você os use em consultas SQL.
from pyspark.sql.types import StringType
def reverse_string(s):
return s[::-1] if s else ""
spark.udf.register("reverse_udf", reverse_string, StringType())
df.createOrReplaceTempView("people")
spark.sql("""SELECT name, reverse_udf(name) AS reversed FROM people""").show()
Cada método tem compensações: as UDFs lambda são concisas, mas limitadas, enquanto as anotações de função favorecem a legibilidade, a manutenção e as práticas recomendadas.
Os UDFs do Pandas permitem operações vetorizadas em lotes Arrow. Em geral, elas são mais rápidas do que as UDFs comuns e se integram melhor ao mecanismo de execução do Spark.
UDF do Scalar Pandas (com base em elementos, como o mapa)
Eles são mais adequados para transformações rápidas e em linhas em grandes conjuntos de dados. Por exemplo:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd
@pandas_udf(IntegerType())
def pandas_strlen(s: pd.Series) -> pd.Series:
return s.str.len()
df.withColumn("name_len", pandas_strlen("name")).show()
Mapa agrupado Pandas UDF
Esses são os melhores para lógica personalizada por grupo, semelhante ao groupby().apply()
em Pandas.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
schema = StructType([
StructField("group", StringType()),
StructField("avg_val", DoubleType())
])
@pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def group_avg(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({
"group": [pdf["group"].iloc[0]],
"avg_val": [pdf["value"].mean()]
})
df.groupBy("group").apply(group_avg).show()
UDF do Pandas Aggregate
Este executa Agregações sobre grupos, mais rápido do que o mapa agrupado. Por exemplo:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
@pandas_udf(DoubleType(), functionType="grouped_agg")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.groupBy("category").agg(mean_udf("value").alias("mean_value")).show()
Iterador UDF do Pandas
O Pandas Iterator UDF é melhor para grandes conjuntos de dados que exigem processamento com pouca memória (em lote). Por exemplo:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from typing import Iterator
import pandas as pd
@pandas_udf(IntegerType(), functionType="iterator")
def batch_sum(it: Iterator[pd.Series]) -> Iterator[pd.Series]:
for batch in it:
yield batch + 1
df.withColumn("incremented", batch_sum("id")).show()
Manuseio de tipos e segurança de nulos
Tipos e valores nulos representam desafios frequentes para as UDFs do PySpark. O PySpark impõe uma verificação rigorosa de tipos, o que geralmente causa conversões implícitas de tipos ou problemas de tempo de execução. Além disso, o Spark passa valores nulos diretamente para os UDFs, criando possíveis falhas se você não lidar com eles explicitamente.
Garanta UDFs robustos com essas estratégias:
- Especificar explicitamente os tipos de retorno.
- Incorpore verificações de nulidade (por exemplo, instruções condicionais) em suas funções Python.
- Adote práticas de codificação defensivas; verificações simples de nulo evitam exceções frustrantes em tempo de execução.
Otimização do desempenho do UDF
O desempenho costuma ser o calcanhar de Aquiles das UDFs padrão devido ao seu modelo de execução linha a linha. Se você aproveitar as UDFs vetorizadas e as ferramentas de otimização do Spark, os tempos de execução serão significativamente melhores.
UDFs vetorizados com integração do Pandas
As UDFs do Pandas introduzem uma abordagem vetorizada para as UDFs no PySpark, passando lotes de dados como séries do Pandas para as funções do Python. Esse design melhora significativamente o desempenho ao reduzir a sobrecarga de serialização em comparação com os UDFs padrão baseados em linhas.
Com o suporte do Apache Arrow para transferência de dados sem cópia entre a JVM e os processos Python, os UDFs do Pandas permitem a execução eficiente de operações em escala. Eles são particularmente eficazes para cálculos intensivos e manipulações complexas de strings em milhões de registros.
Abordamos mais detalhes sobre a manipulação de dados com o PySpark em nosso Curso de limpeza de dados com PySpark.
Além disso, os UDFs do Pandas permitem uma integração perfeita com o ecossistema mais amplo de ciência de dados do Python, aproveitando ferramentas e fluxos de trabalho familiares.
Tipo de UDF |
Estilo de execução |
Velocidade |
Otimizações do Spark |
Melhor para |
Notas |
UDF padrão |
Linha por linha (Python) |
Lento |
Não otimizado |
Lógica simples, conjuntos de dados pequenos |
Fácil de escrever, mas caro |
Pandas Scalar UDF |
Vetorizado (em colunas) |
Rápido |
Com suporte de seta |
Operações numéricas, transformações de strings |
Use UDFs padrão sempre que possível |
UDF de mapa agrupado do Pandas |
Por grupo (DataFrame do Pandas) |
Medium–Fast |
Com suporte de seta |
Transformações em grupos |
O esquema de saída deve ser definido manualmente |
UDF do Pandas Aggregate |
Por grupo (entrada em série → saída escalar) |
Rápido |
Otimizado |
Agregações como média, soma |
Mais simples do que o mapa agrupado |
Iterador UDF do Pandas |
Iterador de lote (streaming) |
Rápido |
Otimizado |
Processamento de grandes lotes com segurança |
Menor consumo de memória |
Técnicas de otimização de setas
O formato de memória colunar do Apache Arrow permite que você transfira dados com eficiência e sem cópia entre o Spark JVM e os processos Python. Ao habilitar o Arrow (spark.sql.execution.arrow.pyspark.enabled=true
) nas configurações do Spark, os dados se movem rapidamente entre os ambientes JVM e Python, acelerando consideravelmente a execução do UDF.
Otimização do plano de execução
Para otimizar os trabalhos do PySpark, você precisa entender como influenciar o otimizador Catalyst do Spark. As estratégias avançadas incluem técnicas como predicado pushdown, poda de coluna e uso de dicas de união de broadcast para melhorar o planejamento da consulta e a eficiência da execução.
Para maximizar o desempenho, é importante que você minimize o escopo da execução do UDF e dê preferência às funções SQL integradas do Spark sempre que possível. O uso estratégico do armazenamento em cache e a elaboração cuidadosa do plano podem aumentar ainda mais a velocidade de execução do trabalho e a utilização dos recursos.
A otimização do desempenho é uma das principais perguntas que você pode encontrar em uma entrevista do PySpark. Descubra como você pode responder a essa e a outras perguntas sobre o Spark em nossas 36 principais perguntas e respostas para entrevistas com PySpark em 2025 publicação no blog.
Padrões avançados e antipadrões
Compreender os padrões de uso adequados e inadequados ajuda a garantir implementações de UDF estáveis e eficientes.
Implementações de UDF com estado
As UDFs com estado e não determinísticas apresentam desafios únicos no PySpark. Essas funções produzem resultados que dependem do estado externo ou de condições variáveis, como variáveis de ambiente, tempo do sistema ou contexto da sessão.
Embora os UDFs não determinísticos às vezes sejam necessários - por exemplo, para gerar registros de data e hora, rastrear sessões de usuários ou introduzir aleatoriedade -, eles podem complicar a depuração, a reprodutibilidade e a otimização.
A implementação de UDFs com estado exige padrões de projeto cuidadosos: documentar o comportamento com clareza, isolar os efeitos colaterais e adicionar um registro completo para ajudar na solução de problemas e garantir a consistência entre as execuções de trabalho.
Quando usados com cuidado, eles podem liberar recursos poderosos, mas a manutenção de pipelines de dados confiáveis exige um gerenciamento disciplinado. Nosso curso Curso Fundamentos de Big Data com PySpark aborda em mais detalhes como lidar com big data no PySpark.
Antipadrões comuns
Os antipadrões comuns no uso de UDFs podem degradar significativamente o desempenho do PySpark:
- Processamento por linha em vez de processamento em lote: A aplicação de UDFs a linhas individuais, em vez de usar abordagens vetorizadas, como as UDFs do Pandas, leva a grandes lentidões na execução.
- Operações DataFrame aninhadas dentro de UDFs: Se você incorporar consultas DataFrame em UDFs, isso causará um excesso de computação e prejudicará a capacidade do Spark de otimizar os planos de execução.
- Registro repetido de UDFs em linha: Definir e registrar UDFs várias vezes dentro das consultas adiciona uma sobrecarga desnecessária; é melhor declarar UDFs uma vez e reutilizá-las entre os trabalhos.
- Uso excessivo da lógica Python personalizada para operações simples: Tarefas como filtragem básica, aritmética ou transformações simples devem favorecer as funções integradas altamente otimizadas do Spark em vez das UDFs personalizadas.
Ao evitar essas armadilhas, você garante melhor desempenho, otimização mais fácil pelo Catalyst e código PySpark mais fácil de manter.
Depuração e teste de UDFs do PySpark
O teste e a depuração de UDFs garantem a confiabilidade e a robustez em cenários de produção.
Padrões de tratamento de exceções
A implementação da captura de erros estruturada em UDFs é essencial para a criação de pipelines PySpark resilientes e de fácil manutenção. Use blocos try-except dentro de UDFs para lidar com surpresas em tempo de execução, como valores nulos, incompatibilidade de tipos ou erros de divisão por zero.
O tratamento robusto de exceções estabiliza os pipelines em relação a dados inesperados e simplifica a depuração, apresentando mensagens de erro claras e acionáveis. As exceções devidamente capturadas e registradas tornam o comportamento do UDF mais transparente, acelerando a resolução de problemas e melhorando a confiabilidade geral do pipeline.
Estruturas de teste de unidade
Use a classe base de teste integrada do PySpark, pyspark.testing.utils.ReusedPySparkTestCase
, juntamente com estruturas como pytest
, para escrever testes de unidade confiáveis para suas UDFs. Ao estruturar testes claros e focados, você garante a correção, a estabilidade e a capacidade de manutenção da sua lógica de UDF ao longo do tempo.
As práticas recomendadas para testar UDFs incluem a cobertura de casos típicos e extremos, a validação de resultados em relação a resultados conhecidos e o isolamento do comportamento do UDF de dependências externas. Testes bem projetados não apenas protegem contra regressões, mas também simplificam o desenvolvimento futuro e os esforços de refatoração.
Evolução e direções futuras
O ecossistema PySpark continua a evoluir rapidamente, introduzindo novos recursos que aprimoram ainda mais as UDFs.
Integração do Unity Catalog
Desenvolvimentos recentes integraram o registro de UDF ao Catálogo Unitysimplificando a forma como os UDFs são gerenciados, descobertos e governados em escala. O Unity Catalog permite o controle centralizado do gerenciamento do ciclo de vida do UDF, incluindo registro, controle de versão e controle de acesso, todos essenciais para ambientes corporativos.
Essa integração aprimora a governança, aplica políticas de segurança consistentes e melhora a capacidade de descoberta entre as equipes, facilitando a reutilização, a auditoria e o gerenciamento de UDFs em ecossistemas de dados grandes e complexos.
UDFs aceleradas por GPU
Estruturas como o RAPIDS Accelerator permitem o descarregamento de GPU para tarefas UDF com uso intensivo de computação no PySpark, oferecendo melhorias de desempenho transformadoras. Ao transferir operações pesadas, como análise numérica, inferência de aprendizagem profunda e modelagem de dados em grande escala, para GPUs, o RAPIDS pode reduzir os tempos de execução de horas para minutos em cargas de trabalho adequadas.
A aceleração de GPU é particularmente benéfica para cenários que envolvem conjuntos de dados maciços, cálculos vetorizados complexos e pipelines de machine learning, expandindo drasticamente o desempenho e a escalabilidade do PySpark para tarefas modernas de engenharia de dados. Nosso curso Curso de machine learning com PySpark se aprofunda nesses conceitos.
Conclusão
As UDFs do PySpark são uma ferramenta poderosa para estender os recursos do Spark, permitindo que as equipes lidem com tarefas complexas e personalizadas de processamento de dados que vão além das funções incorporadas. Quando aplicados corretamente, eles proporcionam flexibilidade e inovação em pipelines de dados em grande escala.
No entanto, a otimização do desempenho das UDFs requer atenção cuidadosa, evitando armadilhas comuns, como operações em linhas, gerenciando exceções de forma elegante e aproveitando técnicas como a vetorização de UDFs do Pandas com a integração do Arrow.
Os avanços emergentes, como a aceleração de GPU por meio de estruturas como o RAPIDS, estão expandindo ainda mais o que é possível com fluxos de trabalho orientados por UDF. Se você estiver transformando dados confusos do mundo real ou incorporando análises avançadas em sistemas de produção, dominar as práticas recomendadas de UDF é essencial para criar pipelines de dados rápidos, eficientes e confiáveis.
Aprenda os detalhes essenciais com os quais os cientistas de dados gastam de 70 a 80% do seu tempo, a organização de dados e a engenharia de recursos, em nosso curso Curso de engenharia de recursos com PySpark.
Perguntas frequentes sobre o PySpark UDF
Quando devo usar uma UDF do PySpark em vez de uma função integrada?
Você deve usar uma UDF do PySpark somente quando sua transformação não puder ser obtida usando as funções integradas do Spark. As funções incorporadas são otimizadas e executadas mais rapidamente do que as UDFs porque operam nativamente na JVM sem sobrecarga de serialização.
Por que as UDFs do Pandas são mais rápidas do que as UDFs normais do Python no PySpark?
As UDFs do Pandas (UDFs vetorizadas) são mais rápidas porque usam o Apache Arrow para serialização eficiente de dados e processam dados em lotes em vez de linha por linha, o que reduz a sobrecarga de movimentação de dados entre a JVM e o interpretador Python.
Sempre preciso especificar um tipo de retorno para uma UDF no PySpark?
Sim, o PySpark exige um tipo de dados de retorno explícito ao definir UDFs. O requisito garante a serialização adequada entre Java e Python e evita erros de tempo de execução.
Como faço para ativar o Apache Arrow em meu aplicativo PySpark?
Você pode ativar o Apache Arrow definindo a seguinte configuração antes de executar qualquer UDF:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Qual é a melhor maneira de lidar com valores nulos em uma UDF do PySpark?
Sempre inclua uma verificação condicional para valores None (nulos) em seu UDF para evitar exceções. Por exemplo: se product_name for None: retorna None.
