CHAPTER 9: Big Data Infrastructure
The Complete Ecosystem
Big Data infrastructure comprises a set of distributed technologies designed to store, process, and analyze massive volumes of data efficiently, at scale, and with fault tolerance.
9.1. The Hadoop Ecosystem
Apache Hadoop
Hadoop is the most established open-source framework for distributed processing of large datasets, built on the MapReduce programming model and HDFS storage.
Core Hadoop Components
graph TD
A[Apache Hadoop Ecosystem] --> B[HDFS<br/>Storage]
A --> C[YARN<br/>Resource Management]
A --> D[MapReduce<br/>Processing]
B --> E[NameNode<br/>Metadata]
B --> F[DataNodes<br/>Blocks]
C --> G[ResourceManager]
C --> H[NodeManager]
D --> I[Map Tasks]
D --> J[Reduce Tasks]
style A fill:#FFD93D
style B fill:#6BCB77
style C fill:#4D96FF
style D fill:#FF6B6B
HDFS (Hadoop Distributed File System)
Master-slave architecture:

| Component | Role | Responsibilities |
|---|---|---|
| NameNode | Master | Manages metadata, the namespace, and block locations |
| DataNode | Slave (worker) | Stores the actual data blocks |
| Secondary NameNode | Checkpointing | Periodically merges the edit log into the fsimage (not a hot standby) |

Key characteristics:
- ✅ Replication: each block is replicated 3 times (configurable)
- ✅ Large blocks: 128 MB by default (vs. 4 KB in traditional file systems)
- ✅ Write-once, read-many: optimized for massive sequential reads
- ✅ Fault tolerance: automatic failure detection and re-replication
- ✅ Data locality: processing runs close to where the data lives
Example: Basic HDFS operations

```bash
# === HDFS COMMANDS ===
# 1. List files in HDFS
hdfs dfs -ls /user/hadoop/
# 2. Create a directory
hdfs dfs -mkdir -p /data/raw/2024
# 3. Upload a local file to HDFS
hdfs dfs -put local_file.csv /data/raw/2024/
# 4. Download a file from HDFS
hdfs dfs -get /data/raw/2024/file.csv ./local_file.csv
# 5. View file contents
hdfs dfs -cat /data/raw/2024/file.csv | head -10
# 6. Show replication information
hdfs fsck /data/raw/2024/file.csv -files -blocks -locations
# 7. Change the replication factor
hdfs dfs -setrep -w 5 /data/important/critical.csv
# 8. Show space used
hdfs dfs -du -h /data/
# 9. Delete a file (goes to trash)
hdfs dfs -rm /data/temp/old_file.csv
# 10. Delete permanently (skip trash)
hdfs dfs -rm -r -skipTrash /data/temp/
```
Python example: Interacting with HDFS

```python
# === PYTHON + HDFS ===
from hdfs import InsecureClient
import pandas as pd
from io import StringIO

# 1. Connect to HDFS (WebHDFS; port 50070 on Hadoop 2, 9870 on Hadoop 3)
client = InsecureClient('http://namenode:50070', user='hadoop')

# 2. List files
files = client.list('/data/raw/')
print(f"Files in /data/raw/: {files}")

# 3. Upload a file
with open('local_data.csv', 'r') as f:
    client.write('/data/raw/local_data.csv', f)

# 4. Read a file into a DataFrame
with client.read('/data/raw/data.csv', encoding='utf-8') as reader:
    content = reader.read()
df = pd.read_csv(StringIO(content))
print(f"DataFrame loaded: {df.shape}")
print(df.head())

# 5. Upload a DataFrame directly
df_output = pd.DataFrame({
    'id': range(1000),
    'value': range(1000, 2000)
})
csv_string = df_output.to_csv(index=False)
client.write('/data/processed/output.csv', csv_string, encoding='utf-8')

# 6. Check whether a file exists
if client.status('/data/raw/data.csv', strict=False):
    print("✅ File exists")
    # Fetch metadata
    status = client.status('/data/raw/data.csv')
    print(f"   Size: {status['length'] / 1024 / 1024:.2f} MB")
    print(f"   Replication: {status['replication']}")
    print(f"   Modified: {status['modificationTime']}")

# 7. Delete a file
client.delete('/data/temp/old_file.csv')

# Sample output:
# Files in /data/raw/: ['data.csv', 'local_data.csv', 'transactions.parquet']
# DataFrame loaded: (50000, 10)
# ✅ File exists
#    Size: 125.34 MB
#    Replication: 3
#    Modified: 1708435200000
```
HDFS Architecture: Reading a File
sequenceDiagram
participant Client
participant NameNode
participant DataNode1
participant DataNode2
Client->>NameNode: 1. Request read of file.txt
NameNode->>Client: 2. Return block locations<br/>(DN1: block1, DN2: block2)
Client->>DataNode1: 3. Read block1
DataNode1->>Client: 4. Send block1
Client->>DataNode2: 5. Read block2
DataNode2->>Client: 6. Send block2
Client->>Client: 7. Assemble the complete file
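This same read path (metadata from the NameNode, data streamed from DataNodes) is what HDFS client libraries do under the hood. A minimal sketch using pyarrow's HDFS binding, assuming libhdfs is available on the client and that the NameNode address, port, and path shown are placeholders:

```python
import pyarrow.fs as pafs

# The client asks the NameNode only for metadata; block reads go straight to DataNodes.
hdfs = pafs.HadoopFileSystem(host="namenode", port=8020, user="hadoop")

info = hdfs.get_file_info("/data/raw/2024/file.csv")
print(f"Size: {info.size / 1024 / 1024:.2f} MB")

# Stream the file; the library assembles the blocks transparently.
with hdfs.open_input_stream("/data/raw/2024/file.csv") as f:
    first_bytes = f.read(1024)
    print(first_bytes[:200])
```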
MapReduce: The Programming Model
Concept:
MapReduce is a programming model for distributed processing of large datasets on clusters.
Phases:
- Map: processes the input and emits key-value pairs
- Shuffle & Sort: groups values by key
- Reduce: aggregates the values for each key
Classic Example: WordCount
```python
# === MapReduce in Python (mrjob) ===
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordCount(MRJob):
    """
    MapReduce job that counts words in documents
    """

    def mapper(self, _, line):
        """
        MAP PHASE: emit (word, 1) for every word
        Input: a line of text
        Output: (word, 1)
        """
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer(self, word, counts):
        """
        REDUCE PHASE: sum all counts per word
        Input: (word, [1, 1, 1, ...])
        Output: (word, total)
        """
        yield (word, sum(counts))

if __name__ == '__main__':
    MRWordCount.run()

# Run:
# python wordcount.py input.txt -o output/
# python wordcount.py hdfs:///data/books/*.txt -r hadoop -o hdfs:///results/wordcount/

# Sample output:
# "hadoop"      1523
# "data"        2891
# "processing"  876
```
Advanced Example: Joining Datasets

```python
# === MapReduce: Joining Customers and Orders ===
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRJoinCustomersOrders(MRJob):
    """
    Join between customers.csv and orders.csv using MapReduce
    """

    def steps(self):
        return [
            MRStep(mapper=self.mapper_join,
                   reducer=self.reducer_join)
        ]

    def mapper_join(self, _, line):
        """
        MAP: emit (customer_id, (record_type, data))
        customers.csv: C|123|John Doe|john@email.com
        orders.csv:    O|123|456.78|2024-01-15
        """
        parts = line.strip().split('|')
        if parts[0] == 'C':  # Customer
            customer_id = parts[1]
            customer_data = {
                'name': parts[2],
                'email': parts[3]
            }
            yield (customer_id, ('customer', customer_data))
        elif parts[0] == 'O':  # Order
            customer_id = parts[1]
            order_data = {
                'amount': float(parts[2]),
                'date': parts[3]
            }
            yield (customer_id, ('order', order_data))

    def reducer_join(self, customer_id, values):
        """
        REDUCE: combine each customer with their orders
        """
        customer = None
        orders = []
        for value_type, data in values:
            if value_type == 'customer':
                customer = data
            elif value_type == 'order':
                orders.append(data)
        if customer and orders:
            total_spent = sum(o['amount'] for o in orders)
            yield (customer_id, {
                'customer': customer,
                'num_orders': len(orders),
                'total_spent': total_spent,
                'orders': orders
            })

if __name__ == '__main__':
    MRJoinCustomersOrders.run()

# Sample output per customer_id:
# "123" -> {
#   "customer": {"name": "John Doe", "email": "john@email.com"},
#   "num_orders": 5,
#   "total_spent": 2894.50,
#   "orders": [...]
# }
```
YARN (Yet Another Resource Negotiator)
Architecture:
graph TD
A[YARN Cluster] --> B[ResourceManager<br/>Global]
A --> C[NodeManager<br/>One per node]
B --> D[Scheduler<br/>Allocates resources]
B --> E[ApplicationsManager<br/>Manages apps]
C --> F[Container 1<br/>App Master]
C --> G[Container 2<br/>Task]
C --> H[Container 3<br/>Task]
style A fill:#FFD93D
style B fill:#FF6B6B
style C fill:#4D96FF
Components:

| Component | Function |
|---|---|
| ResourceManager | Manages the resources of the entire cluster |
| NodeManager | Manages resources on each individual node |
| ApplicationMaster | Coordinates the execution of a specific application |
| Container | Unit of resources (CPU, RAM) allocated to a task |
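The ResourceManager also exposes an HTTP REST API (by default on the web UI port, 8088) that can be used to inspect running applications and their containers. A minimal sketch with the requests library, assuming a reachable ResourceManager at a placeholder hostname; field names follow the YARN RM REST API but are read defensively:

```python
import requests

# Hypothetical ResourceManager address; 8088 is the default web/REST port.
RM = "http://resourcemanager:8088"

# List running applications.
resp = requests.get(f"{RM}/ws/v1/cluster/apps", params={"states": "RUNNING"})
resp.raise_for_status()
apps = (resp.json().get("apps") or {}).get("app", [])

for app in apps:
    print(app.get("id"), app.get("name"), app.get("state"),
          f'{app.get("allocatedMB", 0)} MB', f'{app.get("allocatedVCores", 0)} vcores')
```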
Hadoop Ecosystem: Additional Tools

| Tool | Purpose | Description |
|---|---|---|
| Hive | SQL on Hadoop | SQL engine that translates queries into MapReduce/Tez/Spark jobs |
| Pig | Scripting | High-level language (Pig Latin) for ETL |
| HBase | NoSQL DB | Distributed column-oriented database on top of HDFS |
| Sqoop | Ingestion | Import/export data between RDBMSs and Hadoop |
| Flume | Streaming | Collection and aggregation of logs in near real time |
| Oozie | Workflow | Orchestration of MapReduce/Pig/Hive jobs |
| Zookeeper | Coordination | Distributed coordination service |
Example: Hive Query

```sql
-- === APACHE HIVE: SQL on Hadoop ===
-- 1. Create an external table over data already in HDFS
CREATE EXTERNAL TABLE IF NOT EXISTS sales (
    transaction_id BIGINT,
    customer_id INT,
    product_id INT,
    quantity INT,
    amount DECIMAL(10,2),
    transaction_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/data/raw/sales/';

-- 2. Create a partitioned table (better performance)
CREATE TABLE sales_partitioned (
    transaction_id BIGINT,
    customer_id INT,
    product_id INT,
    quantity INT,
    amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC  -- optimized columnar format
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- 3. Insert data into a partition
INSERT INTO sales_partitioned PARTITION(year=2024, month=1)
SELECT transaction_id, customer_id, product_id, quantity, amount
FROM sales
WHERE YEAR(transaction_date) = 2024 AND MONTH(transaction_date) = 1;

-- 4. Analytical query over the partitioned data
SELECT
    year,
    month,
    COUNT(*) as num_transactions,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_transaction,
    COUNT(DISTINCT customer_id) as unique_customers
FROM sales_partitioned
WHERE year = 2024
GROUP BY year, month
ORDER BY month;

-- 5. Join with the product dimension
SELECT
    p.category,
    p.product_name,
    SUM(s.quantity) as units_sold,
    SUM(s.amount) as revenue
FROM sales_partitioned s
JOIN products p ON s.product_id = p.id
WHERE s.year = 2024 AND s.month = 1
GROUP BY p.category, p.product_name
ORDER BY revenue DESC
LIMIT 10;
```
9.2. Apache Spark
Spark: In-Memory Processing
Apache Spark is the current generation of distributed processing frameworks, up to 100x faster than MapReduce on iterative workloads because intermediate results are kept in memory (RAM) instead of being written to disk.
Arquitectura de Spark
graph TD
A[Spark Application] --> B[Driver Program<br/>SparkContext]
B --> C[Cluster Manager<br/>YARN/Mesos/K8s]
C --> D[Worker Node 1]
C --> E[Worker Node 2]
C --> F[Worker Node N]
D --> G[Executor 1<br/>Cache<br/>Tasks]
E --> H[Executor 2<br/>Cache<br/>Tasks]
F --> I[Executor N<br/>Cache<br/>Tasks]
style A fill:#E25822
style B fill:#3E4E88
style C fill:#62D2A2
style D fill:#FFD93D
style E fill:#FFD93D
style F fill:#FFD93D
Components:
- Driver: runs the main() function, creates the SparkContext, and turns the code into a DAG of stages
- Cluster Manager: allocates resources (YARN, Mesos, Kubernetes, Standalone)
- Executors: JVM processes on the worker nodes that run tasks and cache data
- Tasks: units of work sent to the executors
RDD vs DataFrame vs Dataset

| Feature | RDD | DataFrame | Dataset |
|---|---|---|---|
| Abstraction | Immutable distributed collection | Structured data organized in columns | Typed objects on top of the DataFrame engine (Scala/Java only) |
| Optimization | Manual | Catalyst optimizer | Catalyst optimizer |
| Type safety | ❌ Runtime | ❌ Runtime | ✅ Compile time |
| Performance | Slower | Fast | Fast |
| API | Functional transformations | SQL-like | Both |
| When to use | Complex, low-level transformations | Standard structured analysis | ML pipelines, strong typing |
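The practical difference between the RDD and DataFrame APIs is easiest to see on the same task. A short sketch of word count written both ways, assuming a placeholder input path; the RDD version spells out every transformation, while the DataFrame version is declarative and goes through the Catalyst optimizer:

```python
from operator import add
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("rdd-vs-dataframe").getOrCreate()
sc = spark.sparkContext

path = "hdfs:///data/books/sample.txt"  # placeholder path

# RDD API: explicit functional transformations, no query optimizer involved.
rdd_counts = (sc.textFile(path)
              .flatMap(lambda line: line.lower().split())
              .map(lambda word: (word, 1))
              .reduceByKey(add))
print(rdd_counts.take(5))

# DataFrame API: declarative; Catalyst plans the equivalent job.
df_counts = (spark.read.text(path)
             .select(F.explode(F.split(F.lower("value"), r"\s+")).alias("word"))
             .groupBy("word")
             .count())
df_counts.show(5)
```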
Spark: A Complete Analysis Example

```python
# === APACHE SPARK: End-to-End Analysis ===
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# 1. Create the SparkSession
spark = SparkSession.builder \
    .appName("E-commerce Analysis") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"✅ Spark {spark.version} started")
print(f"   Master: {spark.sparkContext.master}")
print(f"   Default parallelism: {spark.sparkContext.defaultParallelism}")

# 2. Read data from HDFS (several formats)
df_sales = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("hdfs:///data/raw/sales/*.csv")

df_customers = spark.read \
    .parquet("hdfs:///data/processed/customers")

df_products = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db:5432/products") \
    .option("dbtable", "products") \
    .option("user", "admin") \
    .option("password", "pass") \
    .load()

print(f"\n📊 Data loaded:")
print(f"   Sales: {df_sales.count():,} records")
print(f"   Customers: {df_customers.count():,} records")
print(f"   Products: {df_products.count():,} records")

# 3. Data quality: basic checks
print("\n🔍 Quality checks:")
print("   Sales - null values per column:")
df_sales.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c)
    for c in df_sales.columns
]).show()

# 4. Transformations: cleaning and enrichment
df_sales_clean = df_sales \
    .filter(F.col("amount") > 0) \
    .filter(F.col("quantity") > 0) \
    .filter(F.col("transaction_date").isNotNull()) \
    .withColumn("transaction_date", F.to_date("transaction_date")) \
    .withColumn("year", F.year("transaction_date")) \
    .withColumn("month", F.month("transaction_date")) \
    .withColumn("day_of_week", F.dayofweek("transaction_date"))

# 5. Joins: enrich with dimensions
df_enriched = df_sales_clean \
    .join(df_customers, "customer_id", "left") \
    .join(df_products, "product_id", "left")

# 6. Aggregations: business metrics
df_monthly_metrics = df_enriched \
    .groupBy("year", "month") \
    .agg(
        F.count("transaction_id").alias("num_transactions"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_transaction_value"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.countDistinct("product_id").alias("unique_products")
    ) \
    .orderBy("year", "month")

print("\n📈 Monthly metrics:")
df_monthly_metrics.show(12, truncate=False)

# 7. Window functions: advanced per-customer analysis
window_spec = Window.partitionBy("customer_id").orderBy("transaction_date")

df_customer_analysis = df_enriched \
    .withColumn("transaction_number", F.row_number().over(window_spec)) \
    .withColumn("cumulative_spent", F.sum("amount").over(
        window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )) \
    .withColumn("days_since_last_purchase",
                F.datediff(F.col("transaction_date"),
                           F.lag("transaction_date", 1).over(window_spec)))

# 8. Churn detection: inactive customers
max_date = df_enriched.agg(F.max("transaction_date")).collect()[0][0]
churn_threshold_days = 90

df_churn_risk = df_enriched \
    .groupBy("customer_id") \
    .agg(
        F.max("transaction_date").alias("last_purchase"),
        F.count("transaction_id").alias("total_purchases"),
        F.sum("amount").alias("lifetime_value")
    ) \
    .withColumn("days_inactive",
                F.datediff(F.lit(max_date), F.col("last_purchase"))) \
    .withColumn("churn_risk",
                F.when(F.col("days_inactive") > churn_threshold_days, "High")
                 .when(F.col("days_inactive") > 60, "Medium")
                 .otherwise("Low"))

print(f"\n⚠️ Churn analysis:")
df_churn_risk.groupBy("churn_risk").count().show()

# 9. RFM segmentation: Recency, Frequency, Monetary
df_rfm = df_enriched \
    .groupBy("customer_id") \
    .agg(
        F.max("transaction_date").alias("last_purchase"),
        F.count("transaction_id").alias("frequency"),
        F.sum("amount").alias("monetary")
    ) \
    .withColumn("recency_days",
                F.datediff(F.lit(max_date), F.col("last_purchase")))

# Compute quartiles for the RFM scores
quantiles = df_rfm.approxQuantile("recency_days", [0.25, 0.5, 0.75], 0.01)

df_rfm_scored = df_rfm \
    .withColumn("R_score",
                F.when(F.col("recency_days") <= quantiles[0], 4)
                 .when(F.col("recency_days") <= quantiles[1], 3)
                 .when(F.col("recency_days") <= quantiles[2], 2)
                 .otherwise(1)) \
    .withColumn("F_score",
                F.ntile(4).over(Window.orderBy("frequency"))) \
    .withColumn("M_score",
                F.ntile(4).over(Window.orderBy("monetary"))) \
    .withColumn("RFM_segment",
                F.concat(F.col("R_score"), F.col("F_score"), F.col("M_score")))

# "Champions": RFM = 444
champions = df_rfm_scored.filter(F.col("RFM_segment") == "444")
print(f"\n🏆 Champions (RFM=444): {champions.count():,} customers")

# 10. Persist results in several formats
# Partitioned Parquet (for fast queries)
df_enriched.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("hdfs:///data/processed/sales_enriched")

# CSV for reports
df_monthly_metrics.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("hdfs:///data/reports/monthly_metrics")

# Delta Lake (ACID transactions)
df_customer_analysis.write \
    .format("delta") \
    .mode("overwrite") \
    .save("hdfs:///data/delta/customer_analysis")

# 11. Cache for repeated queries
df_enriched.cache()
print(f"\n💾 DataFrame cached in memory: {df_enriched.storageLevel}")

# 12. Execution statistics
print("\n📊 Spark statistics:")
print(f"   Partitions: {df_enriched.rdd.getNumPartitions()}")
print(f"   Memory in use: {spark.sparkContext._jsc.sc().getExecutorMemoryStatus()}")  # internal API

spark.stop()

# Sample output:
# ✅ Spark 3.5.0 started
#    Master: yarn
#    Default parallelism: 200
#
# 📊 Data loaded:
#    Sales: 5,234,891 records
#    Customers: 125,340 records
#    Products: 8,456 records
#
# 📈 Monthly metrics:
# +----+-----+------------------+---------------+---------------------+-----------------+-------------------+
# |year|month|num_transactions  |total_revenue  |avg_transaction_value|unique_customers |unique_products    |
# +----+-----+------------------+---------------+---------------------+-----------------+-------------------+
# |2024|1    |432156            |12543876.50    |29.03                |45231            |3456               |
# |2024|2    |456789            |13234567.80    |28.97                |47892            |3512               |
# ...
#
# ⚠️ Churn analysis:
# +----------+------+
# |churn_risk|count |
# +----------+------+
# |High      |12534 |
# |Medium    |8765  |
# |Low       |104041|
# +----------+------+
#
# 🏆 Champions (RFM=444): 3,456 customers
```
Spark MLlib: Machine Learning at Scale

```python
# === SPARK MLlib: ML pipeline ===
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F

# 1. Prepare the features
feature_cols = ['recency_days', 'frequency', 'monetary', 'avg_transaction_value']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# 2. Prepare the target (churn: 1 if inactive > 90 days)
# avg_transaction_value is derived here because df_rfm does not carry it.
# Note: recency_days also defines the label, so using it as a feature leaks the target;
# in a real project the label should come from a later observation window.
df_ml = df_rfm \
    .withColumn("avg_transaction_value", F.col("monetary") / F.col("frequency")) \
    .withColumn("churn", (F.col("recency_days") > 90).cast("int"))
indexer = StringIndexer(inputCol="churn", outputCol="label")

# 3. Model: Random Forest
rf = RandomForestClassifier(featuresCol="features",
                            labelCol="label",
                            numTrees=100,
                            maxDepth=10)

# 4. Full pipeline
pipeline = Pipeline(stages=[assembler, scaler, indexer, rf])

# 5. Train/test split
train, test = df_ml.randomSplit([0.8, 0.2], seed=42)

# 6. Train the model
model = pipeline.fit(train)

# 7. Predictions
predictions = model.transform(test)

# 8. Evaluation
evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"🎯 AUC-ROC: {auc:.4f}")

# 9. Feature importance
rf_model = model.stages[-1]
importances = rf_model.featureImportances.toArray()
for feature, importance in zip(feature_cols, importances):
    print(f"   {feature}: {importance:.4f}")

# 10. Save the model
model.write().overwrite().save("hdfs:///models/churn_predictor")

# Output:
# 🎯 AUC-ROC: 0.8923
#    recency_days: 0.4521
#    frequency: 0.2834
#    monetary: 0.1876
#    avg_transaction_value: 0.0769
```
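Once persisted, the whole pipeline (assembler, scaler, indexer, and trained forest) can be reloaded and applied to new data with PipelineModel.load. A minimal sketch; the two-row input DataFrame is a stand-in for whatever feature table you would score in practice:

```python
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel

spark = SparkSession.builder.appName("churn-scoring").getOrCreate()

# Tiny stand-in for new customers; in practice this comes from the feature pipeline above.
df_new_customers = spark.createDataFrame(
    [(1, 12, 5, 480.0, 96.0), (2, 130, 1, 25.0, 25.0)],
    ["customer_id", "recency_days", "frequency", "monetary", "avg_transaction_value"]
)

# Load the persisted pipeline (same path used in the save step) and score.
model = PipelineModel.load("hdfs:///models/churn_predictor")
scored = model.transform(df_new_customers)
scored.select("customer_id", "probability", "prediction").show(truncate=False)
```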
9.3. Processing: Batch vs Streaming
Comparison

| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Volume | Large, complete datasets | Individual records or micro-batches |
| Complexity | Simpler | More complex (windowing, watermarks) |
| Use cases | Nightly ETL, historical reports | Real-time fraud detection, IoT |
| Technologies | Hadoop MapReduce, Spark batch | Kafka Streams, Flink, Spark Streaming |
| Data | Complete (bounded) | Infinite (unbounded) |
| Reprocessing | Easy (re-run the job) | Complex (state management) |
graph LR
subgraph Batch
A[Accumulated<br/>Data] --> B[Process<br/>Everything]
B --> C[Complete<br/>Results]
end
subgraph Streaming
D[Continuous<br/>Stream] --> E[Incremental<br/>Processing]
E --> F[Real-Time<br/>Results]
E --> E
end
style A fill:#BBE1FA
style B fill:#3282B8
style C fill:#0F4C75
style D fill:#FFE5B4
style E fill:#FF9A76
style F fill:#F24C4C
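In Spark the two models share the same DataFrame API; the difference is whether the source is bounded (read) or unbounded (readStream). A minimal side-by-side sketch, assuming the Parquet path from earlier in the chapter, a Kafka broker at kafka:9092, and the Kafka connector package on the classpath:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-vs-stream").getOrCreate()

# Batch: bounded input; the job reads everything once and terminates.
df_batch = spark.read.parquet("hdfs:///data/processed/sales_enriched")
df_batch.groupBy("year", "month").count().show()

# Streaming: unbounded input; the same API over a source that never ends.
df_stream = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "kafka:9092")
             .option("subscribe", "user-events")
             .load())

query = (df_stream.groupBy("topic").count()
         .writeStream.outputMode("complete").format("console").start())
query.awaitTermination(30)  # process incrementally for ~30 seconds, then return
```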
Spark Structured Streaming
Concept: treat the stream as an infinite table that grows continuously.
```python
# === SPARK STRUCTURED STREAMING ===
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Real-Time Analytics") \
    .getOrCreate()

# 1. Define the event schema
events_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# 2. Read the stream from Kafka
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .load()

# 3. Parse the JSON messages
df_events = df_stream \
    .select(F.from_json(F.col("value").cast("string"),
                        events_schema).alias("data")) \
    .select("data.*")

# 4. Watermark for late data
# Accept events arriving up to 1 hour late
df_events_watermarked = df_events \
    .withWatermark("timestamp", "1 hour")

# 5. Aggregations over time windows
df_aggregated = df_events_watermarked \
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),  # 5-minute window, sliding every 1 minute
        "event_type"
    ) \
    .agg(
        F.count("event_id").alias("num_events"),
        F.sum("amount").alias("total_amount"),
        # exact distinct counts are not supported on streaming DataFrames
        F.approx_count_distinct("user_id").alias("unique_users")
    )

# 6. Streaming anomaly detection
df_anomalies = df_aggregated \
    .filter(
        (F.col("total_amount") > 100000) |  # revenue spike
        (F.col("num_events") > 10000)       # event spike
    ) \
    .withColumn("alert_type", F.lit("SPIKE_DETECTED"))

# 7. Write results to multiple sinks
# Sink 1: console (for debugging)
query_console = df_aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Sink 2: Parquet on HDFS (append; partition by a flat column such as window.start,
# not by the window struct itself)
query_hdfs = df_aggregated.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs:///data/streaming/aggregated") \
    .option("checkpointLocation", "hdfs:///checkpoints/aggregated") \
    .start()

# Sink 3: Kafka (for alerts)
query_kafka_alerts = df_anomalies \
    .select(F.to_json(F.struct("*")).alias("value")) \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "alerts") \
    .option("checkpointLocation", "hdfs:///checkpoints/alerts") \
    .start()

# Sink 4: Delta Lake (ACID streaming)
query_delta = df_events.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "hdfs:///data/delta/events") \
    .option("checkpointLocation", "hdfs:///checkpoints/events") \
    .start()

# 8. Monitor streaming progress
import time
print("🔴 Streaming started...")
time.sleep(60)  # wait 1 minute

for query in spark.streams.active:
    print(f"\n📊 Query: {query.name}")
    print(f"   Status: {query.status}")
    print(f"   Progress: {query.lastProgress}")

# 9. Wait for termination (or Ctrl+C)
query_console.awaitTermination()

# Console output every minute:
# +------------------------------------------+-----------+----------+------------+------------+
# |window                                    |event_type |num_events|total_amount|unique_users|
# +------------------------------------------+-----------+----------+------------+------------+
# |{2024-02-20 11:30:00, 2024-02-20 11:35:00}|purchase   |1523      |45678.90    |892         |
# |{2024-02-20 11:30:00, 2024-02-20 11:35:00}|click      |8934      |0.00        |3421        |
# |{2024-02-20 11:31:00, 2024-02-20 11:36:00}|purchase   |1687      |52341.20    |945         |
# ...
```
9.4. Apache Kafka
Kafka: A Distributed Streaming Platform
Apache Kafka is a distributed streaming platform for publishing, subscribing to, storing, and processing event streams in real time.
Kafka Architecture
graph LR
A[Producers<br/>Apps, Logs, IoT] --> B[Kafka Cluster]
B --> C[Topic 1<br/>Partition 0<br/>Partition 1<br/>Partition 2]
B --> D[Topic 2<br/>Partition 0<br/>Partition 1]
C --> E[Consumer Group 1<br/>Consumer A<br/>Consumer B]
D --> F[Consumer Group 2<br/>Consumer C]
G[Zookeeper] -.coordinates.-> B
style A fill:#FFD93D
style B fill:#6BCB77
style E fill:#4D96FF
style F fill:#4D96FF
style G fill:#FF6B6B
Key concepts:
- Topic: a category or feed of messages (similar to a table in a database)
- Partition: an ordered, immutable subdivision of a topic; partition count and replication are fixed at topic creation (see the sketch below)
- Producer: publishes messages to topics
- Consumer: subscribes to topics and processes messages
- Consumer Group: a group of consumers that share the work (load balancing)
- Broker: a Kafka server that stores the data
- Offset: the incremental position of a message within a partition
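A minimal sketch of topic creation with kafka-python's admin client, assuming a broker at localhost:9092; the topic name, partition count, and replication factor are illustrative:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Hypothetical broker address; sizing shown for illustration only.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

topic = NewTopic(name="user-events", num_partitions=3, replication_factor=2)
admin.create_topics([topic])

print(admin.list_topics())  # should now include 'user-events'
admin.close()
```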
Example: Producer and Consumer

```python
# === KAFKA PRODUCER ===
from kafka import KafkaProducer
import json
import time
from datetime import datetime
import random

# 1. Create the producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8'),
    acks='all',  # wait for acknowledgement from all in-sync replicas
    retries=3
)
print("🚀 Kafka producer started")

# 2. Produce events
for i in range(1000):
    event = {
        'event_id': f'evt_{i}',
        'user_id': random.randint(1, 10000),
        'event_type': random.choice(['click', 'purchase', 'view', 'cart_add']),
        'product_id': random.randint(1, 1000),
        'amount': round(random.uniform(10, 500), 2) if random.random() > 0.7 else 0,
        'timestamp': datetime.now().isoformat()
    }

    # Send the message (async)
    future = producer.send(
        topic='user-events',
        key=event['user_id'],  # partitioning by user_id
        value=event
    )

    # Block on the future to confirm delivery
    try:
        record_metadata = future.get(timeout=10)
        if i % 100 == 0:
            print(f"✅ Message {i} sent:")
            print(f"   Topic: {record_metadata.topic}")
            print(f"   Partition: {record_metadata.partition}")
            print(f"   Offset: {record_metadata.offset}")
    except Exception as e:
        print(f"❌ Error sending message: {e}")

    time.sleep(0.01)  # ~100 events/second

# 3. Flush and close
producer.flush()
producer.close()
print("✅ Producer finished")
```
```python
# === KAFKA CONSUMER ===
from kafka import KafkaConsumer
import json

# 1. Create a consumer within a group
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest',  # read from the beginning if no committed offset exists
    enable_auto_commit=True,       # auto-commit offsets
    max_poll_records=500
)

print("🔵 Kafka consumer started")
print(f"   Subscribed to: {consumer.subscription()}")
print(f"   Group ID: {consumer.config['group_id']}")

# 2. Consume messages
purchases_count = 0
total_revenue = 0

try:
    for message in consumer:
        event = message.value

        # Process the event
        if event['event_type'] == 'purchase' and event['amount'] > 0:
            purchases_count += 1
            total_revenue += event['amount']

            if purchases_count % 10 == 0:
                print(f"\n📊 Current stats:")
                print(f"   Purchases processed: {purchases_count:,}")
                print(f"   Accumulated revenue: ${total_revenue:,.2f}")
                print(f"   Average ticket: ${total_revenue/purchases_count:.2f}")
                print(f"   Current offset: {message.offset}")
                print(f"   Partition: {message.partition}")

except KeyboardInterrupt:
    print("\n⏸️ Consumer stopped by user")
finally:
    consumer.close()
    print("✅ Consumer closed")

# Output:
# 🔵 Kafka consumer started
#    Subscribed to: {'user-events'}
#    Group ID: analytics-consumer-group
#
# 📊 Current stats:
#    Purchases processed: 10
#    Accumulated revenue: $2,345.67
#    Average ticket: $234.57
#    Current offset: 156
#    Partition: 2
# ...
```
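The consumer above relies on auto-committed offsets, so a crash between commit and processing can lose messages. A minimal at-least-once variant, assuming the same topic and broker; the group name and the process() function are placeholders:

```python
from kafka import KafkaConsumer
import json

def process(event):
    """Placeholder for real processing (e.g. writing to a warehouse)."""
    print(event)

# Commit offsets explicitly, and only after the batch has been processed.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='billing-consumer-group',   # hypothetical group
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False             # we commit manually below
)

try:
    while True:
        batch = consumer.poll(timeout_ms=1000, max_records=100)
        for tp, messages in batch.items():
            for message in messages:
                process(message.value)
        if batch:
            consumer.commit()            # commit only after successful processing
finally:
    consumer.close()
```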
9.5. Apache Airflow: Workflow Orchestration
Airflow: a platform to programmatically author, schedule, and monitor workflows
Apache Airflow lets you define complex workflows as code (DAGs), with scheduling, monitoring, and dependency management.
Anatomy of a DAG
```python
# === AIRFLOW DAG: Full ETL ===
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Python callables for the tasks
def extract_from_api(**context):
    """Extract data from an external API"""
    import requests
    import json
    response = requests.get('https://api.example.com/data')
    data = response.json()
    # Stage the payload locally (it could also go straight to HDFS)
    with open('/tmp/api_data.json', 'w') as f:
        json.dump(data, f)
    print(f"✅ Extracted {len(data)} records from the API")
    # Push metadata to XCom for downstream tasks
    context['task_instance'].xcom_push(key='num_records', value=len(data))

def data_quality_check(**context):
    """Validate data quality"""
    import pandas as pd
    df = pd.read_parquet('/data/processed/output.parquet')
    # Checks
    assert df.shape[0] > 0, "❌ Empty DataFrame"
    assert df.isnull().sum().sum() < df.shape[0] * 0.05, "❌ >5% null values"
    assert df.duplicated().sum() == 0, "❌ Duplicates detected"
    print(f"✅ Quality checks passed: {df.shape[0]:,} valid records")

def send_success_notification(**context):
    """Send a success notification"""
    num_records = context['task_instance'].xcom_pull(
        task_ids='extract_api',
        key='num_records'
    )
    message = f"""
    ✅ *ETL Pipeline Completed*
    • Records processed: {num_records:,}
    • Duration: {context['task_instance'].duration}s
    • Date: {context['execution_date']}
    """
    print(message)

# Define the DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='etl_sales_pipeline',
    default_args=default_args,
    description='End-to-end ETL for sales data',
    schedule_interval='0 2 * * *',  # daily at 2 AM
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'sales', 'production'],
) as dag:

    # TASK 1: Extract from the API (Airflow 2 injects the context automatically)
    task_extract_api = PythonOperator(
        task_id='extract_api',
        python_callable=extract_from_api
    )

    # TASK 2: Extract from the database into HDFS
    task_extract_db = BashOperator(
        task_id='extract_database',
        bash_command="""
        sqoop import \
          --connect jdbc:postgresql://db:5432/sales \
          --username admin \
          --password $DB_PASSWORD \
          --table sales_transactions \
          --target-dir /data/raw/sales/{{ ds }} \
          --as-parquetfile \
          --where "transaction_date = '{{ ds }}'"
        """
    )

    # TASK 3: Process with Spark
    task_spark_processing = SparkSubmitOperator(
        task_id='spark_processing',
        application='/opt/airflow/dags/scripts/process_sales.py',
        conn_id='spark_default',
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
            'spark.dynamicAllocation.enabled': 'true'
        },
        application_args=[
            '--input', '/data/raw/sales/{{ ds }}',
            '--output', '/data/processed/sales/{{ ds }}'
        ]
    )

    # TASK 4: Quality checks
    task_quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=data_quality_check
    )

    # TASK 5: Load into the data warehouse (ds_nodash keeps the table name a valid identifier)
    task_load_dwh = PostgresOperator(
        task_id='load_to_dwh',
        postgres_conn_id='postgres_dwh',
        sql="""
        INSERT INTO dwh.fact_sales
        SELECT * FROM staging.sales_{{ ds_nodash }}
        WHERE NOT EXISTS (
            SELECT 1 FROM dwh.fact_sales f
            WHERE f.transaction_id = staging.sales_{{ ds_nodash }}.transaction_id
        );
        """
    )

    # TASK 6: Refresh aggregated metrics
    task_update_metrics = PostgresOperator(
        task_id='update_metrics',
        postgres_conn_id='postgres_dwh',
        sql='CALL dwh.refresh_sales_metrics(\'{{ ds }}\');'
    )

    # TASK 7: Success notification
    task_notify = PythonOperator(
        task_id='send_notification',
        python_callable=send_success_notification
    )

    # Define the dependencies (DAG structure)
    [task_extract_api, task_extract_db] >> task_spark_processing
    task_spark_processing >> task_quality_check
    task_quality_check >> task_load_dwh
    task_load_dwh >> task_update_metrics
    task_update_metrics >> task_notify

# DAG visualization:
#
# extract_api ──────┐
#                   ├──> spark_processing --> quality_check --> load_dwh --> update_metrics --> notify
# extract_database ─┘
```
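Because DAGs are plain Python, their structure can be validated with ordinary tests before deployment. A minimal sketch using Airflow's DagBag, to be run with pytest inside the Airflow environment; the file name and assertions reflect the DAG defined above:

```python
# test_dags.py
from airflow.models import DagBag

def test_dags_import_cleanly():
    dagbag = DagBag(include_examples=False)
    assert dagbag.import_errors == {}, f"Import errors: {dagbag.import_errors}"

def test_etl_sales_pipeline_structure():
    dagbag = DagBag(include_examples=False)
    dag = dagbag.get_dag("etl_sales_pipeline")
    assert dag is not None
    assert len(dag.tasks) == 7
    # quality_check must run only after the Spark job
    upstream_ids = [t.task_id for t in dag.get_task("quality_check").upstream_list]
    assert "spark_processing" in upstream_ids
```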
The DAG in the Airflow UI:
graph LR
A[extract_api] --> C[spark_processing]
B[extract_database] --> C
C --> D[quality_check]
D --> E[load_dwh]
E --> F[update_metrics]
F --> G[send_notification]
style A fill:#90EE90
style B fill:#90EE90
style C fill:#87CEEB
style D fill:#87CEEB
style E fill:#FFD700
style F fill:#FFD700
style G fill:#FF6347
9.6. Final Technology Comparison
Hadoop vs Spark

| Aspect | Hadoop MapReduce | Apache Spark |
|---|---|---|
| Performance | Slow (disk I/O) | Up to 100x faster (in-memory) |
| Latency | Minutes to hours | Seconds to minutes |
| Complexity | More code | High-level APIs |
| Iterations | Inefficient (re-reads from disk) | Efficient (RAM cache) |
| Streaming | Not native | Spark Streaming built in |
| Machine Learning | Mahout (limited) | Full MLlib |
| SQL | Hive (slow) | Spark SQL (fast) |
| When to use | Huge batch jobs, low cost per GB | Iterative analysis, ML, streaming |
Use Cases by Technology

| Use Case | Recommended Technology |
|---|---|
| Daily batch ETL over TBs of data | Hadoop MapReduce or Spark batch |
| Interactive exploratory analysis | Spark DataFrames |
| Machine learning at scale | Spark MLlib |
| Real-time streaming analytics | Kafka + Spark Streaming or Flink |
| Log/event ingestion | Kafka + Flume |
| Orchestration of complex workflows | Apache Airflow |
| NoSQL column store | HBase |
| SQL over a data lake | Spark SQL or Presto |
| MPP data warehouse | Redshift, Snowflake, BigQuery |
Conclusion
The Complete Ecosystem
Modern Big Data infrastructure combines multiple technologies:
- Storage: HDFS, S3, Azure Blob
- Batch processing: Hadoop, Spark
- Stream processing: Kafka, Spark Streaming, Flink
- Orchestration: Airflow, Oozie
- Query engines: Hive, Spark SQL, Presto
- ML: Spark MLlib, TensorFlow on Spark
The right choice depends on:
- 🎯 Required latency
- 📊 Data volume
- 💰 Budget
- 🧑💻 Team skills
- ⚙️ Processing complexity
Practical Recommendation
Recommended modern stack for 2026:
- Storage: S3/Azure Blob (object storage) + Delta Lake (ACID)
- Processing: Apache Spark (batch + streaming)
- Streaming: Apache Kafka
- Orchestration: Apache Airflow
- Query: Spark SQL or Presto
- ML: Spark MLlib + MLflow
- Infra: Kubernetes for deployment
End of Chapter 9: Big Data Infrastructure