Kubeflow MLOps Handbook
End-to-end reference for building, training, and serving ML models on Kubernetes using Kubeflow Pipelines v2, Training Operator, and KServe. Optimized for GKE production environments with AlloyDB, BigQuery, and Pub/Sub integrations.
What is Kubeflow?
Kubeflow is an open-source MLOps platform for Kubernetes that makes running ML workflows on Kubernetes simple, portable, and scalable. Originally developed by Google, it is now a CNCF incubating project with contributions from Google, IBM, AWS, Red Hat, and the broader community.
Kubeflow is not a single tool — it is a collection of components that compose into a full ML platform: pipeline orchestration, distributed training, model serving, experiment tracking, and a unified web UI. Each component can be deployed independently or as a full stack.
Platform Architecture
Kubeflow's architecture layers Kubernetes-native components into a cohesive ML platform. Understanding these layers is key to operating it effectively in production.
┌─────────────────────────────────────────────────────────────────┐
│                   KUBEFLOW CENTRAL DASHBOARD                    │
│                 (Istio Gateway + Dex OIDC Auth)                 │
├──────────────┬──────────────┬──────────────┬────────────────────┤
│ PIPELINES v2 │ NOTEBOOKS    │ KATIB        │ TRAINING OP        │
│ (Argo WF)    │ (Jupyter)    │ (AutoML)     │ (PyTorch/TF)       │
├──────────────┴──────────────┴──────────────┴────────────────────┤
│                     KSERVE (Model Serving)                      │
│            (Knative Serving + Istio + Cert-Manager)             │
├─────────────────────────────────────────────────────────────────┤
│              ML METADATA (MLMD): MySQL / Cloud SQL              │
├─────────────────────────────────────────────────────────────────┤
│               KUBERNETES (GKE / EKS / AKS / Kind)               │
└─────────────────────────────────────────────────────────────────┘
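Request Flow: Pipeline Execution
kfp SDK or Dashboard → Istio ingress gateway (auth) → KFP API server (ml-pipeline) → Argo Workflow → one pod per component → artifacts to GCS, lineage to MLMD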
Key Infrastructure Dependencies
| Dependency | Purpose | GKE Production Recommendation |
|---|---|---|
| Istio | Service mesh, traffic management, mTLS | Istio 1.17+ via ASM (Anthos Service Mesh) |
| Argo Workflows | Pipeline step execution engine | Bundled with KFP — do not install separately |
| Dex | OIDC identity provider for Dashboard auth | Replace with GCP IAP for production |
| Knative | Serverless framework for KServe | Knative 1.10+ on GKE |
| Cert-Manager | TLS certificate automation | v1.12+ with GCP-managed certs preferred |
| MySQL / Cloud SQL | MLMD metadata backend, pipeline store | Cloud SQL for MySQL for production HA |
| MinIO / GCS | Artifact storage | GCS bucket (no MinIO needed on GKE) |
Core Components
Kubeflow Pipelines (KFP) v2
The heart of Kubeflow. Define ML workflows as Python functions decorated with @component and @pipeline. The KFP SDK compiles them to a PipelineSpec (IR YAML); the KFP backend translates that spec into an Argo Workflow and runs it on the cluster.
Training Operator
Provides Kubernetes CRDs for distributed ML training: PyTorchJob, TFJob, MXJob, PaddleJob, XGBoostJob, and MPIJob. Handles pod lifecycle, failure recovery, and gang scheduling.
KServe (Model Serving)
Formerly KFServing. Provides serverless model serving via Knative with support for TorchServe, Triton, MLflow, SKLearn, XGBoost, and custom runtimes. Supports canary, shadow, and A/B deployments natively.
Katib (AutoML / HPO)
Hyperparameter tuning and Neural Architecture Search (NAS) as Kubernetes CRDs. Supports Bayesian optimization, grid search, random search, and Hyperband. Integrates directly with Pipelines.
ML Metadata (MLMD)
Google's ML Metadata library stores artifact lineage — every dataset, model, metric, and execution is recorded with parent-child relationships. Enables full reproducibility audits.
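MLMD models lineage as artifacts and executions linked by input/output events. The sketch below mimics that idea in plain Python to show how a lineage query walks from a model back to the data that produced it. LineageStore and its methods are hypothetical; this is not the ml-metadata API.

```python
from dataclasses import dataclass, field


@dataclass
class LineageStore:
    """Toy lineage graph (not MLMD's API): executions consume and produce artifacts."""
    inputs: dict[str, list[str]] = field(default_factory=dict)  # execution -> consumed artifacts
    producer: dict[str, str] = field(default_factory=dict)      # artifact -> producing execution

    def record(self, execution: str, consumed: list[str], produced: list[str]) -> None:
        self.inputs[execution] = list(consumed)
        for artifact in produced:
            self.producer[artifact] = execution

    def upstream(self, artifact: str) -> list[str]:
        """Return every artifact this one was derived from, nearest first."""
        seen: list[str] = []
        frontier = [artifact]
        while frontier:
            current = frontier.pop(0)
            execution = self.producer.get(current)
            if execution is None:
                continue  # a source artifact, e.g. a raw BigQuery table
            for parent in self.inputs.get(execution, []):
                if parent not in seen:
                    seen.append(parent)
                    frontier.append(parent)
        return seen


store = LineageStore()
store.record("ingest-run-1", consumed=["bq://ml_features.training_data_v2"], produced=["dataset-1"])
store.record("train-run-1", consumed=["dataset-1"], produced=["model-1"])
print(store.upstream("model-1"))  # ['dataset-1', 'bq://ml_features.training_data_v2']
```

A reproducibility audit is then a question of walking `upstream` from any registered model.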
vs Alternatives
| Feature | Kubeflow | MLflow | Vertex AI | Airflow + MLflow | ZenML |
|---|---|---|---|---|---|
| K8s Native | ✅ First-class | ⚠️ Via plugins | ✅ (managed) | ⚠️ Partial | ✅ |
| Pipeline DAGs | ✅ KFP v2 | Limited | ✅ | ✅ Airflow | ✅ |
| Distributed Training | ✅ Native CRDs | ⚠️ Manual | ✅ | ❌ | ⚠️ |
| Model Serving | ✅ KServe | ✅ MLflow Server | ✅ | ❌ | ⚠️ |
| Experiment Tracking | ✅ MLMD | ✅ Best-in-class | ✅ | ✅ MLflow | ✅ |
| Self-Hosted | ✅ | ✅ | ❌ GCP only | ✅ | ✅ |
| Complexity | High | Low | Low (managed) | Medium | Medium |
| GKE Integration | Best | Good | Native | Good | Good |
Kubeflow is ideal when you already operate Kubernetes (especially GKE), need full infrastructure control, require distributed training at scale, or want to keep all ML tooling in your own cluster. For simpler use cases or teams new to Kubernetes, MLflow + Vertex AI Pipelines may be more productive.
Prerequisites
Kubernetes Requirements
- Kubernetes 1.26–1.29
- kubectl matching cluster version
- kustomize 5.0+
- Istio 1.17+ or ASM
- Cert-Manager 1.12+
- Minimum cluster: 3 nodes × 4 vCPU / 16 GB RAM
GKE Specifics
- GKE Standard (not Autopilot — Istio complications)
- Workload Identity enabled
- GCS bucket for artifact storage
- Cloud SQL for MySQL for metadata
- Artifact Registry for pipeline images
- GPU node pool if running training jobs
# Check Kubernetes version (--short was removed in kubectl 1.28; short output is now the default)
kubectl version
# Check kustomize
kustomize version  # Must be 5.0+
# Check Istio (if pre-installed)
istioctl version
# Check cert-manager
kubectl get pods -n cert-manager
# Check available resources
kubectl describe nodes | grep -A5 "Allocatable"
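These version checks can also be scripted, for example as a CI preflight step. A minimal sketch, assuming the version strings have already been captured from each tool (for Kubernetes, e.g. serverVersion.gitVersion from kubectl version -o json). REQUIREMENTS mirrors the prerequisites listed above; check_versions and parse_major_minor are hypothetical helpers.

```python
import re
from typing import Optional

Version = tuple[int, int]

# (minimum, maximum) as (major, minor); None means no upper bound
REQUIREMENTS: dict[str, tuple[Version, Optional[Version]]] = {
    "kubernetes": ((1, 26), (1, 29)),
    "kustomize": ((5, 0), None),
    "istio": ((1, 17), None),
    "cert-manager": ((1, 12), None),
}


def parse_major_minor(version: str) -> Version:
    """Extract (major, minor) from strings like 'v1.28.3-gke.1286000' or '5.0.3'."""
    match = re.search(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return int(match.group(1)), int(match.group(2))


def check_versions(found: dict[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means every check passed."""
    problems = []
    for tool, (minimum, maximum) in REQUIREMENTS.items():
        if tool not in found:
            problems.append(f"{tool}: version not found")
            continue
        version = parse_major_minor(found[tool])
        if version < minimum:
            problems.append(f"{tool}: {found[tool]} is older than {minimum[0]}.{minimum[1]}")
        elif maximum is not None and version > maximum:
            problems.append(f"{tool}: {found[tool]} is newer than {maximum[0]}.{maximum[1]}")
    return problems


print(check_versions({
    "kubernetes": "v1.28.3-gke.1286000",
    "kustomize": "v5.0.3",
    "istio": "1.17.2",
    "cert-manager": "v1.12.2",
}))  # []
```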
GKE Installation
# Create GKE Standard cluster with Workload Identity
gcloud container clusters create kubeflow-prod \
  --zone us-central1-a \
  --num-nodes 3 \
  --machine-type n2-standard-8 \
  --workload-pool=$(gcloud config get-value project).svc.id.goog \
  --addons HorizontalPodAutoscaling,HttpLoadBalancing \
  --release-channel regular \
  --disk-size 100GB
# Add GPU node pool (optional, for training)
# The pool also needs NVIDIA drivers: gpu-driver-version=default in --accelerator on
# recent GKE versions, or Google's NVIDIA driver-installer DaemonSet
gcloud container node-pools create gpu-pool \
  --cluster kubeflow-prod \
  --zone us-central1-a \
  --num-nodes 1 \
  --machine-type n1-standard-8 \
  --accelerator type=nvidia-tesla-t4,count=1 \
  --disk-size 200GB
# Get kubeconfig
gcloud container clusters get-credentials kubeflow-prod \
  --zone us-central1-a
# Clone official manifests
git clone https://github.com/kubeflow/manifests.git
cd manifests
git checkout v1.8.0  # Pin to stable release
# Install cert-manager first
kustomize build common/cert-manager/cert-manager/base | kubectl apply -f -
kubectl wait --for=condition=ready pod -l app=cert-manager \
  -n cert-manager --timeout=180s
# Install Istio
kustomize build common/istio-1-17/istio-crds/base | kubectl apply -f -
kustomize build common/istio-1-17/istio-namespace/base | kubectl apply -f -
kustomize build common/istio-1-17/istio-install/base | kubectl apply -f -
# Install Dex (identity provider)
kustomize build common/dex/overlays/istio | kubectl apply -f -
# Install OIDC AuthService
kustomize build common/oidc-client/oidc-authservice/base | kubectl apply -f -
# Install Knative (for KServe)
kustomize build common/knative/knative-serving/overlays/gateways | kubectl apply -f -
# Install full Kubeflow (takes ~5 minutes)
while ! kustomize build example | kubectl apply -f -; do
  echo "Retrying..."
  sleep 20
done
# Watch all pods come up
kubectl get pods -n kubeflow --watch
PROJECT=$(gcloud config get-value project)
BUCKET="gs://${PROJECT}-kubeflow-artifacts"
# Create GCS bucket
gsutil mb -l us-central1 $BUCKET
# Create GCP Service Account
gcloud iam service-accounts create kubeflow-pipelines-sa \
--display-name "Kubeflow Pipelines SA"
# Grant GCS access
gsutil iam ch serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com:roles/storage.admin $BUCKET
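# Bind Kubernetes SA to GCP SA (Workload Identity). In multi-user installs, pipeline pods run as
# default-editor in each profile namespace, so bind <profile-namespace>/default-editor instead of kubeflow/pipeline-runner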
gcloud iam service-accounts add-iam-policy-binding \
kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:${PROJECT}.svc.id.goog[kubeflow/pipeline-runner]"
# Annotate the Kubernetes service account
kubectl annotate serviceaccount -n kubeflow pipeline-runner \
iam.gke.io/gcp-service-account=kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com
The kustomize build ... | kubectl apply -f - pattern may fail on the first run because custom resources are applied before their CRDs have registered. The while ! ... do ... done retry loop handles this; it typically takes 2–3 attempts before all CRDs register and resources apply cleanly.
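The same retry can be given an upper bound, which matters in CI where an unbounded while loop could hang a job. A minimal Python sketch; apply_with_retry and kustomize_apply are hypothetical helpers, and the runner and sleep function are injectable so the retry logic can be tested without a cluster.

```python
import subprocess
import time
from typing import Callable


def apply_with_retry(
    run: Callable[[], int],
    max_attempts: int = 10,
    delay_s: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `run` until it returns exit code 0; return the attempt number that succeeded."""
    for attempt in range(1, max_attempts + 1):
        if run() == 0:
            return attempt
        if attempt < max_attempts:
            print(f"Attempt {attempt} failed (CRDs may still be registering); retrying in {delay_s}s")
            sleep(delay_s)
    raise RuntimeError(f"apply still failing after {max_attempts} attempts")


def kustomize_apply(path: str = "example") -> int:
    """Run `kustomize build <path> | kubectl apply -f -`; pipefail surfaces kustomize errors too."""
    cmd = f"kustomize build {path} | kubectl apply -f -"
    return subprocess.run(["bash", "-o", "pipefail", "-c", cmd]).returncode


# Usage, from the root of the kubeflow/manifests checkout:
# apply_with_retry(kustomize_apply)
```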
Local Install (Kind / Minikube)
For local development and testing, Kind (Kubernetes in Docker) is the fastest path to a Kubeflow environment. Not recommended for training jobs — use for pipeline authoring and testing only.
# Install Kind
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.22.0/kind-linux-amd64
chmod +x ./kind && sudo mv ./kind /usr/local/bin/
# Create Kind cluster (CPU and memory come from the Docker host, so give Docker enough of both)
cat > kind-config.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
EOF
kind create cluster --config kind-config.yaml --name kubeflow-local
# Deploy Kubeflow (same kustomize method)
# Tip: --validate=false skips client-side schema validation (it does not bypass admission webhooks)
kustomize build example | kubectl apply --validate=false -f -
# Port-forward Dashboard (no extraPortMappings in the Kind config, so local port 8080 stays free)
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
If you only need Pipelines (not the full Kubeflow stack), deploy KFP on its own: kustomize build apps/pipeline/upstream/env/platform-agnostic-multi-user | kubectl apply -f -. This skips Notebooks, KServe, and Katib, so it is much faster to stand up and suitable for pipeline development. Note that this multi-user overlay still expects Istio, the OIDC auth stack, and the Profiles controller to be installed.
Pipeline Concepts
KFP v2 represents a significant rearchitecting from v1. Pipelines are defined entirely in Python using the kfp SDK, compiled to PipelineSpec (IR YAML), and executed via Argo Workflows on the cluster.
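Component: a single pipeline step, written as a Python function decorated with @component. Gets its own Docker image, resource requests, and I/O specification.
Pipeline: a DAG of components, written as a function decorated with @pipeline. Describes how data flows between components via typed inputs/outputs.
v1 vs v2 Key Differences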
| Feature | KFP v1 | KFP v2 |
|---|---|---|
| IR Format | Argo YAML directly | PipelineSpec JSON/YAML (IR) |
| Artifacts | Untyped strings | Typed (Dataset, Model, Metrics…) |
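| Component Definition | create_component_from_func(base_image=…, packages_to_install=…) | @dsl.component(base_image=…, packages_to_install=…) |
| Control Flow | dsl.ParallelFor, dsl.Condition, dsl.ExitHandler | Same primitives; newer 2.x SDKs add dsl.If/dsl.Elif/dsl.Else and deprecate dsl.Condition |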
| SDK Import | kfp.v2 or kfp | kfp (v2 is default from 2.x) |
| Caching | Fingerprint-based | Improved with task.set_caching_options() |
Component SDK
Lightweight Python Components
The simplest component type: a Python function decorated with @component. KFP does not build an image; at runtime it runs the function's source inside base_image, pip-installing packages_to_install first. Best for quick utility steps.
from kfp import dsl
from kfp.dsl import Dataset, Output, component

@component(
    base_image="python:3.11-slim",
    # The [pandas] extra pulls in pyarrow and db-dtypes, which to_dataframe() needs
    packages_to_install=["pandas==2.1.0", "google-cloud-bigquery[pandas]==3.13.0"],
)
def ingest_from_bigquery(
    project_id: str,
    dataset_id: str,
    table_id: str,
    output_dataset: Output[Dataset],
    row_limit: int = 100000,
):
    """Fetch training data from BigQuery and save as CSV."""
    from google.cloud import bigquery
    import pandas as pd

    client = bigquery.Client(project=project_id)
    query = f"""
        SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
        LIMIT {row_limit}
    """
    df = client.query(query).to_dataframe()
    df.to_csv(output_dataset.path, index=False)
    output_dataset.metadata["rows"] = len(df)
    output_dataset.metadata["source"] = f"{project_id}.{dataset_id}.{table_id}"
    print(f"Ingested {len(df):,} rows from BigQuery")
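Components with Custom Base Images
For heavy dependencies (PyTorch, scikit-learn), point @component at a pre-built image from Artifact Registry. Nothing is pip-installed when the step starts, so steps launch faster and dependency versions stay reproducible (pin an image tag or digest rather than :latest). This is still a Python-function component; KFP's container components proper are defined with @dsl.container_component.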
from kfp.dsl import Dataset, Input, Model, Output, Metrics, component

@component(
    base_image="us-central1-docker.pkg.dev/my-project/ml-images/trainer:latest",
)
def train_sklearn_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1,
):
    import pandas as pd
    import joblib
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, roc_auc_score

    df = pd.read_csv(train_data.path)
    X, y = df.drop("target", axis=1), df["target"]
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    clf = GradientBoostingClassifier(
        n_estimators=n_estimators, max_depth=max_depth, learning_rate=learning_rate
    )
    clf.fit(X_train, y_train)

    accuracy = accuracy_score(y_val, clf.predict(X_val))
    auc = roc_auc_score(y_val, clf.predict_proba(X_val)[:, 1])

    joblib.dump(clf, model_output.path)
    model_output.metadata["framework"] = "sklearn"
    model_output.metadata["accuracy"] = accuracy
    metrics_output.log_metric("accuracy", accuracy)
    metrics_output.log_metric("roc_auc", auc)
    print(f"Accuracy: {accuracy:.4f} | AUC: {auc:.4f}")
Authoring Pipelines
import kfp
from kfp import dsl, compiler
from kfp.dsl import Condition

from components.data_ingest import ingest_from_bigquery
from components.train_model import train_sklearn_model
from components.evaluate import evaluate_model
from components.register import register_model


@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline with conditional registration",
)
def ml_pipeline(
    project_id: str = "my-gcp-project",
    dataset_id: str = "ml_features",
    table_id: str = "training_data_v2",
    accuracy_threshold: float = 0.85,
    n_estimators: int = 200,
    row_limit: int = 100000,
):
    # Step 1: Ingest from BigQuery
    ingest_task = ingest_from_bigquery(
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        row_limit=row_limit,
    )
    ingest_task.set_display_name("Ingest Training Data")

    # Step 2: Train model
    train_task = train_sklearn_model(
        train_data=ingest_task.outputs["output_dataset"],
        n_estimators=n_estimators,
    )
    train_task.set_display_name("Train GBM Model")
    train_task.set_cpu_request("2").set_memory_request("8G")

    # Step 3: Evaluate (evaluate_model must expose an "accuracy" float output
    # and a "metrics_output" artifact; use a held-out split as test_data in practice)
    eval_task = evaluate_model(
        model=train_task.outputs["model_output"],
        test_data=ingest_task.outputs["output_dataset"],
    )

    # Step 4: Conditional registration, only if the accuracy threshold is met
    with Condition(
        eval_task.outputs["accuracy"] >= accuracy_threshold,
        name="accuracy-gate",
    ):
        register_task = register_model(
            model=train_task.outputs["model_output"],
            metrics=eval_task.outputs["metrics_output"],
            model_name="gbm-classifier",
        )
        register_task.set_display_name("Register to Model Registry")


if __name__ == "__main__":
    # Compile pipeline to YAML
    compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml",
    )

    # Submit to Kubeflow
    client = kfp.Client(host="http://localhost:8080")
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={"project_id": "my-gcp-project", "n_estimators": 300},
        run_name="training-run-v2",
        experiment_name="gbm-experiments",
    )
    print(f"Run ID: {run.run_id}")
Resource Requests & Node Selectors
# Inside a @dsl.pipeline function; `task` is any PipelineTask
from kfp import kubernetes  # provided by the kfp-kubernetes package

# CPU/Memory requests and limits
task.set_cpu_request("4").set_cpu_limit("8")
task.set_memory_request("16G").set_memory_limit("32G")
# GPU: request one NVIDIA GPU
task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)
# Node selector: target GPU node pool (the v2 SDK has no generic node-selector
# method on the task; use the kfp-kubernetes extension)
kubernetes.add_node_selector(
    task, label_key="cloud.google.com/gke-nodepool", label_value="gpu-pool"
)
# Toleration for tainted GPU nodes
kubernetes.add_toleration(
    task, key="nvidia.com/gpu", operator="Exists", effect="NoSchedule"
)
# Environment variables
task.set_env_variable("GOOGLE_CLOUD_PROJECT", "my-project")
# Retry policy
task.set_retry(num_retries=2, backoff_duration="30s", backoff_max_duration="5m")
Artifacts & Metadata
KFP v2 has a first-class artifact system. All inter-component data larger than a simple value should be passed as a typed artifact — this enables MLMD lineage tracking, caching, and the visual artifact comparison in the UI.
| Artifact Type | Use Case | Stored In |
|---|---|---|
| Dataset | CSV, Parquet, TFRecord files | GCS path |
| Model | Saved model weights (pkl, pt, savedmodel) | GCS path |
| Metrics | Scalar metrics (accuracy, loss, AUC) | MLMD + GCS |
| ClassificationMetrics | Confusion matrix, ROC curves | MLMD + GCS |
| HTML | Visualizations rendered in UI | GCS path |
| Markdown | Reports rendered in UI | GCS path |
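Component code does not call the GCS API for artifacts: it reads and writes artifact.path, and KFP v2 exposes each remote artifact URI at a local mount path. The sketch below assumes KFP's prefix convention (gs:// → /gcs/, s3:// → /s3/, minio:// → /minio/) and re-implements it in plain Python, which helps when matching a GCS object in the console to the path a component logged. artifact_local_path and the example URI are hypothetical.

```python
# Assumed prefix convention for how remote artifacts appear inside component pods
_MOUNT_PREFIXES = {
    "gs://": "/gcs/",
    "s3://": "/s3/",
    "minio://": "/minio/",
}


def artifact_local_path(uri: str) -> str:
    """Translate an artifact URI into the local path a component would read or write."""
    for scheme, mount in _MOUNT_PREFIXES.items():
        if uri.startswith(scheme):
            return mount + uri[len(scheme):]
    raise ValueError(f"unsupported artifact URI: {uri}")


uri = "gs://my-project-kubeflow-artifacts/pipeline-root/run-123/ingest/output_dataset"
print(artifact_local_path(uri))
# /gcs/my-project-kubeflow-artifacts/pipeline-root/run-123/ingest/output_dataset
```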
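from kfp.dsl import ClassificationMetrics, Dataset, Input, Model, Output, component

@component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def evaluate_with_confusion_matrix(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[ClassificationMetrics],
):
    import joblib
    import pandas as pd
    from sklearn.metrics import confusion_matrix, roc_curve

    clf = joblib.load(model.path)
    df = pd.read_csv(test_data.path)
    X, y = df.drop("target", axis=1), df["target"]  # assumes a binary 0/1 target
    y_pred = clf.predict(X)

    cm = confusion_matrix(y, y_pred, labels=[0, 1])
    metrics.log_confusion_matrix(
        categories=["Class-0", "Class-1"],
        matrix=cm.tolist(),
    )

    # ROC curve from predicted probabilities. sklearn's first threshold can be
    # inf (or above 1), which does not serialize cleanly, so clip it to 1.0
    fpr, tpr, thresholds = roc_curve(y, clf.predict_proba(X)[:, 1])
    metrics.log_roc_curve(
        fpr=fpr.tolist(),
        tpr=tpr.tolist(),
        threshold=[min(float(t), 1.0) for t in thresholds],
    )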
Caching & Reuse
KFP caches step outputs based on a fingerprint (cache key) built from the component's container image, command and arguments, input parameter values, and input artifacts; environment variables are not part of it. If a cache hit is found, the step is skipped and previous outputs are reused, which is critical for iterative development on expensive data steps.
# Enable caching per task (default: enabled)
ingest_task.set_caching_options(enable_caching=True)
# Disable caching: force re-execution always
train_task.set_caching_options(enable_caching=False)
# Disable caching for entire run at submission time
client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={},
    enable_caching=False,  # Override all component settings
)
# Force cache invalidation after changing code baked into a custom image:
# push a new image tag instead of reusing :latest, or change an input value.
# Setting an environment variable (e.g. CACHE_BUST) does not change the key.
# Lightweight components embed their source in the command, so editing the
# function body already produces a new key.
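A simplified model of a cache key helps reason about when a step will be reused: hash a canonical encoding of the image, command, input parameter values and input artifacts (identified here by URI). This is a conceptual sketch under that assumption, not KFP's actual key format; TaskSpec and cache_key are hypothetical.

```python
import hashlib
import json
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TaskSpec:
    """Hypothetical stand-in for the parts of a task that feed the cache key."""
    image: str
    command: tuple[str, ...]
    parameters: dict = field(default_factory=dict)     # input parameter values
    artifact_uris: dict = field(default_factory=dict)  # input name -> artifact URI


def cache_key(task: TaskSpec) -> str:
    """SHA-256 over a canonical (sorted-key) JSON encoding of the fingerprinted fields."""
    payload = {
        "image": task.image,
        "command": list(task.command),
        "parameters": task.parameters,
        "artifacts": task.artifact_uris,
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


base = TaskSpec("trainer:v1", ("python", "train.py"), {"n_estimators": 200}, {"train_data": "gs://b/data.csv"})
same = TaskSpec("trainer:v1", ("python", "train.py"), {"n_estimators": 200}, {"train_data": "gs://b/data.csv"})
new_image = TaskSpec("trainer:v2", ("python", "train.py"), {"n_estimators": 200}, {"train_data": "gs://b/data.csv"})
print(cache_key(base) == cache_key(same))       # True  (cache hit)
print(cache_key(base) == cache_key(new_image))  # False (new tag, cache miss)
```

Sorting keys makes the hash independent of dict insertion order, so logically identical tasks always map to the same key.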
Enable caching on slow, stable steps (BigQuery ingest, feature engineering). Disable it on steps that must always run fresh (model registration, notification steps). This pattern can cut iteration time from hours to minutes when debugging downstream steps.
Training Operator
The Kubeflow Training Operator provides Kubernetes CRDs for running distributed ML training jobs. It manages pod lifecycle, handles failures, supports gang scheduling, and integrates with Kubernetes-native features like resource quotas and node selectors.
| CRD | Framework | Distribution Strategy |
|---|---|---|
| PyTorchJob | PyTorch | DDP (NCCL / Gloo backends), RPC |
| TFJob | TensorFlow | ParameterServer, MultiWorkerMirroredStrategy |
| MPIJob | Horovod / MPI | AllReduce via MPI |
| PaddleJob | PaddlePaddle | Collective, Parameter Server |
| XGBoostJob | XGBoost | Rabit AllReduce |
# Install Training Operator (standalone)
kubectl apply -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=v1.7.0"
# Verify
kubectl get pods -n kubeflow | grep training-operator
PyTorchJob Reference
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: arthavidya-finetune
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template: &pytorch-pod   # YAML anchor, reused by Worker below
        spec:
          containers:
            - name: pytorch    # the Training Operator expects this container name
              image: us-central1-docker.pkg.dev/my-project/ml/trainer:latest
              # One process per pod: the operator injects MASTER_ADDR, MASTER_PORT,
              # WORLD_SIZE and RANK, which train.py reads via init_process_group()
              command:
                - python
                - train.py
              resources:
                requests:
                  memory: "16Gi"
                  cpu: "4"
                  nvidia.com/gpu: 1
                limits:
                  nvidia.com/gpu: 1
              env:
                - name: WANDB_PROJECT
                  value: arthavidya
                - name: GOOGLE_CLOUD_PROJECT
                  value: my-project   # GCP project ID, not the namespace
          nodeSelector:
            cloud.google.com/gke-nodepool: gpu-pool
          tolerations:
            - key: nvidia.com/gpu
              operator: Exists
              effect: NoSchedule
    Worker:
      replicas: 2   # 2 additional worker pods
      restartPolicy: OnFailure
      template: *pytorch-pod   # same pod spec as Master
# Submit job
kubectl apply -f pytorch_job.yaml
# Check status
kubectl get pytorchjob arthavidya-finetune -n kubeflow
# Watch pod logs (master node)
kubectl logs -f arthavidya-finetune-master-0 -n kubeflow
# Wait for completion
kubectl wait pytorchjob/arthavidya-finetune \
  --for=condition=Succeeded -n kubeflow --timeout=7200s
# Delete after completion
kubectl delete pytorchjob arthavidya-finetune -n kubeflow
PyTorchJob inside a KFP Pipeline
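# Sketch: a lightweight component that submits the PyTorchJob with the Kubernetes
# Python client and polls until it finishes, rather than relying on a dedicated KFP op.
# The pipeline's service account needs RBAC to create/get pytorchjobs.kubeflow.org.
from kfp import dsl


@dsl.component(base_image="python:3.11-slim", packages_to_install=["kubernetes"])
def launch_pytorchjob(
    model_name: str,
    namespace: str = "kubeflow",
    worker_replicas: int = 2,
    image: str = "us-central1-docker.pkg.dev/my-project/ml/trainer:latest",
    timeout_s: int = 7200,
):
    import time
    from kubernetes import client, config

    config.load_incluster_config()
    api = client.CustomObjectsApi()
    pod_template = {"spec": {"containers": [{
        "name": "pytorch",
        "image": image,
        "command": ["python", "train.py", f"--model={model_name}"],
        "resources": {"limits": {"nvidia.com/gpu": 1}},
    }]}}
    name = f"finetune-{int(time.time())}"
    api.create_namespaced_custom_object(
        "kubeflow.org", "v1", namespace, "pytorchjobs",
        {
            "apiVersion": "kubeflow.org/v1",
            "kind": "PyTorchJob",
            "metadata": {"name": name},
            "spec": {"pytorchReplicaSpecs": {
                "Master": {"replicas": 1, "restartPolicy": "OnFailure", "template": pod_template},
                "Worker": {"replicas": worker_replicas, "restartPolicy": "OnFailure", "template": pod_template},
            }},
        },
    )
    # Poll status conditions until Succeeded/Failed or timeout
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        job = api.get_namespaced_custom_object("kubeflow.org", "v1", namespace, "pytorchjobs", name)
        conditions = (job.get("status") or {}).get("conditions") or []
        state = {c["type"]: c["status"] for c in conditions}
        if state.get("Succeeded") == "True":
            return
        if state.get("Failed") == "True":
            raise RuntimeError(f"PyTorchJob {name} failed")
        time.sleep(30)
    raise TimeoutError(f"PyTorchJob {name} did not finish within {timeout_s}s")


@dsl.pipeline(name="fine-tuning-pipeline")
def finetuning_pipeline(model_name: str = "qwen2.5-7b"):
    launch_pytorchjob(model_name=model_name, worker_replicas=2)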
Distributed Training Patterns
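For DDP across multiple nodes on GKE, make sure the Training Operator pods can reach each other (no NetworkPolicy blocking traffic between them) and set NCCL_SOCKET_IFNAME=eth0. The Training Operator injects MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK into every replica, so with one process per pod the training script can call torch.distributed.init_process_group() with the default env:// method and no separate launcher.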
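A training script for the PyTorchJob above can build on those injected variables; with one Master and two Workers, WORLD_SIZE should be 3. A minimal sketch, assuming one process (and at most one GPU) per pod; read_ddp_env and init_ddp are hypothetical helpers, and torch is imported lazily so the environment parsing can be tested without it.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK")


@dataclass(frozen=True)
class DDPEnv:
    master_addr: str
    master_port: int
    world_size: int
    rank: int


def read_ddp_env(env: Mapping[str, str] = os.environ) -> DDPEnv:
    """Read the rendezvous variables the Training Operator injects into each replica."""
    missing = [name for name in REQUIRED_VARS if name not in env]
    if missing:
        raise RuntimeError(f"not running under a PyTorchJob? missing {missing}")
    return DDPEnv(
        master_addr=env["MASTER_ADDR"],
        master_port=int(env["MASTER_PORT"]),
        world_size=int(env["WORLD_SIZE"]),
        rank=int(env["RANK"]),
    )


def init_ddp() -> DDPEnv:
    """Initialise torch.distributed for one process per pod."""
    import torch
    import torch.distributed as dist

    cfg = read_ddp_env()
    os.environ.setdefault("NCCL_SOCKET_IFNAME", "eth0")  # GKE pod network interface
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    # env:// reads MASTER_ADDR and MASTER_PORT from the environment
    dist.init_process_group(backend=backend, init_method="env://",
                            world_size=cfg.world_size, rank=cfg.rank)
    if torch.cuda.is_available():
        torch.cuda.set_device(0)  # the single GPU requested per pod
    return cfg
```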
KServe Overview
KServe is Kubeflow's model serving solution, built on Knative and Istio. It abstracts model deployment into a simple InferenceService CRD and handles autoscaling (including scale-to-zero), traffic routing, multi-model serving, and model explainability.
InferenceService Deployment
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: gbm-classifier
  namespace: kubeflow
  annotations:
    sidecar.istio.io/inject: "true"
spec:
  predictor:
    minReplicas: 1
    maxReplicas: 5
    scaleTarget: 50      # Requests per second per pod before scaling out
    scaleMetric: rps
    sklearn:
      storageUri: "gs://my-project-kubeflow/models/gbm-classifier/v1"
      resources:
        requests:
          cpu: "500m"
          memory: "2Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
  transformer:           # Optional: pre/post processing
    containers:
      - image: gcr.io/my-project/feature-transformer:latest
        name: kserve-container
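With scaleMetric: rps and scaleTarget: 50, the predictor's replica count roughly tracks total request rate divided by the per-pod target, clamped to minReplicas/maxReplicas. A back-of-the-envelope sketch for capacity planning; desired_replicas is hypothetical, and Knative's autoscaler also uses stable/panic averaging windows (and may apply a target-utilization percentage, exposed here as a parameter), which this ignores.

```python
import math


def desired_replicas(
    total_rps: float,
    target_rps_per_pod: float = 50,
    min_replicas: int = 1,
    max_replicas: int = 5,
    utilization: float = 1.0,
) -> int:
    """Steady-state estimate: ceil(load / effective per-pod target), clamped to bounds."""
    if target_rps_per_pod <= 0 or not 0 < utilization <= 1:
        raise ValueError("target must be positive and utilization in (0, 1]")
    wanted = math.ceil(total_rps / (target_rps_per_pod * utilization))
    return max(min_replicas, min(max_replicas, wanted))


for load in (0, 40, 120, 1000):
    print(load, desired_replicas(load))
# 0 1
# 40 1
# 120 3
# 1000 5
```

At 1000 rps the estimate hits maxReplicas, a signal that per-pod latency will climb unless maxReplicas is raised.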
# Get InferenceService URL
MODEL_URL=$(kubectl get inferenceservice gbm-classifier \
  -n kubeflow -o jsonpath='{.status.url}')
# Invoke prediction (v2 KServe protocol)
curl -X POST "$MODEL_URL/v2/models/gbm-classifier/infer" \
  -H "Content-Type: application/json" \
  -d '{
    "inputs": [{
      "name": "input-0",
      "shape": [1, 10],
      "datatype": "FP32",
      "data": [[0.5, 1.2, 0.8, 0.3, 1.1, 0.9, 0.4, 0.7, 1.3, 0.6]]
    }]
  }'
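The same v2 inference request can be sent from Python, for example from a post-deployment smoke-test step. A minimal standard-library sketch; build_v2_request and infer are hypothetical helpers, the tensor name input-0 mirrors the curl example, and the rows are flattened in row-major order.

```python
import json
import urllib.request


def build_v2_request(rows: list[list[float]], name: str = "input-0") -> dict:
    """Pack a batch of equal-length feature rows into one FP32 tensor (row-major)."""
    if not rows or any(len(r) != len(rows[0]) for r in rows):
        raise ValueError("rows must be non-empty and equally long")
    return {
        "inputs": [{
            "name": name,
            "shape": [len(rows), len(rows[0])],
            "datatype": "FP32",
            "data": [value for row in rows for value in row],
        }]
    }


def infer(model_url: str, model_name: str, rows: list[list[float]], timeout: float = 10.0) -> dict:
    """POST to /v2/models/<name>/infer and return the decoded JSON response."""
    request = urllib.request.Request(
        f"{model_url}/v2/models/{model_name}/infer",
        data=json.dumps(build_v2_request(rows)).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Usage (MODEL_URL obtained with kubectl as above):
# result = infer(MODEL_URL, "gbm-classifier", [[0.5, 1.2, 0.8, 0.3, 1.1, 0.9, 0.4, 0.7, 1.3, 0.6]])
# print(result["outputs"][0]["data"])
```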
Canary Deployments
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: gbm-classifier
  namespace: kubeflow
spec:
  predictor:
    canaryTrafficPercent: 20   # 20% → canary, 80% → previous revision
    sklearn:
      storageUri: "gs://my-project/models/gbm-classifier/v2"   # New version
# Check canary status
kubectl get inferenceservice gbm-classifier -n kubeflow -o yaml | grep -A10 canary
# Promote canary to 100% (full rollout)
kubectl patch inferenceservice gbm-classifier -n kubeflow \
  --type=json \
  -p '[{"op":"remove","path":"/spec/predictor/canaryTrafficPercent"}]'
# Rollback canary (set to 0%)
kubectl patch inferenceservice gbm-classifier -n kubeflow \
  --type=merge \
  -p '{"spec":{"predictor":{"canaryTrafficPercent":0}}}'
Experiment Tracking
Kubeflow uses ML Metadata (MLMD) for lineage tracking. For richer experiment dashboards, integrate Weights & Biases (W&B) or MLflow — they complement MLMD rather than replace it.
from kfp.dsl import Model, Output, component

@component(
    base_image="pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime",
    packages_to_install=["wandb", "unsloth"],
)
def finetune_with_wandb(
    dataset_uri: str,
    model_name: str,
    wandb_project: str,
    output_model: Output[Model],
):
    import wandb
    from unsloth import FastLanguageModel

    # W&B reads WANDB_API_KEY from the environment; inject it on the task from a
    # Kubernetes Secret (kfp-kubernetes use_secret_as_env), never as a parameter
    wandb.init(
        project=wandb_project,
        config={"model": model_name, "dataset": dataset_uri},
        tags=["kubeflow", "unsloth"],
    )
    # ... training code ...
    # W&B only sees what you pass to wandb.log(); MLMD only records what you
    # write to output artifacts (e.g. output_model, or an Output[Metrics])
    wandb.finish()
Never hardcode API keys. Store secrets as Kubernetes Secrets and expose them as environment variables on the component's pod. In KFP v2 this is done with the kfp-kubernetes extension: kubernetes.use_secret_as_env(task, secret_name="wandb-secret", secret_key_to_env={"api-key": "WANDB_API_KEY"}).
Model Registry
Kubeflow 1.9 introduced a Model Registry component (alpha; not part of the v1.8.0 manifests pinned in the installation steps): a centralized catalog for versioned ML models with metadata, lineage, and deployment state. Models flow: training → registry → serving.
from model_registry import ModelRegistry

# Connect to Kubeflow Model Registry (plain HTTP inside the cluster, hence is_secure=False)
registry = ModelRegistry(
    server_address="http://model-registry-service.kubeflow.svc.cluster.local",
    port=8080,
    author="ml-pipeline",
    is_secure=False,
)
# Register a new model version
registered_model = registry.register_model(
    name="gbm-classifier",
    uri="gs://my-project/models/gbm-classifier/v2",
    model_format_name="sklearn",
    model_format_version="1.3",
    version="2.0.0",
    description="GBM trained on Q3 2025 features, AUC=0.94",
    metadata={
        "accuracy": "0.912",
        "auc": "0.940",
        "training_data": "bq://my-project.ml_features.training_v3",
        "pipeline_run_id": "run-abc123",
    },
)
print(f"Registered: {registered_model.id}")
CI/CD Integration
A mature MLOps CI/CD pipeline triggers on data changes or code commits, compiles the KFP pipeline, runs tests, and submits a new training run automatically.
name: ML Pipeline CI/CD
on:
  push:
    branches: [main]
    paths: ['components/**', 'pipeline.py']
permissions:
  contents: read
  id-token: write   # required for Workload Identity Federation
jobs:
  pipeline-ci:
    runs-on: ubuntu-latest
    env:
      PROJECT: ${{ vars.GCP_PROJECT }}   # assumed repository variable holding the GCP project ID
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with: {python-version: '3.11'}
      - name: Install dependencies
        run: pip install kfp==2.7.0 pytest
      - name: Run component unit tests
        run: pytest tests/ -v
      - name: Compile pipeline
        run: python pipeline.py --compile-only
      - name: Auth to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          service_account: ${{ secrets.SA_EMAIL }}
      - name: Set up gcloud
        uses: google-github-actions/setup-gcloud@v2
      - name: Build and push component images
        run: |
          gcloud builds submit --project "$PROJECT" --tag \
            us-central1-docker.pkg.dev/$PROJECT/ml/trainer:${{ github.sha }}
      - name: Submit pipeline run
        run: |
          python scripts/submit_pipeline.py \
            --host ${{ secrets.KFP_HOST }} \
            --experiment "ci-runs" \
            --run-name "ci-${{ github.sha }}"
Multi-Environment Pattern
Your GKE infrastructure spans dev, QA, stress, and prod environments. The recommended pattern for Kubeflow is namespace-per-environment on a shared cluster, or cluster-per-environment for strict isolation.
| Environment | Pattern | Resource Quota | Pipeline Runs |
|---|---|---|---|
| Dev | Namespace on shared cluster | Low (4 CPU, 16 GB) | Fast, small datasets |
| QA | Namespace on shared cluster | Medium (8 CPU, 32 GB) | Full dataset, eval gates |
| Stress | Namespace or separate cluster | High (GPU node pool) | Load testing serving |
| Prod | Separate GKE cluster | Unrestricted (autoscale) | Triggered by CI/CD only |
import os

import kfp

# ml_pipeline must expose row_limit as a pipeline parameter (passed to the ingest step)
from pipeline import ml_pipeline

ENV = os.getenv("DEPLOY_ENV", "dev")
KFP_HOSTS = {
    "dev": "http://kubeflow.dev.internal",
    "qa": "http://kubeflow.qa.internal",
    "prod": "http://kubeflow.prod.internal",
}
PIPELINE_CONFIGS = {
    "dev": {"row_limit": 5000, "n_estimators": 50},
    "qa": {"row_limit": 50000, "n_estimators": 100},
    "prod": {"row_limit": 500000, "n_estimators": 200},
}
client = kfp.Client(host=KFP_HOSTS[ENV])
client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments=PIPELINE_CONFIGS[ENV],
    run_name=f"auto-{ENV}-{os.getenv('GIT_SHA', 'manual')}",
    experiment_name=f"{ENV}-experiments",
)
GKE IAM & Auth
Kubeflow on GKE uses a layered auth model. Understanding each layer is essential for debugging access issues.
| Layer | Technology | Controls |
|---|---|---|
| Network | Cloud Armor / IAP | Who can reach the Dashboard URL |
| Identity | Dex / IAP + IAM | OIDC token issuance, user identity |
| Kubernetes | RBAC | Namespace access, resource permissions |
| Service Mesh | Istio AuthorizationPolicy | Pod-to-pod traffic, cross-namespace |
| Workload Identity | GCP IAM → K8s SA binding | GCS / BigQuery / PubSub access from pods |
apiVersion: kubeflow.org/v1
kind: Profile
metadata:
  name: ml-team-dev
spec:
  owner:
    kind: User
    name: vick@company.com
  resourceQuotaSpec:
    hard:
      cpu: "20"
      memory: "80Gi"
      requests.nvidia.com/gpu: "2"   # extended resources only accept the requests. prefix in quotas
      persistentvolumeclaims: "10"
      requests.storage: "500Gi"
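Quota values and task requests both use Kubernetes quantity notation, and the suffixes matter: 16G (decimal, 16×10^9 bytes) is about 7% smaller than 16Gi (binary, 16×2^30 bytes), and 500m CPU is half a core. A minimal sketch that parses the common suffixes and checks one set of requests against the Profile quota above; parse_quantity and fits_quota are hypothetical helpers covering only a subset of the quantity grammar, and a real quota applies to the namespace total, not a single pod.

```python
import re
from fractions import Fraction

_SUFFIXES = {
    "": Fraction(1), "m": Fraction(1, 1000),
    "k": Fraction(10**3), "M": Fraction(10**6), "G": Fraction(10**9), "T": Fraction(10**12),
    "Ki": Fraction(2**10), "Mi": Fraction(2**20), "Gi": Fraction(2**30), "Ti": Fraction(2**40),
}


def parse_quantity(value: str) -> Fraction:
    """Parse quantities such as '500m', '2', '8G', '16Gi' into exact numbers."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([a-zA-Z]*)", value.strip())
    if match is None or match.group(2) not in _SUFFIXES:
        raise ValueError(f"unsupported quantity: {value!r}")
    return Fraction(match.group(1)) * _SUFFIXES[match.group(2)]


def fits_quota(requests: dict[str, str], quota: dict[str, str]) -> dict[str, bool]:
    """For each requested resource that the quota limits, report whether it fits."""
    return {
        name: parse_quantity(amount) <= parse_quantity(quota[name])
        for name, amount in requests.items()
        if name in quota
    }


quota = {"cpu": "20", "memory": "80Gi", "requests.nvidia.com/gpu": "2"}
print(fits_quota({"cpu": "8", "memory": "32G", "requests.nvidia.com/gpu": "1"}, quota))
# {'cpu': True, 'memory': True, 'requests.nvidia.com/gpu': True}
```

Fractions keep the comparison exact, so values like 500m never pick up floating-point rounding.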
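PROJECT=$(gcloud config get-value project)
# Bind pipeline SA for GCS + BigQuery + Pub/Sub access
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/storage.objectAdmin
# BigQuery: dataViewer reads tables; jobUser is needed to run query jobs
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/bigquery.dataViewer
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/bigquery.jobUser
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/pubsub.publisher
# Cloud SQL (metadata backend)
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/cloudsql.client
# AlloyDB is a separate service with its own client role
gcloud projects add-iam-policy-binding $PROJECT \
  --member "serviceAccount:kubeflow-pipelines-sa@${PROJECT}.iam.gserviceaccount.com" \
  --role roles/alloydb.client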
Troubleshooting
Pipeline Steps Stuck in "Pending"
# Check pod events
kubectl describe pod <pipeline-pod-name> -n kubeflow
# Common causes:
# 1. Insufficient CPU/memory: reduce resource requests
# 2. GPU node pool has no free nodes: check node pool autoscaler
# 3. Image pull error: check Artifact Registry permissions
# 4. PVC binding failure: check StorageClass availability
kubectl get events -n kubeflow --sort-by=.lastTimestamp | tail -20
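When many pipeline pods are stuck at once, summarizing warning events is quicker than reading them one by one. A minimal sketch, assuming the output of kubectl get events -n kubeflow -o json has been saved to a file; summarize_events and its default reason list are hypothetical choices that cover the causes above (scheduling, image pulls, volume mounts).

```python
import json
from collections import Counter


def summarize_events(
    events_json: str,
    reasons: tuple[str, ...] = ("FailedScheduling", "Failed", "BackOff", "FailedMount"),
) -> Counter:
    """Count Warning events by (reason, involved object) for reasons that keep pods Pending."""
    counts: Counter = Counter()
    for event in json.loads(events_json).get("items", []):
        if event.get("type") == "Warning" and event.get("reason") in reasons:
            obj = event.get("involvedObject", {}).get("name", "?")
            counts[(event["reason"], obj)] += event.get("count") or 1
    return counts


# Usage: kubectl get events -n kubeflow -o json > events.json
# with open("events.json") as f:
#     for (reason, pod), n in summarize_events(f.read()).most_common(10):
#         print(f"{n:4d}  {reason:18s} {pod}")
```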
502/503 on Dashboard or KFP API
# Check Istio ingressgateway
kubectl get pods -n istio-system
kubectl logs -n istio-system deployment/istio-ingressgateway
# Check ml-pipeline API server
kubectl logs -n kubeflow deployment/ml-pipeline
kubectl logs -n kubeflow deployment/ml-pipeline-ui
# Check VirtualService routing
kubectl get virtualservices -n kubeflow
KServe InferenceService Stuck "Not Ready"
# Get detailed status
kubectl describe inferenceservice <name> -n kubeflow
# Check Knative revision status
kubectl get ksvc -n kubeflow
kubectl describe revision <revision-name> -n kubeflow
# Common fixes:
# 1. GCS model path doesn't exist or SA lacks storage.objectViewer
# 2. Wrong model format version
# 3. Knative serving webhook not ready
# 4. Insufficient node resources for predictor pod
PyTorchJob Workers Crash on NCCL
# Add to your training container env vars:
env:
  - name: NCCL_DEBUG
    value: INFO
  - name: NCCL_SOCKET_IFNAME
    value: eth0
  - name: NCCL_IB_DISABLE
    value: "1"   # Disable InfiniBand on GKE (not available)
# Ensure pods can reach each other: check for NetworkPolicies blocking traffic
kubectl get networkpolicies -n kubeflow
Pipeline Metadata Not Appearing in UI
# Check MLMD gRPC service
kubectl get pods -n kubeflow | grep metadata
kubectl logs -n kubeflow deployment/metadata-grpc-deployment
# Check MySQL / Cloud SQL connection
kubectl logs -n kubeflow deployment/ml-pipeline | grep -i "db\|mysql\|sql"
# Verify pipeline root GCS path is accessible
gsutil ls gs://your-bucket/pipeline-root/
Quick Cheat Sheet
Essential kubectl Commands
# Get all Kubeflow workloads
kubectl get all -n kubeflow
# Watch pipeline pods in real-time
kubectl get pods -n kubeflow -l workflows.argoproj.io/completed!=true -w
# Get pipeline run logs
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers
# Forward Dashboard
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
# Forward KFP API directly (bypass Istio)
kubectl port-forward svc/ml-pipeline -n kubeflow 8888:8888
# Get all InferenceServices
kubectl get inferenceservice -A
# Get all PyTorchJobs
kubectl get pytorchjob -n kubeflow
# Force delete a stuck pipeline pod
kubectl delete pod <pod-name> -n kubeflow --force --grace-period=0
# Scale down/up Kubeflow components (maintenance)
kubectl scale deployment ml-pipeline -n kubeflow --replicas=0
kubectl scale deployment ml-pipeline -n kubeflow --replicas=1
KFP Python SDK Quick Reference
import kfp

client = kfp.Client(host="http://localhost:8080")
# List experiments
client.list_experiments()
# List runs in experiment
client.list_runs(experiment_id="<exp-id>")
# Get run details
client.get_run(run_id="<run-id>")
# Wait for run completion
client.wait_for_run_completion(run_id="<run-id>", timeout=3600)
# Upload pipeline YAML
client.upload_pipeline("ml_pipeline.yaml", pipeline_name="ML Training v2")
# Create recurring run (KFP cron has 6 fields, seconds first: daily at 02:00)
client.create_recurring_run(
    experiment_id="<exp-id>",
    job_name="nightly-training",
    cron_expression="0 0 2 * * *",
    pipeline_id="<pipeline-id>",
)
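Because KFP recurring runs expect a six-field cron expression with a leading seconds field, a standard five-field crontab entry needs converting. A minimal sketch; to_kfp_cron is a hypothetical helper that only prepends a zero seconds field and does not validate the individual fields.

```python
def to_kfp_cron(crontab: str) -> str:
    """Convert a 5-field crontab ('min hour dom month dow') to KFP's 6-field form."""
    fields = crontab.split()
    if len(fields) == 6:
        return " ".join(fields)  # already has a seconds field
    if len(fields) != 5:
        raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}: {crontab!r}")
    return " ".join(["0", *fields])


print(to_kfp_cron("0 2 * * *"))  # 0 0 2 * * *
```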
Component Types Decision Guide
| Situation | Use This |
|---|---|
| Quick Python logic, pure stdlib | @component(base_image="python:3.11") |
| Heavy deps (PyTorch, sklearn) | @component(base_image="custom-image") |
| Wrap existing CLI tool | @dsl.container_component (v2) or ContainerOp (v1) |
| Distributed training | Training Operator job (e.g. PyTorchJob) submitted from a component |
| Parallel grid search | dsl.ParallelFor loop |
| Conditional logic | dsl.Condition block (dsl.If in newer SDKs) |