MLflow: Enterprise Lifecycle Handbook
A production-ready guide to experiment tracking, model registry, deployment, and LLMOps using MLflow 2.x — solving the chaos of the "which notebook produced this .pkl?" problem once and for all.
This handbook uses model aliases (@champion, @challenger) instead of the deprecated Staging/Production stage workflow. All code is copy-pasteable and production-tested. Environment setup: pip install "mlflow>=2.10" scikit-learn pandas torch.

Why MLflow? The ML Chaos Problem
Every ML team hits the same wall. Six months after shipping a model, a data scientist leaves and the team inherits a folder of 47 Jupyter notebooks with names like final_model_v3_REAL_final2.ipynb, a best_model.pkl file with unknown provenance, and no record of which hyperparameters produced it.
MLflow standardizes the entire ML lifecycle across teams through four integrated components:
The Problem: Spreadsheet Hell
Before MLflow, hyperparameter tracking looked like this: a shared Google Sheet with columns for learning_rate, n_estimators, and val_accuracy, manually filled in by whoever remembered to do it. The model file was saved somewhere on S3 with a timestamp in the name. Six weeks later, nobody can reproduce the best run.
MLflow's Tracking component addresses this by recording every experiment as a run — an atomic unit that contains parameters, metrics over time, artifacts, environment metadata, and source code pointers. All runs live under an experiment namespace.
Manual Logging
The most explicit approach: wrap your training loop with mlflow.start_run() and log everything you care about. This gives you full control and is the best starting point for understanding what MLflow captures.
# manual_logging.py — explicit experiment tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd
import json, os
# Point to your remote tracking server (not the local ./mlruns default!).
# In production this is a PostgreSQL-backed server with an S3 artifact store.
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
# Group related runs under a named experiment.
mlflow.set_experiment("iris-classifier-v2")
# ── Data prep ────────────────────────────────────────────────
X, y = load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# ── Hyperparameters ───────────────────────────────────────────
params = {
"n_estimators": 200,
"max_depth": 6,
"min_samples_split": 4,
"random_state": 42,
}
# ── MLflow Run ───────────────────────────────────────────────
with mlflow.start_run(run_name="rf-tuning-run-01"):
    # log_param: records scalar configuration values.
    # Use this for anything that does NOT change during training.
    for k, v in params.items():
        mlflow.log_param(k, v)

    # Train
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # log_metric: records numeric values that may change over time.
    # Pass step= to record a value at each training epoch.
    preds = model.predict(X_test)
    mlflow.log_metric("accuracy", accuracy_score(y_test, preds))
    mlflow.log_metric("f1_weighted", f1_score(y_test, preds, average="weighted"))

    # log_artifact: upload any file to the run's artifact store.
    # Great for plots, confusion matrices, and feature importance CSVs.
    feature_imp = pd.Series(
        model.feature_importances_, index=X.columns
    ).sort_values(ascending=False)
    imp_path = "/tmp/feature_importance.csv"
    feature_imp.to_csv(imp_path)
    mlflow.log_artifact(imp_path, artifact_path="reports")

    # log_dict: shortcut for logging JSON metadata inline.
    mlflow.log_dict({"dataset": "iris", "n_samples": len(X)}, "data_info.json")

    # Log the trained model itself — this is what the registry will use.
    mlflow.sklearn.log_model(model, "model")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
Tip: Always use the context-manager form — with mlflow.start_run(): — it guarantees the run is ended cleanly even if training throws an exception. Never use mlflow.start_run() + mlflow.end_run() as separate calls.

Autologging — The Magic Button
For standard frameworks (scikit-learn, PyTorch, TensorFlow, XGBoost, LightGBM, Keras, Spark, Gluon), MLflow can intercept the training loop and log everything automatically: hyperparameters, evaluation metrics, model signatures, and even training plots.
# autologging.py — zero boilerplate experiment tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("iris-autolog-demo")
# mlflow.sklearn.autolog() patches sklearn's fit() method.
# It captures: all __init__ params, CV results, model signature,
# feature names, and the serialised model artifact.
mlflow.sklearn.autolog(
log_input_examples=True, # saves a sample of X_train as an artifact
log_model_signatures=True, # infers the input/output schema automatically
log_post_training_metrics=True,
)
X, y = load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# All MLflow calls happen automatically inside .fit() — no boilerplate!
with mlflow.start_run(run_name="gbt-autolog"):
    model = GradientBoostingClassifier(
        n_estimators=150,
        learning_rate=0.05,
        max_depth=4,
        subsample=0.8,
    )
    model.fit(X_train, y_train)

    # MLflow has already logged all params and the trained model.
    # You can still add custom metrics on top:
    from sklearn.metrics import roc_auc_score
    proba = model.predict_proba(X_test)
    mlflow.log_metric("auc_ovr", roc_auc_score(y_test, proba, multi_class="ovr"))
Framework-specific entry points include mlflow.sklearn.autolog(), mlflow.pytorch.autolog(), mlflow.tensorflow.autolog(), mlflow.xgboost.autolog(), and mlflow.lightgbm.autolog(). Use mlflow.autolog() as the universal shortcut — it auto-detects the active framework.

| What autolog captures | Logged As | Notes |
|---|---|---|
| All __init__ constructor arguments | Parameters | Including nested param grids from GridSearchCV |
| Training & validation scores | Metrics | training_accuracy, val_accuracy, per epoch |
| Fitted model object | Artifact | Serialised via the correct flavor (pickle, ONNX, etc.) |
| Input/output schema | Model Signature | Inferred from training data if log_model_signatures=True |
| Sample input rows | Artifact | Stored as input_example.json for serving tests |
The MLmodel Format
An MLflow Model is a directory containing the model weights/artifacts plus an MLmodel YAML manifest. The manifest defines one or more flavors — named serialization formats that let the same model be loaded by different runtimes without code changes.
# Example MLmodel manifest for a scikit-learn model
artifact_path: model
flavors:
  python_function:           # universal flavor — load with mlflow.pyfunc.load_model()
    env:
      conda: conda.yaml
      virtualenv: python_env.yaml
    loader_module: mlflow.sklearn
    model_path: model.pkl
    predict_fn: predict
    python_version: 3.10.12
  sklearn:                   # framework-specific flavor — load with mlflow.sklearn.load_model()
    code: null
    pickled_model: model.pkl
    serialization_format: cloudpickle
    sklearn_version: 1.4.2
mlflow_version: 2.12.1
model_uuid: 3f9a2e1b-4c88-4d20-8a7e-1234567890ab
run_id: 7d81c2f0-5a33-4b19-9c2d-4f8e9a0b1c2d
signature:
  inputs: '[{"name": "sepal length (cm)", "type": "double"}, ...]'
  outputs: '[{"name": "predictions", "type": "long"}]'
The python_function (pyfunc) flavor is the deployment superpower: any model — scikit-learn, PyTorch, TensorFlow, custom Python — can be loaded through the same mlflow.pyfunc.load_model() interface. Your serving infrastructure never needs to know which framework trained the model.
Logging a Model
Use the framework-specific log_model function. The examples below show the scikit-learn and PyTorch patterns — the principle is identical across all flavors.
Scikit-Learn
# log_sklearn_model.py
import mlflow
import mlflow.sklearn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from mlflow.models import infer_signature
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("iris-pipeline")
X, y = load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
with mlflow.start_run(run_name="sklearn-pipeline"):
    pipe = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", LogisticRegression(C=1.0, max_iter=300)),
    ])
    pipe.fit(X_train, y_train)

    # infer_signature derives the input/output schema from real data.
    # This is what gets stored in the MLmodel manifest's "signature" block.
    signature = infer_signature(X_train, pipe.predict(X_train))

    mlflow.sklearn.log_model(
        sk_model=pipe,
        artifact_path="model",
        signature=signature,
        input_example=X_train.head(3),             # saves a sample payload for serving tests
        registered_model_name="iris-lr-pipeline",  # auto-registers in the Model Registry
    )
PyTorch
# log_pytorch_model.py
import torch
import torch.nn as nn
import mlflow
import mlflow.pytorch
from mlflow.models import infer_signature
import numpy as np
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("pytorch-classifier")
class IrisNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(4, 32), nn.ReLU(),
            nn.Linear(32, 16), nn.ReLU(),
            nn.Linear(16, 3),
        )

    def forward(self, x):
        return self.layers(x)

with mlflow.start_run(run_name="pytorch-irisnet"):
    model = IrisNet()
    # ... training loop omitted for brevity ...

    # Build a representative input for signature inference.
    sample_input = torch.randn(5, 4).numpy()
    with torch.no_grad():
        sample_output = model(torch.tensor(sample_input)).numpy()
    signature = infer_signature(sample_input, sample_output)

    mlflow.pytorch.log_model(
        pytorch_model=model,
        artifact_path="model",
        signature=signature,
        input_example=sample_input[:3],
    )
Model Signatures — Type Safety at Inference Time
A Model Signature defines the expected column names, data types, and shapes for both the model's inputs and outputs. Without one, a model silently accepts anything: a DataFrame with columns in the wrong order, float32 where float64 is expected, or a missing feature can all produce garbage predictions without raising an error.
# signatures.py — explicit schema definition
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec, TensorSpec
import numpy as np
# Option 1 — infer from data (recommended for tabular models)
from mlflow.models import infer_signature
import pandas as pd
X_sample = pd.DataFrame({
"age": [30, 45],
"income": [50000.0, 85000.0],
"churn": ["yes", "no"], # categorical feature
})
y_sample = np.array([0, 1])
signature = infer_signature(X_sample, y_sample)
# → inputs: [age: long, income: double, churn: string]
# → outputs: [Tensor (dtype=int64, shape=(-1,))]
# Option 2 — explicit schema for deep learning or strict contracts
input_schema = Schema([
ColSpec("double", "age"),
ColSpec("double", "income"),
ColSpec("string", "churn"),
])
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 2))])
explicit_signature = ModelSignature(inputs=input_schema, outputs=output_schema)
# Attach the signature when logging — it is then stored in MLmodel
# and enforced automatically by `mlflow models serve`.
with mlflow.start_run():
    mlflow.sklearn.log_model(
        sk_model=model,  # a fitted estimator from earlier in your script
        artifact_path="model",
        signature=signature,
    )
The Model Registry: GitHub for Model Weights
The Model Registry is a centralized catalog of named, versioned models. Think of it as GitHub for model weights: every push creates an immutable version, every version has a commit-like run lineage, and aliases mark which version is currently serving traffic.
This solves the operational problem of "which exact artifact is running in production right now, and can I roll it back?" — a question that is unanswerable without a registry.
| Concept | Registry Equivalent | GitHub Analogy |
|---|---|---|
| Registered Model | iris-classifier | Repository |
| Model Version | iris-classifier/versions/7 | Commit SHA |
| Alias | iris-classifier@champion | Branch / tag |
| Run | Training run that produced the version | CI/CD build that produced the artifact |
| Tags / Annotations | Metadata on any version | PR description / release notes |
Registering a Model Programmatically
There are two ways to register a model — inline during log_model() or afterwards via mlflow.register_model() — plus MlflowClient for annotating the resulting version. The cleanest is to pass registered_model_name directly to log_model(): it creates the registered model if it doesn't exist and bumps the version automatically.
# register_model.py — three complementary approaches
import mlflow
import mlflow.sklearn
from mlflow import MlflowClient
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
## ── Approach 1: Inline during log_model (simplest) ──────────
with mlflow.start_run() as run:
    mlflow.sklearn.log_model(
        sk_model=model,  # a fitted estimator from earlier in your script
        artifact_path="model",
        registered_model_name="iris-classifier",  # creates version N+1
    )
    run_id = run.info.run_id
## ── Approach 2: Register from an existing run URI ───────────
# Useful when you want to register after evaluating the run offline.
model_uri = f"runs:/{run_id}/model"
result = mlflow.register_model(
model_uri=model_uri,
name="iris-classifier",
)
print(f"Registered as version: {result.version}")
## ── Approach 3: MlflowClient for full control ───────────────
client = MlflowClient()
# Add descriptive metadata to the version for auditability.
client.update_model_version(
    name="iris-classifier",
    version=result.version,
    description="Logistic regression on standardised iris features. Accuracy=0.97.",
)
# Tag the version — useful for CI/CD pipelines to filter by status.
client.set_model_version_tag(
    name="iris-classifier",
    version=result.version,
    key="validated_by",
    value="ci-pipeline-run-892",
)
Batch Inference
Load the @champion model via mlflow.pyfunc and apply it to a Pandas DataFrame or PySpark DataFrame for large-scale offline scoring. The pyfunc interface abstracts the framework entirely — the same code works whether the model is scikit-learn, PyTorch, or a custom LLM wrapper.
Pandas (single-node)
# batch_pandas.py — offline scoring on a single machine
import mlflow.pyfunc
import pandas as pd
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
# Load the @champion alias — always points to the current production model.
model = mlflow.pyfunc.load_model("models:/iris-classifier@champion")
# Load the batch to score — typically from a data warehouse query or S3.
batch_df = pd.read_parquet("s3://data-lake/iris/batch_2026_03.parquet")
# predict() returns a numpy array or a pandas Series depending on model flavor.
predictions = model.predict(batch_df)
batch_df["predicted_class"] = predictions
batch_df.to_parquet("s3://data-lake/iris/predictions_2026_03.parquet", index=False)
print(f"Scored {len(batch_df):,} rows. Distribution:\n{batch_df['predicted_class'].value_counts()}")
PySpark (distributed)
# batch_spark.py — distributed scoring across a Spark cluster
import mlflow.pyfunc
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("mlflow-batch-score").getOrCreate()
# mlflow.pyfunc.spark_udf() broadcasts the model to every executor node
# and wraps it as a PySpark UDF — no custom serialisation needed.
predict_udf = mlflow.pyfunc.spark_udf(
spark=spark,
model_uri="models:/iris-classifier@champion",
result_type="integer", # matches the model's output type
)
# Read the large batch from Delta Lake / Hive / S3.
batch_sdf = spark.read.parquet("s3://data-lake/iris/raw/2026-03")
# Apply the UDF to the feature columns. Spark handles parallelism.
result_sdf = batch_sdf.withColumn(
"predicted_class",
predict_udf(*[batch_sdf[col] for col in batch_sdf.columns])
)
result_sdf.write.mode("overwrite").parquet("s3://data-lake/iris/predictions/2026-03")
Real-Time Serving
MLflow ships a built-in model server that exposes any registered model as a REST API in one command. This is ideal for prototyping and internal APIs. For production, wrap it in a Docker container and put it behind a load balancer.
Start the REST server
# Serve the @champion alias on port 5001.
# MLFLOW_TRACKING_URI is read from the environment.
MLFLOW_TRACKING_URI=http://mlflow-server.internal:5000 \
mlflow models serve \
--model-uri "models:/iris-classifier@champion" \
--port 5001 \
--env-manager local  # use the current environment instead of creating a conda env
Query via curl
# Send a scoring request. The payload matches the model's signature schema.
# "dataframe_split" format: column names + rows (efficient for tabular data).
curl -s -X POST http://localhost:5001/invocations \
-H "Content-Type: application/json" \
-d '{
"dataframe_split": {
"columns": ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"],
"data": [
[5.1, 3.5, 1.4, 0.2],
[6.7, 3.0, 5.2, 2.3]
]
}
}'
# Response: {"predictions": [0, 2]}
Query via Python requests
# realtime_client.py — programmatic inference against the REST server
import requests, json
SERVING_URL = "http://localhost:5001/invocations"
payload = {
"dataframe_split": {
"columns": [
"sepal length (cm)", "sepal width (cm)",
"petal length (cm)", "petal width (cm)",
],
"data": [
[5.1, 3.5, 1.4, 0.2], # expected: class 0 (Setosa)
[6.7, 3.0, 5.2, 2.3], # expected: class 2 (Virginica)
],
}
}
response = requests.post(
SERVING_URL,
headers={"Content-Type": "application/json"},
data=json.dumps(payload),
timeout=10,
)
response.raise_for_status()
predictions = response.json()["predictions"]
print(f"Predictions: {predictions}") # → [0, 2]
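The same /invocations endpoint also accepts a records-style payload. A pure-Python sketch of building both tabular shapes from the same columns and rows (no server call involved):

```python
# payload_formats.py — the two tabular request shapes for /invocations.
import json

columns = ["sepal length (cm)", "sepal width (cm)",
           "petal length (cm)", "petal width (cm)"]
rows = [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]]

# dataframe_split: column names once, rows as arrays — compact for wide batches.
split_payload = {"dataframe_split": {"columns": columns, "data": rows}}

# dataframe_records: one dict per row — verbose but self-describing.
records_payload = {"dataframe_records": [dict(zip(columns, row)) for row in rows]}

# Both serialise to valid JSON bodies for POST /invocations.
body = json.dumps(records_payload)
print(body[:60])
```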
The /invocations endpoint accepts three payload formats: dataframe_split (column names + rows array — recommended for tabular), dataframe_records (list of row dicts), and instances (TensorFlow Serving compatible). Match the format to your client's serialization library.

Tracking LLMs and Prompts
MLflow 2.x added first-class support for GenAI workloads through the MLflow Tracing and Fluent API. The challenge with LLMs is different from classical ML: instead of hyperparameters and loss curves, you need to track prompt templates, model versions (e.g., gpt-4o), token usage, latency, and qualitative evaluation results.
Tip: Log prompt_tokens, completion_tokens, and total_cost_usd as metrics to track inference costs per experiment.

LangChain / OpenAI Integration
MLflow's LangChain integration auto-captures every chain invocation — including intermediate steps, retrieved documents, LLM calls, and token counts — without manual instrumentation.
Logging a LangChain Chain
# langchain_logging.py — log a LangChain chain as an MLflow model
import mlflow
import mlflow.langchain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("llmops-sentiment-chain")
# Enable MLflow's LangChain callback handler.
# This patches chain.__call__() to log every LLM request/response pair.
mlflow.langchain.autolog(log_input_examples=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a sentiment classifier. Reply with only: POSITIVE, NEGATIVE, or NEUTRAL."),
("human", "{review}"),
])
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | StrOutputParser()
with mlflow.start_run(run_name="sentiment-v1-prompt-a"):
    # Log the prompt template as an artifact for version history.
    mlflow.log_text(prompt.format(review="{{review}}"), "prompt_template.txt")
    mlflow.log_param("model", "gpt-4o-mini")
    mlflow.log_param("temperature", 0)

    # Run the chain — autolog captures the full trace automatically.
    reviews = [
        "This product is absolutely amazing!",
        "Terrible experience. Would not recommend.",
        "It was okay, nothing special.",
    ]
    results = [chain.invoke({"review": r}) for r in reviews]

    # Log aggregate cost/quality metrics manually.
    mlflow.log_metric("n_reviews_scored", len(results))

    # Save the chain as a reusable MLflow model in the registry.
    mlflow.langchain.log_model(
        lc_model=chain,
        artifact_path="sentiment_chain",
        registered_model_name="sentiment-classifier-lc",
    )
Tracking Raw OpenAI Calls
# openai_tracking.py — manual tracking of raw OpenAI API calls
import mlflow
import openai
import time
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("llmops-openai-raw")
# mlflow.openai.autolog() wraps openai.chat.completions.create()
# to capture prompts, responses, latency, and token usage per call.
mlflow.openai.autolog()
client = openai.OpenAI()
with mlflow.start_run(run_name="openai-summariser-v2"):
    mlflow.log_param("model", "gpt-4o")
    mlflow.log_param("temperature", 0.3)
    mlflow.log_param("max_tokens", 256)

    start = time.time()
    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=0.3,
        max_tokens=256,
        messages=[
            {"role": "system", "content": "Summarise the following text in one sentence."},
            {"role": "user", "content": "MLflow is an open source platform..."},
        ],
    )
    latency_ms = (time.time() - start) * 1000

    # Log metrics that matter for LLM cost/quality monitoring.
    usage = response.usage
    mlflow.log_metric("prompt_tokens", usage.prompt_tokens)
    mlflow.log_metric("completion_tokens", usage.completion_tokens)
    mlflow.log_metric("total_tokens", usage.total_tokens)
    mlflow.log_metric("latency_ms", latency_ms)

    # Estimated cost (GPT-4o pricing as of March 2026 — update as needed).
    cost = (usage.prompt_tokens / 1_000_000) * 2.50 \
         + (usage.completion_tokens / 1_000_000) * 10.0
    mlflow.log_metric("estimated_cost_usd", cost)

    summary = response.choices[0].message.content
    mlflow.log_text(summary, "output_summary.txt")
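The inline cost arithmetic above can be factored into a reusable helper. A sketch with an illustrative pricing table — the PRICES_PER_MTOK name and the rates are assumptions; sync them with your provider's current price sheet:

```python
# cost_helper.py — reusable token-cost estimate (rates are illustrative only).
PRICES_PER_MTOK = {  # (input, output) USD per 1M tokens
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}

def estimate_cost_usd(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Same arithmetic as the inline version, keyed by model name."""
    in_rate, out_rate = PRICES_PER_MTOK[model]
    return (prompt_tokens / 1_000_000) * in_rate \
         + (completion_tokens / 1_000_000) * out_rate

print(round(estimate_cost_usd("gpt-4o", 1000, 500), 6))  # → 0.0075
```

Logging the result with mlflow.log_metric("estimated_cost_usd", ...) then makes cost comparable across runs in the UI.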
MLflow Evaluate — Benchmarking LLMs
mlflow.evaluate() provides a standardised evaluation harness for both classical ML models and LLMs. For LLMs it integrates with evaluation libraries to compute metrics like ROUGE, toxicity (via the Hugging Face evaluate library), faithfulness, and relevance — all logged as MLflow metrics for comparison across prompt versions.
# mlflow_evaluate.py — LLM benchmark evaluation
import mlflow
import pandas as pd
mlflow.set_tracking_uri("http://mlflow-server.internal:5000")
mlflow.set_experiment("llm-evaluation-suite")
# Build an evaluation dataset: inputs + expected ground-truth outputs.
eval_data = pd.DataFrame({
"inputs": [
"What is MLflow?",
"How does the Model Registry work?",
"What is a model signature?",
],
"ground_truth": [
"MLflow is an open-source platform for managing the ML lifecycle.",
"The Model Registry provides centralized model versioning and aliases.",
"A model signature defines the expected input and output schema.",
],
})
# The model under evaluation can be any Python callable,
# a registered MLflow model URI, or an mlflow.pyfunc model.
with mlflow.start_run(run_name="eval-gpt4o-mini-v1"):
    results = mlflow.evaluate(
        model="models:/qa-llm@champion",   # URI of the model to evaluate
        data=eval_data,
        targets="ground_truth",
        model_type="question-answering",   # built-in QA evaluation harness
        evaluators="default",              # uses mlflow's built-in evaluator
        extra_metrics=[
            mlflow.metrics.genai.relevance(),     # LLM-as-judge relevance score
            mlflow.metrics.genai.faithfulness(),  # checks hallucination
            mlflow.metrics.toxicity(),            # via Hugging Face evaluate lib
            mlflow.metrics.rouge1(),              # classic summarisation metric
        ],
    )

    # All metrics are automatically logged to the current MLflow run.
    # Access the full per-row breakdown via the EvaluationResult object.
    print(results.metrics)
    print(results.tables["eval_results_table"].head())
Tip: Run mlflow.evaluate() in your CI/CD pipeline on every prompt change. Compare the new run's metrics against the @champion baseline in the MLflow UI to catch quality regressions before deployment.

Pitfall 1: Ignoring the Tracking URI
By default, MLflow writes runs to ./mlruns — a local directory on the machine that ran the training script. This is acceptable for solo development but breaks immediately in team environments: runs are invisible to teammates, data scientists working on different machines have isolated run histories, and notebooks on Databricks or SageMaker write to ephemeral storage that disappears with the cluster.
Warning: Never rely on the default ./mlruns in a team environment. Your CI/CD server, Databricks cluster, and local machine will each have a different, disconnected ./mlruns folder.

Production Tracking Server Setup (AWS)
# server_setup.sh — launch a production MLflow tracking server
# Prerequisites: PostgreSQL RDS instance, S3 bucket for artifacts.
# The backend-store-uri connects to PostgreSQL for run metadata.
# The default-artifact-root stores model files on S3.
mlflow server \
--backend-store-uri postgresql://mlflow_user:$PG_PASS@rds.internal:5432/mlflow \
--default-artifact-root s3://my-mlflow-artifacts/runs \
--host 0.0.0.0 \
--port 5000
---
# In every training script or notebook, set the URI explicitly:
import mlflow, os
# Prefer reading from an environment variable so the same code works
# in local dev (pointing to a local Docker server) and in prod.
mlflow.set_tracking_uri(os.environ.get(
"MLFLOW_TRACKING_URI",
"http://mlflow-server.internal:5000" # fallback
))
# Alternatively, set this environment variable in your shell profile
# or container spec so no code changes are required:
# export MLFLOW_TRACKING_URI=http://mlflow-server.internal:5000
Pitfall 2: Forgetting Dependencies
A model that works in the registry but crashes during deployment is one of the most time-consuming production issues in ML. The root cause is almost always a dependency mismatch: the model was trained with scikit-learn==1.3.2 but the serving container has scikit-learn==1.4.0, which changed the pickle serialization format for certain estimators.
Note: Calling mlflow.sklearn.log_model(model, "model") without a pip_requirements or conda_env argument will auto-detect dependencies, but this detection is not always complete for complex dependency trees.

# dependency_logging.py — explicit environment pinning
import mlflow
import mlflow.sklearn
import subprocess, sys
def get_pip_freeze() -> str:
    """Capture the complete pip environment — the gold standard for reproducibility."""
    result = subprocess.run(
        [sys.executable, "-m", "pip", "freeze"],
        capture_output=True, text=True, check=True
    )
    return result.stdout

with mlflow.start_run():
    # Log the full pip freeze as an artifact — human readable and diffable.
    mlflow.log_text(get_pip_freeze(), "requirements_freeze.txt")

    # Pass a curated requirements list to log_model.
    # This is embedded in the MLmodel manifest and used when serving.
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        pip_requirements=[
            "scikit-learn==1.4.2",
            "pandas==2.2.1",
            "numpy==1.26.4",
        ],
        # Alternatively, point to a requirements.txt file:
        # pip_requirements="requirements.txt",
    )

    # For conda-based environments (useful when CUDA versions matter):
    conda_env = {
        "name": "mlflow-model-env",
        "channels": ["defaults", "conda-forge"],
        "dependencies": [
            "python=3.10.12",
            "pip",
            {"pip": ["scikit-learn==1.4.2", "mlflow==2.12.1"]},
        ],
    }
    mlflow.sklearn.log_model(sk_model=model, artifact_path="model_conda", conda_env=conda_env)
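A lightweight CI guard can catch loose pins before a model is even logged. The unpinned helper below is a hypothetical sketch, not an MLflow API — it just flags requirement strings that aren't exact == pins:

```python
# pin_check.py — hypothetical pre-log check: are all requirements exact pins?
def unpinned(requirements: list[str]) -> list[str]:
    """Return entries that are not exact '==' pins (ranges, bare names)."""
    return [r for r in requirements
            if "==" not in r or any(op in r for op in (">", "<", "~="))]

reqs = ["scikit-learn==1.4.2", "pandas>=2.0", "numpy"]
print(unpinned(reqs))  # → ['pandas>=2.0', 'numpy']
```

Failing the build when unpinned() returns anything keeps the pip_requirements list deterministic.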
Tip: Pin exact versions in pip_requirements, not ranges. Use pip freeze as the audit trail. Add a CI step that loads the registered model in a clean Docker container to validate it can be deserialized before promoting to @champion.

Pitfall 3: Bloated Artifacts
It is tempting to log the entire training dataset as an artifact on every run — "for reproducibility." In practice, logging a 2GB Parquet file to S3 on every training run consumes storage aggressively, slows down the MLflow UI, and creates a false sense of data lineage (you're tracking the file, not its version or transformation history).
Warning: Don't call mlflow.log_artifact("training_data_2gb.parquet") on every run. After 100 training runs you have 200GB of redundant data in your artifact store and a $400/month S3 bill for it.

# data_versioning.py — reference data, don't copy it
import mlflow
import hashlib
# ── Anti-pattern (DON'T do this for large datasets) ──────────
# mlflow.log_artifact("training_data_2gb.parquet") # ← Uploads the file!
# ── Pattern 1: Log a pointer + checksum, not the data itself ─
def file_sha256(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

DATA_PATH = "s3://data-lake/iris/train/2026-03-01.parquet"
LOCAL_COPY = "/tmp/train.parquet"  # downloaded for training, not logged

with mlflow.start_run():
    # Log a lightweight metadata dict — no file upload.
    mlflow.log_dict({
        "data_uri": DATA_PATH,
        "data_sha256": file_sha256(LOCAL_COPY),
        "row_count": 150,
        "partition": "2026-03-01",
    }, "data_provenance.json")
    mlflow.log_param("data_version", "2026-03-01")
# ── Pattern 2: DVC integration ────────────────────────────────
# DVC tracks data versions in Git. `dvc get --show-url` resolves the exact
# remote-storage URL for the tracked file; logging it pins the training
# run to a specific data version.
import subprocess
dvc_url = subprocess.check_output(
    ["dvc", "get", "--show-url", ".", "data/train.parquet"],
    text=True
).strip()
with mlflow.start_run():
    mlflow.log_param("dvc_data_url", dvc_url)
    # Fetch the data separately; DVC handles version resolution.
    subprocess.run(["dvc", "pull", "data/train.parquet"], check=True)
| What to Log as Artifact | What NOT to Log as Artifact | Alternative |
|---|---|---|
| Trained model weights | Raw training datasets (>50MB) | Log S3 URI + checksum in log_dict() |
| Confusion matrix plot (PNG) | Full validation set predictions CSV | Log aggregate metrics only |
| Feature importance CSV (<100KB) | Tokenized text dataset | Log DVC hash or S3 pointer |
| Prompt templates / system prompts | Entire embedding matrix | Log vector DB collection name & version |
| Data schema / column definitions | Intermediate model checkpoints | Keep only final checkpoint |