Saltar a contenido

Getting Started Guide

Guía completa para comenzar con Fracttal ETL Hub - desde la instalación hasta tu primer ETL en producción.

!!! tip "¿Tienes prisa?" Si quieres crear tu primer ETL en 5 minutos, ve directamente a la Quick Start Guide.


Lo que vamos a lograr

Al finalizar esta guía, tendrás:

  • Fracttal ETL Hub instalado y configurado
  • Apache Airflow funcionando correctamente
  • Tu primer ETL ejecutándose en producción
  • Monitoreo básico configurado
  • Mejores prácticas implementadas

Tiempo estimado: 30-45 minutos


Prerequisitos

Sistema Operativo

  • Linux (Ubuntu 20.04+, CentOS 8+)
  • macOS (10.15+)
  • Windows (10/11 con WSL2)

Software Requerido

  • Python 3.11+ con pip
  • Apache Airflow 2.9.3
  • Docker (opcional, recomendado)
  • Git para control de versiones

Accesos Necesarios

  • Base de datos (MySQL, PostgreSQL, etc.)
  • Sistemas destino (Google Sheets, APIs, etc.)
  • Credenciales de servicios a integrar

Instalación

Opción 1: Instalación con Docker (Recomendada)

docker-compose.yml
version: '3.8'
services:
  fracttal-etl:
    image: fracttal/etl-hub:latest
    ports:
      - "8080:8080"
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./configs:/opt/airflow/configs
    depends_on:
      - postgres

  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
# Levantar el entorno
docker-compose up -d

# Verificar que está funcionando
curl http://localhost:8080/health

Opción 2: Instalación Nativa

# 1. Crear entorno virtual
python -m venv fracttal-etl-env
source fracttal-etl-env/bin/activate  # Linux/macOS
# o en Windows: fracttal-etl-env\Scripts\activate

# 2. Instalar dependencias
pip install --upgrade pip
pip install fracttal-etl-hub[all]

# 3. Configurar Airflow
export AIRFLOW_HOME=$PWD/airflow
airflow db init
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@fracttal.com \
    --password admin

# 4. Iniciar servicios
airflow webserver --port 8080 &
airflow scheduler &

Verificación de Instalación

# Verificar versiones
fracttal-etl --version
airflow version

# Test básico
fracttal-etl tests-connection --type database --config config.json

Configuración Inicial

1. Variables de Entorno

.env
# Configuración de Airflow
AIRFLOW_HOME=/opt/airflow
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

# Configuración de Fracttal ETL
FRACTTAL_API_BASE_URL=https://api.fracttal.com
FRACTTAL_API_VERSION=v1
FRACTTAL_LOG_LEVEL=INFO

# Conexiones de base de datos
DATABASE_URL=postgresql://user:password@localhost:5432/fracttal_etl
REDIS_URL=redis://localhost:6379/0

# Configuración de logging
LOG_LEVEL=INFO
LOG_FORMAT=json

2. Configuración de Conexiones

connections.py
from fracttal_etl.connections import ConnectionManager

# Configurar conexiones disponibles
connections = ConnectionManager()

# Base de datos principal
connections.add_connection({
    "id": "main_database",
    "type": "postgresql",
    "host": "localhost",
    "port": 5432,
    "database": "production",
    "username": "etl_user",
    "password": "secure_password",
    "pool_size": 5
})

# Google Sheets
connections.add_connection({
    "id": "google_sheets",
    "type": "google_sheets",
    "service_account_file": "/path/to/service-account.json",
    "scopes": [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive.file"
    ]
})

# API externa
connections.add_connection({
    "id": "external_api",
    "type": "http",
    "base_url": "https://api.external-service.com",
    "auth_type": "bearer",
    "token": "your_api_token",
    "timeout": 30
})

3. Estructura de Directorios

fracttal-etl/
├── airflow/
│   ├── dags/              # DAGs de Airflow
│   ├── plugins/           # Plugins personalizados
│   └── config/            # Configuraciones
├── configs/
│   ├── connections/       # Definiciones de conexiones
│   ├── etls/             # Configuraciones de ETL
│   └── schemas/          # Esquemas de validación
├── logs/                 # Logs de ejecución
├── tests/                # Tests unitarios
└── scripts/              # Scripts de utilidad

Primer ETL Completo

Vamos a crear un ETL real que: 1. Extrae datos de una base de datos PostgreSQL 2. Transforma los datos con limpieza y formateo 3. Carga los resultados en Google Sheets 4. Monitorea la ejecución

1. Configuración del ETL

configs/etls/customer-sync.json
{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "jsonrpc": "2.0",
  "method": "etl.etl_update",
  "params": {
    "id": "customer-sync-v1",
    "config": {
      "description": "Sincronización diaria de clientes a Google Sheets",
      "source": {
        "id_type": 1,
        "id_connection": "main_database",
        "description": "Extraer clientes activos de PostgreSQL",
        "name": "Customer Database",
        "feature": "read_data",
        "parameters": {
          "query_string": {
            "table": "customers",
            "fields": "id,first_name,last_name,email,phone,created_at,last_login,status",
            "where": "status = 'active' AND created_at >= CURRENT_DATE - INTERVAL '30 days'",
            "order_by": "created_at DESC",
            "limit": 1000
          }
        }
      },
      "transform": {
        "map": [
          {
            "filter": [
              {"var": ""},
              {
                "and": [
                  {"!=": [{"var": "email"}, null]},
                  {"!=": [{"var": "email"}, ""]},
                  {"!=": [{"var": "first_name"}, null]}
                ]
              }
            ]
          },
          {
            "map": [
              {"var": ""},
              {
                "merge": [
                  {"var": ""},
                  {
                    "full_name": {
                      "concat": [
                        {"var": "first_name"},
                        " ",
                        {"var": "last_name"}
                      ]
                    },
                    "formatted_phone": {
                      "if": [
                        {"!=": [{"var": "phone"}, null]},
                        {"regex_replace": [{"var": "phone"}, "[^0-9]", ""]},
                        ""
                      ]
                    },
                    "days_since_created": {
                      "days_between": [
                        {"var": "created_at"},
                        {"format_date": ["now", "%Y-%m-%d"]}
                      ]
                    },
                    "is_recent_login": {
                      "if": [
                        {"!=": [{"var": "last_login"}, null]},
                        {
                          "<=": [
                            {"days_between": [{"var": "last_login"}, {"format_date": ["now", "%Y-%m-%d"]}]},
                            7
                          ]
                        },
                        false
                      ]
                    }
                  }
                ]
              }
            ]
          }
        ]
      },
      "target": {
        "id_type": 5,
        "id_connection": "google_sheets",
        "description": "Cargar datos procesados a Google Sheets",
        "name": "Customer Report Sheet",
        "feature": "replace_sheet_data",
        "parameters": {
          "spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
          "sheet_name": "Customers",
          "range": "A1:I1000",
          "headers": [
            "ID", "Nombre Completo", "Email", "Teléfono", 
            "Fecha Creación", "Días desde Creación", 
            "Último Login", "Login Reciente", "Estado"
          ],
          "clear_before_insert": true
        }
      },
      "settings": {
        "schedule": "0 8 * * *",  // Diario a las 8 AM
        "timeout": 300,
        "retries": 3,
        "retry_delay": "00:05:00",
        "email_on_failure": ["admin@company.com"],
        "email_on_success": ["reports@company.com"]
      }
    },
    "environment": "production"
  }
}

2. Crear el DAG de Airflow

airflow/dags/customer_sync_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from fracttal_etl import ETLHub

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'customer_sync',
    default_args=default_args,
    description='Sincronización diaria de clientes',
    schedule_interval='0 8 * * *',  # Diario a las 8 AM
    start_date=datetime(2024, 10, 1),
    catchup=False,
    tags=['etl', 'customers', 'daily'],
)

def execute_customer_sync():
    """Ejecutar ETL de sincronización de clientes"""
    etl_hub = ETLHub()

    # Cargar configuración
    with open('/opt/airflow/configs/etls/customer-sync.json', 'r') as f:
        config = json.load(f)

    # Ejecutar ETL
    result = etl_hub.execute_etl(config)

    if not result.success:
        raise Exception(f"ETL failed: {result.error}")

    return {
        'records_processed': result.records_count,
        'execution_time': result.execution_time,
        'status': 'success'
    }

# Definir tareas
sync_task = PythonOperator(
    task_id='sync_customers',
    python_callable=execute_customer_sync,
    dag=dag,
)

# Configurar dependencias (si hubiera más tareas)
sync_task

3. Validar y Probar

# Validar sintaxis del DAG
python airflow/dags/customer_sync_dag.py

# Probar configuración del ETL
fracttal-etl validate configs/etls/customer-sync.json

# Ejecutar tests en modo dry-run
fracttal-etl execute configs/etls/customer-sync.json --dry-run

# Ejecutar una vez manualmente
airflow dags tests customer_sync 2024-10-29

4. Verificar Resultados

Después de la ejecución exitosa:

  1. En Google Sheets: Verifica que los datos se cargaron correctamente
  2. En Airflow UI: Revisa los logs y métricas de ejecución
  3. En logs: Analiza cualquier warning o información adicional
# Ver logs de la última ejecución
tail -f logs/etl-customer-sync-$(date +%Y%m%d).log

# Verificar métricas
fracttal-etl metrics --etl customer-sync --date today

Monitoreo y Alertas

1. Configurar Alertas

configs/monitoring/alerts.yml
alerts:
  - name: "ETL Failure"
    condition: "execution_status == 'failed'"
    channels:
      - email: ["admin@company.com"]
      - slack: "#etl-alerts"

  - name: "Performance Degradation"
    condition: "execution_time > 300"  # 5 minutos
    channels:
      - email: ["performance-team@company.com"]

  - name: "Low Data Volume"
    condition: "records_processed < 100"
    channels:
      - email: ["data-quality@company.com"]

2. Dashboard de Métricas

scripts/generate_dashboard.py
from fracttal_etl.monitoring import MetricsCollector

def generate_daily_report():
    metrics = MetricsCollector()

    report = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "etls_executed": metrics.count_executions_today(),
        "success_rate": metrics.calculate_success_rate(),
        "avg_execution_time": metrics.avg_execution_time(),
        "total_records_processed": metrics.total_records_today(),
        "top_errors": metrics.get_top_errors(limit=5)
    }

    return report

Automatización

1. Programación de ETLs

configs/schedules/production.json
{
  "schedules": {
    "customer-sync": {
      "cron": "0 8 * * *",
      "timezone": "America/Bogota",
      "enabled": true
    },
    "inventory-update": {
      "cron": "0 */4 * * *",
      "timezone": "America/Bogota", 
      "enabled": true
    },
    "daily-reports": {
      "cron": "0 18 * * 1-5",
      "timezone": "America/Bogota",
      "enabled": true
    }
  }
}

2. Deployment Pipeline

.github/workflows/deploy-etls.yml
name: Deploy ETLs
on:
  push:
    branches: [main]
    paths: ['configs/etls/**']

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Validate ETL Configs
        run: |
          fracttal-etl validate-all configs/etls/

      - name: Deploy to Production
        run: |
          fracttal-etl deploy configs/etls/ --env production

      - name: Notify Team
        run: |
          echo "ETLs deployed successfully" | 
          slack-notify "#data-engineering"

Checklist de Producción

Antes de llevar tu ETL a producción:

Configuración

  • Variables de entorno configuradas correctamente
  • Conexiones validadas y con permisos correctos
  • Credenciales almacenadas de forma segura
  • Timeouts configurados apropiadamente

Testing

  • ETL probado en entorno de desarrollo
  • Validación de datos implementada
  • Manejo de errores configurado
  • Performance validado con volúmenes reales

Monitoreo

  • Alertas configuradas para fallos
  • Logging habilitado en nivel apropiado
  • Métricas siendo recolectadas
  • Dashboard accessible para el equipo

Documentación

  • Configuración documentada
  • Runbook de troubleshooting creado
  • Contactos de escalación definidos
  • SLA establecido

Próximos Pasos

¡Felicidades! Ya tienes tu ETL funcionando en producción. Ahora puedes:

Explorar Funcionalidades Avanzadas

Optimizar Performance

Escalar a Enterprise


Soporte

¿Necesitas ayuda con tu implementación?

!!! success "¡Excelente trabajo!" Has completado la configuración de Fracttal ETL Hub. Tu primer ETL está ejecutándose en producción y monitoreado correctamente.