Saltar al contenido principal

Tutorial de tuberías de agregación de MongoDB en Python con PyMongo

Explora las canalizaciones de agregación de MongoDB utilizando PyMongo. Comprende el flujo de datos, las etapas como $match, $project, $group, $lookup y los patrones avanzados.
Actualizado 13 jun 2025  · 15 min de lectura

Trabajar con grandes conjuntos de datos suele plantear el reto de extraer patrones y conocimientos significativos manteniendo el rendimiento. Cuando tus aplicaciones almacenan datos en MongoDB, ejecutar consultas y transformaciones complejas directamente en la base de datos puede ser mucho más rápido que trasladar los datos a herramientas de análisis externas. Las canalizaciones de agregación de MongoDB proporcionan una solución al permitirte procesar, transformar y analizar los datos justo donde viven.

Puedes crear flujos de trabajo de tratamiento de datos personalizados conectando operaciones sencillas en secuencia. Cada etapa de la cadena transforma los documentos y pasa los resultados a la etapa siguiente. Por ejemplo, puede que necesites filtrar registros por intervalo de fechas, agruparlos por categorías, calcular medidas estadísticas y dar formato al resultado, todo ello mediante una única operación de base de datos que procese los datos cerca de su fuente.

En este artículo, aprenderás a construir canalizaciones de agregación para resolver retos comunes de datos, con todos los ejemplos demostrados en PyMongo (el cliente Python oficial de MongoDB). 

Aunque este artículo se centra en las canalizaciones de agregación, si eres nuevo en el uso de MongoDBcon Python, el curso Introducción a MongoDB en Python ofrece un punto de partida completo. Con este artículo obtendrás una base suficiente para trasladar estos conceptos de agregación al lenguaje de consulta de MongoDB por tu cuenta o con ayuda de modelos lingüísticos.

¿Qué son los conductos de agregación en MongoDB?

Imagínate que necesitas analizar las opiniones de los clientes de varios productos para comprender las tendencias de satisfacción. Las consultas tradicionales pueden recuperar los datos, pero no ayudan a combinar, analizar y transformar esa información en resúmenes útiles. 

Las canalizaciones de agregación de MongoDB resuelven esto proporcionando una forma estructurada de procesar los datos mediante una serie de operaciones que se construyen unas sobre otras.

Arquitectura de tuberías y flujo de documentos

Arquitectura de la tubería de agregación de MongoDB

Piensa en los conductos de agregación como cadenas de montaje para tus datos. Cada documento de tu colección entra en un extremo de la tubería y pasa por varias estaciones (etapas) donde se filtra, transforma, agrupa o enriquece. 

La salida de una etapa se convierte en la entrada de la siguiente, lo que te permite dividir las transformaciones de datos complejas en pasos más pequeños y manejables.

Estos pipelines utilizan un enfoque declarativo; especificas lo que quieres en cada etapa en lugar de cómo calcularlo. Este enfoque deja claras tus intenciones de procesamiento de datos y permite que MongoDB se encargue de los detalles de ejecución. A continuación, la base de datos puede aplicar diversas optimizaciones basadas en tu estructura de canalización.

El orden de las etapas es importante en el diseño de tu tubería. Filtrar los documentos al principio (antes de agruparlos o realizar cálculos complejos) reduce la cantidad de datos que fluyen por el canal. 

Este enfoque puede mejorar drásticamente el rendimiento cuando se trabaja con grandes colecciones. Un pipeline bien estructurado procesa sólo los datos necesarios para tus resultados finales.

  • Flujo de documentos: Los documentos entran → se transforman a través de etapas → surgen los resultados
  • Diseño declarativo: Especifica lo que quieres, no cómo calcularlo
  • Procesamiento ordenado: La salida de cada etapa alimenta directamente a la etapa siguiente
  • Beneficio de rendimiento: El filtrado temprano reduce el volumen de datos en etapas posteriores

Tipología de etapas y categorías funcionales

Las etapas de agregación de MongoDB se dividen en cuatro categorías principales en función de su finalidad. Las etapas de filtrado como $match funcionan como consultas, seleccionando sólo los documentos que cumplen unos criterios específicos. Esto ayuda a delimitar tu conjunto de datos antes de realizar operaciones más complejas.

Las etapas de remodelación transforman la estructura del documento. Utilizando $project o $addFields, puedes incluir, excluir o renombrar campos, o crear campos calculados a partir de valores existentes. Estas etapas ayudan a simplificar los documentos manteniendo sólo la información relevante y añadiendo los valores computados necesarios para el análisis.

Cuando necesites combinar varios documentos basándote en características compartidas, entran en juego las etapas de agrupación. La etapa $group es el caballo de batalla aquí, ya que te permite calcular recuentos, sumas, medias y otros valores agregados a través de grupos de documentos. Esto transforma miles de registros individuales en resúmenes significativos que responden a tus preguntas analíticas.

Para completar la imagen de tus datos, las etapas de unión como $lookup te permiten combinar información de varias colecciones. Esto te permite enriquecer los documentos con datos relacionados, de forma similar a las uniones SQL pero adaptadas al modelo de documentos de MongoDB. La capacidad de hacer referencia a los datos en todas las colecciones ayuda a mantener una normalización adecuada de los datos sin dejar de ofrecer resultados completos en una sola operación.

  • Filtrado: Selecciona documentos específicos con $match basándote en criterios
  • Dando nueva forma a: Transforma la estructura del documento con $project y $addFields
  • Agrupación: Combinar y resumir con $group para valores agregados
  • Unirse a: Conecta datos relacionados entre colecciones con $lookup

Para una referencia completa sobre capacidades y operadores, puedes consultar el Manual de la Tubería de Agregación de MongoDB.

Configurar el entorno para el tutorial

Antes de sumergirte en las canalizaciones de agregación de MongoDB, necesitas un entorno de trabajo con PyMongo y acceso a un conjunto de datos de muestra. Esta sección te guía a través del proceso de configuración con el conjunto de datos sample_analytics, que contiene datos financieros perfectos para demostrar los conceptos de agregación.

Installing PyMongo

PyMongo es el controlador Python oficial de MongoDB. Puedes instalarlo mediante pip tanto en macOS como en Windows:

# Install PyMongo using pip
pip install pymongo

# Or if you're using conda
conda install -c conda-forge pymongo
  • Compatibilidad de versiones: PyMongo 4.x funciona con Python 3.7+.
  • Gestión de la dependencia: El instalador gestiona automáticamente las dependencias necesarias
  • Compatible con plataformas: Funciona de forma idéntica en Windows, macOS y Linux

Acceder al conjunto de datos de muestra

MongoDB ofrece conjuntos de datos de muestra que puedes utilizar sin necesidad de crear tus propios datos. Lo más sencillo es utilizar MongoDB Atlas (la versión en la nube):

  1. Crea una cuenta gratuita de MongoDB Atlas en mongodb.com/cloud/atlas
  2. Crear un clúster de nivel gratuito

MongoDB Atlas cluster

2.1. Elige la versión gratuita para siempre

2.2. Nombra tu clúster

2.3. Haz clic en "Desplegar"

2.4. Copia tu nombre de usuario y contraseña en algún lugar seguro

2.5. Haz clic en "Crear usuario de base de datos"

2.6. Elige "Controladores" para tu método de conexión

"Controladores" para tu método de conexión

2.7. Copia el URI de conexión para el siguiente paso

3.1. Haz clic en los tres puntos de la vista de clústeres

3.2. Elige "Cargar conjunto de datos de muestra"

3.3. En la lista, selecciona sample_analytics y espera a que se cargue.

Para instalaciones locales de MongoDB, puedes cargar los conjuntos de datos de muestra utilizando:

# Download and restore the sample dataset
python -m pip install pymongo[srv]
python -c "from pymongo import MongoClient; MongoClient().admin.command('getParameter', '*')"
  • Tamaño de los datos: El conjunto de datos sample_analytics es lo suficientemente pequeño (~10 MB) como para funcionar bien en la capa gratuita
  • Estructura de la colección: Contiene cobros de clientes y cuentas con datos de relación
  • Realismo de los datos: Basado en modelos financieros realistas para ejemplos de agregación significativos

Conexión a MongoDB

Ahora que tienes MongoDB y el conjunto de datos de muestra listos, vamos a escribir el código para conectar y verificar nuestra configuración:

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

uri = "mongodb+srv://bex:gTVAbSjPzuhRUiyE@cluster0.jdohtoe.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
   client.admin.command('ping')
   print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
   print(e)
Pinged your deployment. You successfully connected to MongoDB!
# Access the sample_analytics database
db = client.sample_analytics

# Verify connection by counting documents in collections
customer_count = db.customers.count_documents({})
account_count = db.accounts.count_documents({})

print(f"Found {customer_count} customers and {account_count} accounts in sample_analytics")

# Preview one document from each collection
print("\nSample customer document:")
print(db.customers.find_one())

print("\nSample account document:")
print(db.accounts.find_one())

Salida:

Found 500 customers and 1746 accounts in sample_analytics

Sample customer document:
{'_id': ObjectId('5ca4bbcea2dd94ee58162a68'), 'username': 'fmiller', 'name': 'Elizabeth Ray', 'address': '9286 Bethany Glens\nVasqueztown, CO 22939', 'birthdate': datetime.datetime(1977, 3, 2, 2, 20, 31), 'email': 'arroyocolton@gmail.com', 'active': True, 'accounts': [371138, 324287, 276528, 332179, 422649, 387979], 'tier_and_details': {'0df078f33aa74a2e9696e0520c1a828a': {'tier': 'Bronze', 'id': '0df078f33aa74a2e9696e0520c1a828a', 'active': True, 'benefits': ['sports tickets']}, '699456451cc24f028d2aa99d7534c219': {'tier': 'Bronze', 'benefits': ['24 hour dedicated line', 'concierge services'], 'active': True, 'id': '699456451cc24f028d2aa99d7534c219'}}}

Sample account document:
{'_id': ObjectId('5ca4bbc7a2dd94ee5816238c'), 'account_id': 371138, 'limit': 9000, 'products': ['Derivatives', 'InvestmentStock']}
  • Verificación: El código confirma tu conexión y el acceso al conjunto de datos
  • Vista previa del documento: Muestra la estructura de los documentos con los que vas a trabajar

Cuando ejecutes este código, deberías ver una salida que muestre el recuento de documentos y una vista previa de los documentos del cliente y de la cuenta. Esto confirma que tu entorno está preparado para los ejemplos de canalización de agregación que exploraremos en las próximas secciones.

Para una introducción a PyMongo y a la configuración local de MongoDB, consulta nuestro tutorial de PyMongo para principiantes.

Tuberías de agregación de MongoDB en profundidad

Ahora que hemos confirmado nuestra conexión y explorado la estructura del documento, vamos a sumergirnos en el marco de agregación de MongoDB. Esta potente función nos permite procesar y transformar datos directamente dentro de la base de datos. En esta sección, exploraremos las etapas más comunes del pipeline con ejemplos prácticos utilizando nuestro conjunto de datos de muestra.

Filtrar datos con $match

La etapa $match filtra los documentos en función de los criterios especificados. Piensa en ello como una forma de centrarte sólo en los datos que te interesan antes de realizar operaciones más complejas.

Busquemos todas las cuentas Premium con un límite superior a 9.000 $:

pipeline = [
   {"$match": {"limit": {"$gt": 9000}}}
]

premium_accounts = list(db.accounts.aggregate(pipeline))

print(f"Found {len(premium_accounts)} premium accounts")
print(premium_accounts[0])

Esta canalización utiliza la etapa $match con un operador de comparación $gt (mayor que) para filtrar las cuentas. Funciona igual que el método find(), pero en el contexto de una canalización. La consulta examina cada documento de la colección de cuentas y sólo conserva aquellos en los que el campo límite supera los 9000.

Salida:

Found 1701 premium accounts
{'_id': ObjectId('5ca4bbc7a2dd94ee5816238d'), 'account_id': 557378, 'limit': 10000, 'products': ['InvestmentStock', 'Commodity', 'Brokerage', 'CurrencyService']}

Observando los resultados, podemos ver que la tubería identificó 1.701 cuentas premium del total de 1.746 cuentas de nuestra base de datos. Este paso de filtrado estrecha nuestro enfoque, haciendo que el análisis posterior sea más eficaz y específico. En tus propios proyectos, podrías utilizar esta técnica para centrarte en los usuarios activos, las transacciones superiores a un determinado importe o los productos de una categoría específica, antes de realizar un análisis más profundo de esos documentos.

Remodelar documentos con $proyecto, $clasificar, $limitar y $saltar

Estas etapas te ayudan a controlar qué campos incluir y cómo organizar tus resultados. Aquí tienes un ejemplo que obtiene las 5 cuentas con los límites más altos, mostrando sólo la información esencial:

pipeline = [
   {"$project": {
       "_id": 0,
       "account_id": 1,
       "limit": 1,
       "product_count": {"$size": "$products"}
   }},
   {"$sort": {"limit": -1}},
   {"$limit": 5}
]

top_accounts = list(db.accounts.aggregate(pipeline))
for account in top_accounts:
   print(account)

Esta tubería tiene tres etapas que trabajan juntas:

1. La etapa $project da nueva forma a cada documento:

  • Excluyendo el campo _id (poniéndolo a 0)
  • Incluyendo los campos account_id y limit (poniéndolos a 1)
  • Crear un nuevo campo llamado product_count que utilice el operador $size (unode los muchos operadores de canalización de agregación) para contar los elementos del arreglo de productos

2. La etapa $sort ordena los resultados por el campo limit en orden descendente (-1)

3. La etapa $limit conserva sólo los 5 primeros documentos después de clasificarlos

Salida:

{'account_id': 674364, 'limit': 10000, 'product_count': 1}
{'account_id': 278603, 'limit': 10000, 'product_count': 2}
{'account_id': 383777, 'limit': 10000, 'product_count': 5}
{'account_id': 557378, 'limit': 10000, 'product_count': 4}
{'account_id': 198100, 'limit': 10000, 'product_count': 3}

El resultado muestra las cinco cuentas con los límites más altos, todos ellos de 10.000 $. También podemos ver cuántos productos tiene cada cuenta a través de nuestro campo calculado product_count.

Esto permite una presentación de datos más limpia, centrada exactamente en lo que se necesita, en lugar de devolver todos los campos. Al crear cuadros de mando o informes para tus propias aplicaciones, puedes utilizar técnicas similares para presentar sólo la información más relevante a los usuarios, reduciendo la transferencia de datos y simplificando la interfaz de usuario.

Agrupar y agregar con $group

En la etapa $group es donde realmente brilla la agregación. Puedes categorizar los documentos y calcular las métricas de cada grupo. Busquemos el límite medio de la cuenta por tipo de producto:

pipeline = [
   {"$unwind": "$products"},  # First unwind the products array
   {"$group": {
       "_id": "$products",
       "avg_limit": {"$avg": "$limit"},
       "count": {"$sum": 1}
   }},
   {"$sort": {"avg_limit": -1}}
]

product_analysis = list(db.accounts.aggregate(pipeline))
for product in product_analysis:
   print(f"Product: {product['_id']}")
   print(f"  Average limit: ${product['avg_limit']:.2f}")
   print(f"  Number of accounts: {product['count']}")

Esta tubería utiliza varios operadores para analizar los datos del producto:

  1. El operador $unwind divide los documentos con varios productos en documentos separados. Por ejemplo, una cuenta con ["Derivados", "InversiónStock"] se convierte en dos documentos, uno para cada producto.
  2. La etapa $group entonces:
  • Agrupa por nombre de producto (el campo _id de $group determina la clave de agrupación).
  • Calcula el límite medio de cada producto utilizando el acumulador $avg
  • Cuenta el número de cuentas de cada producto utilizando $sum: 1 (sumando 1 por cada documento).

3. La etapa $sort ordena los resultados por límite medio en orden descendente

Salida:

Product: Commodity
 Average limit: $9963.89
 Number of accounts: 720
Product: Brokerage
 Average limit: $9960.86
 Number of accounts: 741
Product: InvestmentStock
 Average limit: $9955.90
 Number of accounts: 1746
Product: InvestmentFund
 Average limit: $9951.92
 Number of accounts: 728
Product: Derivatives
 Average limit: $9951.84
 Number of accounts: 706
Product: CurrencyService
 Average limit: $9946.09
 Number of accounts: 742

Los resultados revelan que los productos Commodity están asociados a los límites medios de cuenta más altos, mientras que los productos CurrencyService tienen los más bajos .

Este tipo de análisis ayuda a identificar las correlaciones entre las ofertas de productos y la capacidad de gasto de los clientes. 

En tus propias aplicaciones, podrías utilizar técnicas similares para analizar las ventas por categoría, la participación de los usuarios por función o los errores por módulo: cualquier escenario en el que necesites resumir los datos en grupos en lugar de examinar registros individuales.

Unir colecciones con $lookup y $unwind

Cuando tus datos abarcan varias colecciones, $lookup te ayuda a reunirlos. Busquemos clientes con sus cuentas asociadas:

pipeline = [
   {"$match": {"username": "fmiller"}},  # Find a specific customer
   {"$lookup": {
       "from": "accounts",  # Collection to join with
       "localField": "accounts",  # Field from customers collection
       "foreignField": "account_id",  # Field from accounts collection
       "as": "account_details"  # Name for the new array field
   }},
   {"$project": {
       "name": 1,
       "accounts": 1,
       "account_details.account_id": 1,
       "account_details.limit": 1,
       "account_details.products": 1
   }}
]

customer_accounts = list(db.customers.aggregate(pipeline))

print(f"Customer: {customer_accounts[0]['name']}")
print(f"Has {len(customer_accounts[0]['account_details'])} accounts:")

for account in customer_accounts[0]['account_details']:
   print(f"  Account {account['account_id']}: ${account['limit']} limit with products: {', '.join(account['products'])}")

Este canal demuestra cómo unir datos relacionados entre colecciones:

  1. La etapa $match busca un cliente concreto por su nombre de usuario
  2. La etapa $lookup realiza una unión externa izquierda con la colección de cuentas:
  • from: especifica con qué colección unirse
  • localField: el campo de la colección actual (clientes) que debe coincidir
  • foreignField: el campo de la colección de destino (cuentas) con el que debe coincidir
  • as: el nombre del nuevo campo del arreglo que contendrá los documentos coincidentes

3. La etapa $project da forma a la salida para mostrar sólo los campos relevantes

Salida:

Customer: Elizabeth Ray
Has 6 accounts:
 Account 371138: $9000 limit with products: Derivatives, InvestmentStock
 Account 324287: $10000 limit with products: Commodity, CurrencyService, Derivatives, InvestmentStock
 Account 276528: $10000 limit with products: InvestmentFund, InvestmentStock
 Account 332179: $10000 limit with products: Commodity, CurrencyService, InvestmentFund, Brokerage, InvestmentStock
 Account 422649: $10000 limit with products: CurrencyService, InvestmentStock
 Account 387979: $10000 limit with products: Brokerage, Derivatives, InvestmentFund, Commodity, InvestmentStock

El resultado proporciona una visión completa del portafolio financiero de Elizabeth Ray, mostrando sus seis cuentas y sus productos asociados en un único resultado de consulta. 

Esto permite una visión completa de 360 grados de tus relaciones de datos sin necesidad de múltiples consultas o uniones del lado del cliente. 

Para tus propias aplicaciones, plantéate si has dividido los datos en colecciones con fines de normalización, pero necesitas volver a reunirlos para analizarlos o visualizarlos. Algunos ejemplos comunes son los perfiles de usuario con historial de actividad, los productos con estado de inventario o el contenido con comentarios relacionados.

Combinación de varias etapas para análisis complejos

Ahora, abordemos una cuestión empresarial más complicada: "¿Cuáles son los límites medios de las cuentas de los clientes, agrupados por su nivel de servicio?" Esto requiere que manejemos un objeto anidado dentro de nuestros documentos de cliente (tier_and_details ) y luego vinculemos esta información a la colección accounts.

En primer lugar, para comprender la estructura con la que estamos tratando en la colección customers, vamos a inspeccionar el campo tier_and_details y el campo accounts de un cliente de muestra. El campo tier_and_details es un objeto en el que cada clave es un identificador de una suscripción de nivel, y el valor contiene detalles como el nombre del nivel. El campo accounts es un arreglo de identificadores de cuenta asociados a ese cliente.

# First, let's look at the structure of 'tier_and_details' again
sample_customer = db.customers.find_one({"username": "fmiller"}) # Using a known customer for consistency
print("Tier structure example for customer 'fmiller':")
print(sample_customer['tier_and_details'])
print(f"Customer 'fmiller' has account IDs: {sample_customer['accounts']}")

Salida:

Tier structure example for customer 'fmiller':
{'0df078f33aa74a2e9696e0520c1a828a': {'tier': 'Bronze', 'id': '0df078f33aa74a2e9696e0520c1a828a', 'active': True, 'benefits': ['sports tickets']}, '699456451cc24f028d2aa99d7534c219': {'tier': 'Bronze', 'benefits': ['24 hour dedicated line', 'concierge services'], 'active': True, 'id': '699456451cc24f028d2aa99d7534c219'}}
Customer 'fmiller' has account IDs: [371138, 324287, 276528, 332179, 422649, 387979]
pipeline = [
   # Step 1: Project necessary fields, including 'accounts' and convert 'tier_and_details'
   {"$project": {
       "tiers_array": {"$objectToArray": "$tier_and_details"}, # Convert object to array
       "customer_account_ids": "$accounts",  # Explicitly carry over the customer's account IDs
       "_id": 1  # Keep customer _id for later
   }},
   # Step 2: Unwind the new 'tiers_array' to process each tier object separately
   {"$unwind": "$tiers_array"},
   # Step 3: Reshape to clearly define the tier and keep customer account IDs
   {"$project": {
       "tier": "$tiers_array.v.tier",  # Extract the tier name
       "customer_account_ids": 1,  # Ensure account IDs are still present
       "customer_id": "$_id"  # Rename _id to customer_id for clarity
   }},
   # Step 4: Look up account details using the customer_account_ids
   {"$lookup": {
       "from": "accounts",  # Target collection
       "localField": "customer_account_ids",  # Array of account IDs from the customer
       "foreignField": "account_id",  # Field in the 'accounts' collection
       "as": "matched_account_details"  # New array with joined account documents
   }},
   # Step 5: Unwind the 'matched_account_details' array.
   {"$unwind": "$matched_account_details"},
   # Step 6: Group by tier to calculate statistics
   {"$group": {
       "_id": "$tier",  # Group by the tier name
       "avg_limit": {"$avg": "$matched_account_details.limit"}, # Calculate average limit
       "total_accounts_in_tier": {"$sum": 1},  # Count how many accounts fall into this tier
       "unique_customers_in_tier": {"$addToSet": "$customer_id"}  # Count unique customers in this tier
   }},
   # Step 7: Format the final output
   {"$project": {
       "tier_name": "$_id",  # Rename _id to tier_name
       "average_account_limit": "$avg_limit",
       "number_of_accounts": "$total_accounts_in_tier",
       "number_of_customers": {"$size": "$unique_customers_in_tier"},  # Get the count of unique customers
       "_id": 0  # Exclude the default _id
   }},
   # Step 8: Sort by average limit
   {"$sort": {"average_account_limit": -1}}
]

tier_analysis = list(db.customers.aggregate(pipeline))

print("\nTier Analysis Results:")

for tier_data in tier_analysis:
   print(f"Tier: {tier_data['tier_name']}")
   print(f"  Average Account Limit: ${tier_data['average_account_limit']:.2f}")
   print(f"  Number of Accounts in this Tier: {tier_data['number_of_accounts']}")
   print(f"  Number of Unique Customers in this Tier: {tier_data['number_of_customers']}")

Este pipeline descompone el complejo análisis en ocho pasos manejables:

  1. $project: Empezamos transformando el objeto tier_and_details. El operador $objectToArray convierte este objeto en un arreglo de pares clave-valor (tiers_array). Esto es esencial porque etapas como $unwind funcionan con arreglos. De forma crítica, también arrastramos explícitamente el arreglo accounts del documento del cliente como customer_account_ids y el _id del cliente.
  2. $unwind: Esta etapa deconstruye el tiers_array, creando un documento distinto para cada entrada de nivel que pueda tener un cliente. Cada nuevo documento sigue conteniendo el customer_account_ids y el cliente original _id.
  3. $project: Remodelamos el documento para extraer claramente el nombre del nivel (por ejemplo, "Bronze") de la estructura anidada ($tiers_array.v.tier) y cambiamos el nombre del cliente _id por customer_id para mayor claridad. Pasa por customer_account_ids.
  4. $lookup: Aquí es donde nos unimos a la colección accounts. Utilizamos customer_account_ids (el arreglo de números de cuenta del documento del cliente) como localField. El foreignField es account_id de la colección accounts. MongoDB encontrará todas las cuentas cuyo account_id is present in the customer_account_ids array, adding them as an array to the matched_account_details field.
  5. $unwind: We unwind matched_account_details. Now, if a customer-tier combination was linked to multiple accounts, we get a separate document for each specific account, associated with that customer and tier.
  6. $group: We group the documents by tier. For each tier, we calculate the avg_limit using $avg on the limit from the joined account details. We count the total_accounts_in_tier using $sum: 1. We also use $addToSet with customer_id to collect the unique customer IDs belonging to each tier.
  7. $project: The final shaping of our output. We rename _id (which is the tier name from the group stage) to tier_name. We use $size to get the count of unique_customers_in_tier.
  8. $ordenar: We order the results by limite_promedio_cuenta` en orden descendente, para que aparezca primero el nivel con el límite medio más alto.
Tier Analysis Results:
Tier: Silver
 Average Account Limit: $9974.55
 Number of Accounts in this Tier: 393
 Number of Unique Customers in this Tier: 95
Tier: Bronze
 Average Account Limit: $9964.11
 Number of Accounts in this Tier: 418
 Number of Unique Customers in this Tier: 93
Tier: Platinum
 Average Account Limit: $9962.53
 Number of Accounts in this Tier: 427
 Number of Unique Customers in this Tier: 101
Tier: Gold
 Average Account Limit: $9962.44
 Number of Accounts in this Tier: 426
 Number of Unique Customers in this Tier: 99

Los resultados muestran ahora una jerarquía ligeramente diferente: los clientes del nivel Silver, por término medio, tienen cuentas con los límites más altos, seguidos de Bronze, luego Platinum y, por último, Gold. T

Este tipo de información es valiosa para comprender los segmentos de clientes. Por ejemplo, una entidad financiera podría utilizar esta información para investigar por qué los niveles Silver y Bronze tienen límites medios tan altos, o para adaptar las campañas de marketing u ofrecer servicios premium .

El uso de $objectToArray fue clave para desbloquear los datos de los niveles anidados, y el paso cuidadoso de customer_account_ids garantizó que nuestro $lookup pudiera conectar a los clientes con sus cuentas específicas.

Cuando encuentres objetos anidados en tus propios conjuntos de datos que necesites utilizar en agregaciones (como user preferences, product attributes, o configuration settings ), recuerda la técnica $objectToArray .

Asegúrate siempre de que los campos necesarios para las etapas posteriores, especialmente para las operaciones de $lookup, se incluyan explícitamente en tus etapas de $project. Este enfoque estructurado para desglosar datos complejos ayuda a obtener información significativa directamente en MongoDB.

Patrones avanzados de tuberías

Más allá de las transformaciones secuenciales, el marco de agregación de MongoDB ofrece patrones sofisticados para abordar consultas analíticas complejas. Cuando necesites analizar datos desde múltiples ángulos simultáneamente o realizar cálculos basados en un conjunto rodante de documentos, estos patrones avanzados proporcionan soluciones potentes directamente dentro de la base de datos.

Análisis multidimensional con $facet

La etapa $facet te permite ejecutar varias sub-líneas de agregación dentro de una misma etapa, utilizando el mismo conjunto de documentos de entrada. Imagina que necesitas clasificar los productos por rango de precios y, al mismo tiempo, hacer una lista de las 5 marcas de productos más populares de la misma colección de productos. $facet maneja estas diferentes perspectivas analíticas en paralelo.

Cada sublínea de $facet funciona de forma independiente con los documentos de entrada y produce su propio arreglo de documentos de salida. Esto significa que puedes reunir varias métricas o resúmenes -como los necesarios para un cuadro de mando completo- en una sola consulta a la base de datos. 

Por ejemplo, podrías obtener un recuento del total de usuarios activos, un desglose de los usuarios por nivel de suscripción y una lista de los usuarios que se han unido recientemente, todo ello a partir del mismo conjunto de datos de usuarios simultáneamente. Lo que esto permite es la creación de informes ricos y polifacéticos sin la sobrecarga de múltiples llamadas a la base de datos, agilizando la recuperación de datos para vistas complejas.

Análisis temporal con funciones de ventana

Introducidas en MongoDB 5.0, las funciones de ventana realizan cálculos en un conjunto de documentos relacionados con el documento actual, conocido como "ventana". 

Esto es útil para datos de series temporales o cualquier conjunto de datos ordenados en los que sea importante el contexto de los documentos vecinos. Por ejemplo, puede que quieras calcular una media móvil de 7 días de las ventas o encontrar la suma acumulada de las transacciones de cada cliente a lo largo del tiempo.

Las funciones de ventana se suelen utilizar en la etapa $setWindowFields. Esta etapa te permite definir particiones (grupos de documentos, como ventas por producto) y ordenar dentro de esas particiones (como por fecha). 

A continuación, puedes aplicar funciones de ventana como $avg, $sum, $min, $max, o funciones especializadas como $derivative o $integral sobre una ventana definida (por ejemplo, los 3 documentos anteriores y el documento actual).

Considera la posibilidad de calcular un total acumulado de ventas de productos. Una tubería que utilice $setWindowFields podría tener este aspecto:

pipeline = [
   {"$match": {"category": "Electronics"}}, # Filter for electronics
   {"$sort": {"sale_date": 1}}, # Sort by sale date
   {"$setWindowFields": {
       "partitionBy": "$product_id", # Calculate running total per product
       "sortBy": {"sale_date": 1},
       "output": {
           "running_total_sales": {
               "$sum": "$sale_amount",
               "window": {
                   "documents": ["unbounded", "current"] # Sum from start to current document
               }
           }
       }
   }}
]
# electronics_sales_with_running_total = list(db.sales.aggregate(pipeline))

En esta cadena conceptual, para cada venta electrónica se añade un campo running_total_sales. Este campo representa la suma de sale_amount para ese producto desde la venta más antigua hasta la venta actual .

Estos cálculos, que antes requerían una lógica compleja del lado del cliente o múltiples consultas, ahora se pueden hacer directamente en la base de datos, lo que simplifica el código de la aplicación y mejora el rendimiento de los análisis de tendencias o las comparaciones entre periodos. 

Conclusión

Las canalizaciones de agregación de MongoDB proporcionan un método estructurado para procesar datos directamente dentro de tu base de datos. Al enlazar distintas etapas operativas, puedes realizar manipulaciones de datos complicadas, como filtrar registros, remodelar documentos, agrupar información y combinar datos de varias colecciones. 

Este enfoque ayuda a refinar tus flujos de trabajo de datos, permitiendo una extracción más rápida del significado de grandes conjuntos de datos, como se muestra en los ejemplos de PyMongo. Procesar los datos cerca de su fuente a menudo puede mejorar el rendimiento de las consultas analíticas.

Desarrollar destreza con las canalizaciones de agregación te prepara para muchos retos de análisis de datos. Para aplicar lo que has aprendido, puedes probar un proyecto práctico como Construir una canalización de datos para el comercio minorista

Para obtener una perspectiva más amplia sobre los sistemas de datos y cómo las habilidades de MongoDB encajan en el panorama general, el programa Ingeniero de Datos con Python ofrece un aprendizaje en profundidad. Si quieres avanzar en tu carrera profesional, repasar las preguntas habituales de las entrevistas sobre MongoDB también puede ser beneficioso para comprender las expectativas típicas de la resolución de problemas.

Preguntas frecuentes sobre la canalización de agregación de MongoDB

¿Qué son las canalizaciones de agregación de MongoDB?

Las canalizaciones de agregación de MongoDB son un marco para el análisis de datos. Procesan los documentos a través de una serie de etapas, transformándolos para devolver resultados computados.

¿Cómo mejoran las canalizaciones de agregación el procesamiento de datos?

Permiten realizar transformaciones y análisis de datos complejos directamente dentro de MongoDB, reduciendo la transferencia de datos y procesándolos más cerca de su origen para mejorar el rendimiento.

¿Cuáles son las etapas habituales en una canalización de agregación de MongoDB?

Las etapas comunes incluyen $match para filtrar, $project para remodelar documentos, $group para agregar datos y $lookup para unir colecciones.

¿Puedo utilizar Python con los pipelines de agregación de MongoDB?

Sí, este tutorial muestra cómo construir y ejecutar canalizaciones de agregación de MongoDB utilizando PyMongo, el controlador oficial de Python para MongoDB.

¿Por qué es importante el orden de las etapas en los conductos de agregación?

El orden escénico es importante para el rendimiento. Filtrar los datos al principio con $match reduce el volumen de documentos procesados por etapas posteriores más complejas.


Bex Tuychiev's photo
Author
Bex Tuychiev
LinkedIn

Soy un creador de contenidos de ciencia de datos con más de 2 años de experiencia y uno de los mayores seguidores en Medium. Me gusta escribir artículos detallados sobre IA y ML con un estilo un poco sarcastıc, porque hay que hacer algo para que sean un poco menos aburridos. He publicado más de 130 artículos y un curso DataCamp, y estoy preparando otro. Mi contenido ha sido visto por más de 5 millones de ojos, 20.000 de los cuales se convirtieron en seguidores tanto en Medium como en LinkedIn. 

Temas

Los mejores cursos de DataCamp

Programa

Data Engineer in Python

0 min
Gain in-demand skills to efficiently ingest, clean, manage data, and schedule and monitor pipelines, setting you apart in the data engineering field.
Ver detallesRight Arrow
Comienza el curso
Ver másRight Arrow
Relacionado

Tutorial

Multiprocesamiento en Python: Guía de hilos y procesos

Aprende a gestionar hilos y procesos con el módulo de multiprocesamiento de Python. Descubre las técnicas clave de la programación paralela. Mejora la eficacia de tu código con ejemplos.
Kurtis Pykes 's photo

Kurtis Pykes

7 min

Tutorial

Tutorial de tuberías en R para principiantes

Aprenda más sobre el famoso operador de tuberías %>% y otras tuberías en R, por qué y cómo debe utilizarlas y qué alternativas puede considerar.
Karlijn Willems's photo

Karlijn Willems

15 min

Tutorial

Tutorial de FastAPI: Introducción al uso de FastAPI

Explore el marco FastAPI y descubra cómo puede utilizarlo para crear API en Python.
Moez Ali's photo

Moez Ali

13 min

Tutorial

Tutorial de unión de DataFrames en pandas

En este tutorial, usted aprenderá varias maneras en las que múltiples DataFrames pueden ser fusionados en python usando la librería Pandas.
DataCamp Team's photo

DataCamp Team

13 min

Tutorial

Tutorial de visualización de datos con Python y Tableau

Aprende a utilizar Python para ampliar las funciones de visualización de datos de Tableau.
Abid Ali Awan's photo

Abid Ali Awan

15 min

SQLAlchemy_Tutorial.

Tutorial

Tutorial de SQLAlchemy con ejemplos

Aprende a acceder y ejecutar consultas SQL en todo tipo de bases de datos relacionales utilizando objetos Python.
Abid Ali Awan's photo

Abid Ali Awan

13 min

Ver másVer más