Training Pipeline
Introduction
The training pipeline orchestrates the complete workflow for training machine learning models in aiNXT. It handles configuration loading, data preparation, model training, checkpoint management, and MLflow experiment tracking.
The entire pipeline is configuration-driven - you define what to train in YAML files, and the pipeline handles the execution.
Pipeline Overview
graph TB
CONFIG[Configuration Files] --> SETUP[Setup Environment]
SETUP --> MLFLOW[MLflow Experiment]
MLFLOW --> MODEL[Load/Create Model]
MODEL --> DATA[Prepare Datasets]
DATA --> SPLIT[Train/Test/Val Split]
SPLIT --> TRAIN[Execute Training]
TRAIN --> SAVE[Save Model & Artifacts]
SAVE --> MLFLOW_LOG[Log to MLflow]
style CONFIG fill:#FF6B35
style TRAIN fill:#0F596E,color:#fff
style MLFLOW_LOG fill:#0097B1,color:#fff
Training Script Entry Point
from ainxt.scripts.training import train
from context import CONTEXT
# Run training with configuration files
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml"
)
Pipeline Steps
Step 1: Configuration Setup
The pipeline starts by loading and merging all configuration files:
config = setup_configuration(
config=config,
data_config=data_config,
model_config=model_config,
training_config=training_config
)
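Conceptually, merging amounts to a recursive dictionary update in which later files override earlier ones. A minimal illustrative sketch of that idea (not the aiNXT implementation; the merge order shown here is an assumption):
import yaml

def deep_merge(base: dict, override: dict) -> dict:
    # Recursively merge override into base; scalar values in override win
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

config = {}
for path in ["config/base.yaml", "config/data.yaml", "config/model.yaml", "config/training.yaml"]:
    with open(path) as f:
        config = deep_merge(config, yaml.safe_load(f) or {})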
Configuration Files:
config/data.yaml:
task: classification
name: seeds_dataset
params:
path: data/seeds.csv
features: [area, perimeter, compactness, length, width]
target: variety
config/model.yaml:
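An illustrative config/model.yaml (the model name and parameters below are placeholders; use whichever model your factory registers):
task: classification
name: random_forest
params:
  n_estimators: 100
  max_depth: 10
  random_state: 42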
config/training.yaml:
training:
epochs: 50
batch_size: 32
learning_rate: 0.001
# Dataset splitting
split:
test_size: 0.2
validation_size: 0.1
random_state: 42
shuffle: true
stratify: true
mlflow:
tracking_uri: http://localhost:5000
experiment_name: seeds_classification
run_name: random_forest_exp_001
Step 2: Environment Preparation
Creates the checkpoint directory for intermediate artifacts and a file logger for the run:
checkpoint_dir, logger = setup_environment(
checkpoint_dir=checkpoint_dir,
log_filename="log_training.txt",
script_type="training"
)
Checkpoint Directory Structure:
checkpoint_dir/
├── log_training.txt # Training logs
├── config.yaml # Merged configuration
├── data/ # Cached datasets
│ ├── train.json
│ ├── test.json
│ └── validation.json
├── model/ # Model checkpoints
│ └── checkpoint_epoch_10/
└── artifacts/ # Training artifacts
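A minimal sketch of what this kind of environment setup typically involves, i.e. creating the directory layout and attaching a file logger (the actual aiNXT helper may differ):
import logging
from pathlib import Path

def setup_environment_sketch(checkpoint_dir: str, log_filename: str):
    # Create the checkpoint directory and its standard sub-folders
    root = Path(checkpoint_dir)
    for sub in ("data", "model", "artifacts"):
        (root / sub).mkdir(parents=True, exist_ok=True)

    # Write training logs next to the artifacts
    logger = logging.getLogger("training")
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.FileHandler(root / log_filename))
    return root, logger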
Step 3: MLflow Experiment Setup
Configures MLflow tracking for experiment management:
mlflow_info = setup_mlflow_experiment(
config=config,
checkpoint_dir=checkpoint_dir,
load_checkpoint=load_checkpoint,
logger=logger
)
# Returns: (experiment_id, experiment_name, run_id) or None
What Gets Logged:
- Configuration parameters
- Training hyperparameters
- Model architecture details
- Dataset statistics
- Training metrics (loss, accuracy, etc.)
- Model artifacts
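Under the hood this boils down to standard MLflow calls; a rough sketch using the public mlflow API and the example values from training.yaml above (not the aiNXT helper itself):
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")
experiment = mlflow.set_experiment("seeds_classification")
run = mlflow.start_run(run_name="random_forest_exp_001")
mlflow_info = (experiment.experiment_id, experiment.name, run.info.run_id)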
Step 4: Model Preparation
Loads or creates the model using the factory system:
model = setup_model(
context=context,
config=config,
checkpoint_dir=checkpoint_dir,
load_checkpoint=load_checkpoint,
logger=logger
)
Two Modes:
- Create New Model (load_checkpoint=False): the factory builds a fresh model from the model configuration.
- Resume from Checkpoint (load_checkpoint=True): the model is rebuilt from the configuration and its saved state is restored from the checkpoint directory.
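A rough sketch of that branch, built on the load_model/load calls shown under Return Values below (details of the aiNXT helper may differ):
# Sketch only: the factory builds the model from the configuration in both cases;
# with load_checkpoint=True the previously saved state is restored on top.
model = context.load_model(model_config)
if load_checkpoint:
    model.load(checkpoint_dir / "model")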
Step 5: Data Preparation
Loads and splits the dataset:
train_set, test_set, validation_set = setup_datasets(
context=context,
config=config,
checkpoint_dir=checkpoint_dir,
analyze=analyze,
logger=logger
)
Dataset Flow:
- Load Full Dataset
- Split into Train/Test
- Further Split Train into Train/Validation
Final Split Example (1000 instances):
- Training: 720 instances (72%)
- Validation: 80 instances (8%)
- Test: 200 instances (20%)
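These numbers follow from applying the split twice; the equivalent two-stage split with scikit-learn looks roughly like this (an illustrative sketch, assuming validation_size is taken from the training portion, which is what 720/80/200 implies):
from sklearn.model_selection import train_test_split

# X, y: the full feature matrix and labels (1000 instances)
# test_size=0.2 -> 800 train+validation / 200 test
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True, stratify=y
)
# validation_size=0.1 of the remaining 800 -> 720 train / 80 validation
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.1, random_state=42, shuffle=True, stratify=y_trainval
)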
Caching:
# Datasets are cached to checkpoint directory
train_set.save(checkpoint_dir / "data/train.json", encoder)
test_set.save(checkpoint_dir / "data/test.json", encoder)
validation_set.save(checkpoint_dir / "data/validation.json", encoder)
Step 6: Training Configuration
Prepares training parameters:
training_kwargs = prepare_training_config(
context=context,
config=config,
checkpoint_dir=checkpoint_dir,
model=model,
validation_set=validation_set,
logger=logger
)
Extracted Parameters:
- Epochs, batch size, learning rate
- Early stopping criteria
- Checkpoint frequency
- Validation dataset (if provided)
- Custom training callbacks
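The result is essentially a keyword dictionary handed to the model's fit call; an illustrative shape only (the exact key names are assumptions based on the configuration above):
training_kwargs = {
    "epochs": 50,
    "batch_size": 32,
    "learning_rate": 0.001,
    "validation_data": validation_set,              # only included when a validation split exists
    "callbacks": [checkpoint_callback, mlflow_callback],
}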
Step 7: Model Training
Executes the training loop:
What Happens:
# Simplified training execution
model.fit(
dataset=train_set,
epochs=50,
batch_size=32,
validation_data=validation_set,
callbacks=[checkpoint_callback, mlflow_callback]
)
Progress Logging:
[INFO] Starting training...
[INFO] Epoch 1/50 - loss: 0.543, accuracy: 0.812, val_loss: 0.489, val_accuracy: 0.845
[INFO] Epoch 2/50 - loss: 0.421, accuracy: 0.856, val_loss: 0.445, val_accuracy: 0.867
...
[INFO] Training completed in 2m 34s
Step 8: Save Model and Artifacts
Saves the trained model and logs to MLflow:
save_model_and_artifacts(
context=context,
config=config,
checkpoint_dir=checkpoint_dir,
model=model,
train_set=train_set,
validation_set=validation_set,
test_set=test_set,
mlflow_enabled=mlflow_enabled,
logger=logger,
cleanup_on_success=cleanup_on_success
)
Saved Artifacts:
- Model Files: the serialized model under checkpoint_dir/model
- MLflow Logging:
import mlflow

# Log parameters
mlflow.log_params(config.training.params)

# Log metrics
mlflow.log_metrics({
    "train_loss": final_loss,
    "train_accuracy": final_accuracy
})

# Log model
mlflow.log_artifacts(checkpoint_dir / "model", artifact_path="model")

# Log datasets
mlflow.log_artifact(checkpoint_dir / "data/train.json")
mlflow.log_artifact(checkpoint_dir / "data/test.json")
- Configuration: the merged config.yaml written to the checkpoint directory
Usage Examples
Basic Training
from ainxt.scripts.training import train
from context import CONTEXT
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml"
)
print(f"Model saved to: {checkpoint_dir}")
print(f"MLflow run ID: {mlflow_info[2]}")
Training with Checkpointing
# Train with automatic checkpoint directory
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml",
checkpoint_dir="checkpoints/experiment_001"
)
# Resume from checkpoint
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml",
checkpoint_dir="checkpoints/experiment_001",
load_checkpoint=True # Resume training
)
Training with Data Analysis
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml",
analyze=True # Generate data analysis reports
)
Training with Cleanup
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="config/base.yaml",
data_config="config/data.yaml",
model_config="config/model.yaml",
training_config="config/training.yaml",
cleanup_on_success=True # Remove checkpoint dir after MLflow logging
)
Configuration Options
Data Configuration
task: classification
name: my_dataset
params:
path: data/train.csv
preprocessing:
normalize: true
handle_missing: fill_mean
features: [feature1, feature2, feature3]
target: label
Model Configuration
task: classification
name: neural_network
params:
layers: [128, 64, 32]
activation: relu
dropout: 0.3
optimizer: adam
Training Configuration
training:
# Training hyperparameters
epochs: 100
batch_size: 64
learning_rate: 0.001
# Dataset splitting
split:
test_size: 0.2
validation_size: 0.15
random_state: 42
shuffle: true
stratify: true
# Early stopping
early_stopping:
monitor: val_loss
patience: 10
min_delta: 0.001
# Checkpointing
checkpoint:
save_frequency: 5 # Save every 5 epochs
save_best_only: true
mlflow:
tracking_uri: http://localhost:5000
experiment_name: my_experiment
run_name: run_001
tags:
team: data-science
project: classification
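The early_stopping block follows the usual monitor/patience/min_delta semantics; a minimal generic sketch of that behaviour (not aiNXT's implementation; run_one_epoch is a placeholder):
best_val_loss = float("inf")
stale_epochs = 0

for epoch in range(100):
    val_loss = run_one_epoch()            # placeholder: train one epoch, return the monitored metric
    if val_loss < best_val_loss - 0.001:  # min_delta: improvement must exceed 0.001
        best_val_loss = val_loss
        stale_epochs = 0
    else:
        stale_epochs += 1
    if stale_epochs >= 10:                # patience: stop after 10 epochs without improvement
        break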
Return Values
The train function returns a tuple:
- model: Trained model instance
- checkpoint_dir: Path to checkpoint directory (str)
- mlflow_info: Tuple of (experiment_id, experiment_name, run_id) or None
Using Return Values:
# Use trained model directly
predictions = model.predict(new_instance)
# Load from checkpoint later
from context import CONTEXT
model = CONTEXT.load_model(model_config)
model.load(checkpoint_dir / "model")
# Continue in MLflow
import mlflow
experiment_id, experiment_name, run_id = mlflow_info
mlflow.set_tracking_uri("http://localhost:5000")
run = mlflow.get_run(run_id)
print(run.data.metrics)
Integration with Databricks
The training pipeline works seamlessly on Databricks:
# Databricks notebook
from ainxt.scripts.training import train
from context import CONTEXT
# MLflow automatically uses Databricks tracking
model, checkpoint_dir, mlflow_info = train(
context=CONTEXT,
config="/dbfs/configs/base.yaml",
data_config="/dbfs/configs/data.yaml",
model_config="/dbfs/configs/model.yaml",
training_config="/dbfs/configs/training.yaml"
)
# Model automatically logged to Databricks MLflow
Databricks Benefits:
- Built-in MLflow tracking (no separate server needed)
- Distributed training on Spark clusters
- Automatic artifact storage in DBFS
- Integration with Databricks model registry
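If you run the pipeline outside a notebook but want to log to a Databricks workspace, MLflow can still be pointed at it explicitly (standard mlflow API; the experiment path is a placeholder):
import mlflow

# Requires Databricks authentication (e.g. a CLI profile or DATABRICKS_HOST/DATABRICKS_TOKEN)
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Users/<your.name>/seeds_classification")  # workspace path, placeholder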
Best Practices
1. Always Use Configuration Files
❌ Don't:
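Hard-code model parameters in the training script, for example (illustrative anti-pattern; MyModel is a placeholder class):
# Hyperparameters buried in code are invisible to MLflow and hard to reproduce
model = MyModel(param1="value1", param2="value2")
model.fit(train_set, epochs=50, batch_size=32, learning_rate=0.001)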
✅ Do:
# model.yaml
task: classification
name: my_model
params:
param1: value1
param2: value2
# Python
model, _, _ = train(context=CONTEXT, model_config="model.yaml", ...)
2. Version Control Configurations
Commit the YAML configuration files alongside the code so every experiment can be reproduced from a specific commit.
3. Use MLflow for Experiment Tracking
Keep the mlflow block in training.yaml filled in so parameters, metrics, and artifacts are logged automatically for every run.
4. Checkpoint Important Experiments
train(
...,
checkpoint_dir=f"checkpoints/{experiment_name}",
cleanup_on_success=False # Keep checkpoints for analysis
)
5. Split Data Properly
# Use stratification for classification
split:
test_size: 0.2
validation_size: 0.1
stratify: true
random_state: 42 # For reproducibility
Next Steps
- Evaluation Pipeline - Evaluate trained models
- MLflow Integration - Deep dive into experiment tracking
- Factory System - Understanding configuration-driven object creation