Skip to content

Training Pipeline

Introduction

The training pipeline orchestrates the complete workflow for training machine learning models in aiNXT. It handles configuration loading, data preparation, model training, checkpoint management, and MLflow experiment tracking.

The entire pipeline is configuration-driven - you define what to train in YAML files, and the pipeline handles the execution.

Pipeline Overview

graph TB
    CONFIG[Configuration Files] --> SETUP[Setup Environment]
    SETUP --> MLFLOW[MLflow Experiment]
    MLFLOW --> MODEL[Load/Create Model]
    MODEL --> DATA[Prepare Datasets]
    DATA --> SPLIT[Train/Test/Val Split]
    SPLIT --> TRAIN[Execute Training]
    TRAIN --> SAVE[Save Model & Artifacts]
    SAVE --> MLFLOW_LOG[Log to MLflow]

    style CONFIG fill:#FF6B35
    style TRAIN fill:#0F596E,color:#fff
    style MLFLOW_LOG fill:#0097B1,color:#fff

Training Script Entry Point

from ainxt.scripts.training import train
from context import CONTEXT

# Run training with configuration files
model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml"
)

Pipeline Steps

Step 1: Configuration Setup

The pipeline starts by loading and merging all configuration files:

config = setup_configuration(
    config=config,
    data_config=data_config,
    model_config=model_config,
    training_config=training_config
)

Configuration Files:

config/data.yaml:

task: classification
name: seeds_dataset
params:
  path: data/seeds.csv
  features: [area, perimeter, compactness, length, width]
  target: variety

config/model.yaml:

task: classification
name: random_forest
params:
  n_estimators: 100
  max_depth: 10
  random_state: 42

config/training.yaml:

training:
  epochs: 50
  batch_size: 32
  learning_rate: 0.001

  # Dataset splitting
  split:
    test_size: 0.2
    validation_size: 0.1
    random_state: 42
    shuffle: true
    stratify: true

mlflow:
  tracking_uri: http://localhost:5000
  experiment_name: seeds_classification
  run_name: random_forest_exp_001

Step 2: Environment Preparation

Creates checkpoint directory for intermediate artifacts:

checkpoint_dir, logger = setup_environment(
    checkpoint_dir=checkpoint_dir,
    log_filename="log_training.txt",
    script_type="training"
)

Checkpoint Directory Structure:

checkpoint_dir/
├── log_training.txt          # Training logs
├── config.yaml               # Merged configuration
├── data/                     # Cached datasets
│   ├── train.json
│   ├── test.json
│   └── validation.json
├── model/                    # Model checkpoints
│   └── checkpoint_epoch_10/
└── artifacts/                # Training artifacts

Step 3: MLflow Experiment Setup

Configures MLflow tracking for experiment management:

mlflow_info = setup_mlflow_experiment(
    config=config,
    checkpoint_dir=checkpoint_dir,
    load_checkpoint=load_checkpoint,
    logger=logger
)
# Returns: (experiment_id, experiment_name, run_id) or None

What Gets Logged: - Configuration parameters - Training hyperparameters - Model architecture details - Dataset statistics - Training metrics (loss, accuracy, etc.) - Model artifacts

Step 4: Model Preparation

Loads or creates the model using the factory system:

model = setup_model(
    context=context,
    config=config,
    checkpoint_dir=checkpoint_dir,
    load_checkpoint=load_checkpoint,
    logger=logger
)

Two Modes:

  1. Create New Model (load_checkpoint=False):

    model = CONTEXT.load_model(config.model)
    

  2. Resume from Checkpoint (load_checkpoint=True):

    model = CONTEXT.load_model(config.model)
    model.load(checkpoint_dir / "model")
    

Step 5: Data Preparation

Loads and splits the dataset:

train_set, test_set, validation_set = setup_datasets(
    context=context,
    config=config,
    checkpoint_dir=checkpoint_dir,
    analyze=analyze,
    logger=logger
)

Dataset Flow:

  1. Load Full Dataset:

    dataset = CONTEXT.load_dataset(config.data)
    

  2. Split into Train/Test:

    train_set, test_set = split_dataset(
        dataset,
        test_size=0.2,
        random_state=42,
        stratify=True
    )
    

  3. Further Split Train into Train/Validation:

    train_set, validation_set = split_dataset(
        train_set,
        test_size=0.1,  # 10% of training data
        random_state=42
    )
    

Final Split Example (1000 instances): - Training: 720 instances (72%) - Validation: 80 instances (8%) - Test: 200 instances (20%)

Caching:

# Datasets are cached to checkpoint directory
train_set.save(checkpoint_dir / "data/train.json", encoder)
test_set.save(checkpoint_dir / "data/test.json", encoder)
validation_set.save(checkpoint_dir / "data/validation.json", encoder)

Step 6: Training Configuration

Prepares training parameters:

training_kwargs = prepare_training_config(
    context=context,
    config=config,
    checkpoint_dir=checkpoint_dir,
    model=model,
    validation_set=validation_set,
    logger=logger
)

Extracted Parameters: - Epochs, batch size, learning rate - Early stopping criteria - Checkpoint frequency - Validation dataset (if provided) - Custom training callbacks

Step 7: Model Training

Executes the training loop:

execute_training(
    model=model,
    dataset=train_set,
    training_kwargs=training_kwargs,
    logger=logger
)

What Happens:

# Simplified training execution
model.fit(
    dataset=train_set,
    epochs=50,
    batch_size=32,
    validation_data=validation_set,
    callbacks=[checkpoint_callback, mlflow_callback]
)

Progress Logging:

[INFO] Starting training...
[INFO] Epoch 1/50 - loss: 0.543, accuracy: 0.812, val_loss: 0.489, val_accuracy: 0.845
[INFO] Epoch 2/50 - loss: 0.421, accuracy: 0.856, val_loss: 0.445, val_accuracy: 0.867
...
[INFO] Training completed in 2m 34s

Step 8: Save Model and Artifacts

Saves the trained model and logs to MLflow:

save_model_and_artifacts(
    context=context,
    config=config,
    checkpoint_dir=checkpoint_dir,
    model=model,
    train_set=train_set,
    validation_set=validation_set,
    test_set=test_set,
    mlflow_enabled=mlflow_enabled,
    logger=logger,
    cleanup_on_success=cleanup_on_success
)

Saved Artifacts:

  1. Model Files:

    model.save(checkpoint_dir / "model")
    

  2. MLflow Logging:

    import mlflow
    
    # Log parameters
    mlflow.log_params(config.training.params)
    
    # Log metrics
    mlflow.log_metrics({
        "train_loss": final_loss,
        "train_accuracy": final_accuracy
    })
    
    # Log model
    mlflow.log_artifacts(checkpoint_dir / "model", artifact_path="model")
    
    # Log datasets
    mlflow.log_artifact(checkpoint_dir / "data/train.json")
    mlflow.log_artifact(checkpoint_dir / "data/test.json")
    

  3. Configuration:

    config.save(checkpoint_dir / "config.yaml")
    mlflow.log_artifact(checkpoint_dir / "config.yaml")
    

Usage Examples

Basic Training

from ainxt.scripts.training import train
from context import CONTEXT

model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml"
)

print(f"Model saved to: {checkpoint_dir}")
print(f"MLflow run ID: {mlflow_info[2]}")

Training with Checkpointing

# Train with automatic checkpoint directory
model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml",
    checkpoint_dir="checkpoints/experiment_001"
)

# Resume from checkpoint
model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml",
    checkpoint_dir="checkpoints/experiment_001",
    load_checkpoint=True  # Resume training
)

Training with Data Analysis

model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml",
    analyze=True  # Generate data analysis reports
)

Training with Cleanup

model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="config/base.yaml",
    data_config="config/data.yaml",
    model_config="config/model.yaml",
    training_config="config/training.yaml",
    cleanup_on_success=True  # Remove checkpoint dir after MLflow logging
)

Configuration Options

Data Configuration

task: classification
name: my_dataset
params:
  path: data/train.csv
  preprocessing:
    normalize: true
    handle_missing: fill_mean
  features: [feature1, feature2, feature3]
  target: label

Model Configuration

task: classification
name: neural_network
params:
  layers: [128, 64, 32]
  activation: relu
  dropout: 0.3
  optimizer: adam

Training Configuration

training:
  # Training hyperparameters
  epochs: 100
  batch_size: 64
  learning_rate: 0.001

  # Dataset splitting
  split:
    test_size: 0.2
    validation_size: 0.15
    random_state: 42
    shuffle: true
    stratify: true

  # Early stopping
  early_stopping:
    monitor: val_loss
    patience: 10
    min_delta: 0.001

  # Checkpointing
  checkpoint:
    save_frequency: 5  # Save every 5 epochs
    save_best_only: true

mlflow:
  tracking_uri: http://localhost:5000
  experiment_name: my_experiment
  run_name: run_001
  tags:
    team: data-science
    project: classification

Return Values

The train function returns a tuple:

model, checkpoint_dir, mlflow_info = train(...)
  1. model: Trained model instance
  2. checkpoint_dir: Path to checkpoint directory (str)
  3. mlflow_info: Tuple of (experiment_id, experiment_name, run_id) or None

Using Return Values:

# Use trained model directly
predictions = model.predict(new_instance)

# Load from checkpoint later
from context import CONTEXT
model = CONTEXT.load_model(model_config)
model.load(checkpoint_dir / "model")

# Continue in MLflow
import mlflow
experiment_id, experiment_name, run_id = mlflow_info
mlflow.set_tracking_uri("http://localhost:5000")
run = mlflow.get_run(run_id)
print(run.data.metrics)

Integration with Databricks

The training pipeline works seamlessly on Databricks:

# Databricks notebook
from ainxt.scripts.training import train
from context import CONTEXT

# MLflow automatically uses Databricks tracking
model, checkpoint_dir, mlflow_info = train(
    context=CONTEXT,
    config="/dbfs/configs/base.yaml",
    data_config="/dbfs/configs/data.yaml",
    model_config="/dbfs/configs/model.yaml",
    training_config="/dbfs/configs/training.yaml"
)

# Model automatically logged to Databricks MLflow

Databricks Benefits: - Built-in MLflow tracking (no separate server needed) - Distributed training on Spark clusters - Automatic artifact storage in DBFS - Integration with Databricks model registry

Best Practices

1. Always Use Configuration Files

Don't:

model = MyModel(param1=value1, param2=value2)
model.fit(dataset)

Do:

# model.yaml
task: classification
name: my_model
params:
  param1: value1
  param2: value2

# Python
model, _, _ = train(context=CONTEXT, model_config="model.yaml", ...)

2. Version Control Configurations

git add config/
git commit -m "Add training config for experiment 42"

3. Use MLflow for Experiment Tracking

# Always configure MLflow
mlflow:
  experiment_name: my_project
  run_name: experiment_${timestamp}

4. Checkpoint Important Experiments

train(
    ...,
    checkpoint_dir=f"checkpoints/{experiment_name}",
    cleanup_on_success=False  # Keep checkpoints for analysis
)

5. Split Data Properly

# Use stratification for classification
split:
  test_size: 0.2
  validation_size: 0.1
  stratify: true
  random_state: 42  # For reproducibility

Next Steps