import logging
import time

from kubeflow.trainer import TrainerClient
# Based on earlier debugging, import explicitly from the types module for compatibility
from kubeflow.trainer.types.types import CustomTrainer
# Configure logging
logging.basicConfig(level=logging.INFO)
# ==========================================
# 1. Training logic (runs inside the K8s Pod)
# ==========================================
def qwen_lora_func():
    import os
    import time  # must be imported here: this function is serialized and run remotely
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import LoraConfig, get_peft_model, TaskType
    # Read environment variables injected by the SDK
    rank = os.environ.get("RANK", "0")
    print(f"🚀 [Pod-Node-{rank}] Training Environment Initialized!")
    # 1. Load the model (Qwen 0.5B)
    model_id = "Qwen/Qwen2.5-0.5B-Instruct"
    try:
        print("📦 [Pod] Loading Tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

        print("📦 [Pod] Loading Model (Force CPU & float32)...")
        # ⚠️ Key point: with 4 GiB of RAM and no GPU, CPU + float32 is mandatory
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float32,
            device_map="cpu",
            trust_remote_code=True,
        )

        print("⚙️ [Pod] Applying LoRA Adapter...")
        peft_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=8,
            lora_alpha=32,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=0.05,
        )
        model = get_peft_model(model, peft_config)

        # Print trainable parameters to confirm the LoRA adapter is attached
        print("✅ [Pod] LoRA Applied Successfully! Trainable Parameters:")
        model.print_trainable_parameters()

        # Simulate a training loop (no real backward pass, to avoid OOM)
        print("🔄 [Pod] Simulating training step...")
        time.sleep(2)

        print("✅ [Pod] Task Completed Successfully.")
    except Exception as e:
        print(f"❌ [Pod] Critical Error: {e}")
        raise
# ==========================================
# 2. Submission logic (runs on the M1 Mac)
# ==========================================
def submit_job():
    print("🔌 Initializing Kubeflow Trainer Client...")
    client = TrainerClient()
    print("📦 Configuring Job (Single-Node CPU Mode)...")
    trainer = CustomTrainer(
        # A. Core training logic
        func=qwen_lora_func,
        # B. Runtime dependencies (installed automatically when the Pod starts)
        packages_to_install=[
            "transformers", "peft", "torch", "accelerate",
            "datasets", "bitsandbytes", "tiktoken", "sentencepiece",
        ],
        # C. Node count (must be 1 for this setup)
        num_nodes=1,
        # D. Environment variables (⚠️ key fix: force a single process to prevent OOM)
        env={
            "PET_NPROC_PER_NODE": "1",
            "OMP_NUM_THREADS": "1",
            "HF_ENDPOINT": "https://hf-mirror.com",  # use the mirror for Hugging Face downloads in China
            # "HF_TOKEN": "xxx"
        },
        # E. Resource limits (sized for a 2-CPU / 4-GiB cluster)
        resources_per_node={
            "cpu": "1.5",      # leave 0.5 CPU for the system
            "memory": "3Gi",   # leave 1 GiB for the system
            "gpu": "0",
        },
    )
    print("🚀 Submitting to Kubernetes...")
    try:
        job_id = client.train(trainer=trainer)
        print("\n" + "=" * 50)
        print(f"✅ Job Submitted! ID: {job_id}")
        print("=" * 50)
        print("👉 Monitor logs with:")
        print(f"   kubectl logs -n kubeflow-system -f {job_id}-worker-0")
        print("=" * 50)
    except Exception as e:
        print(f"\n❌ Submission Failed: {e}")
        if "Runtime" in str(e):
            print("\n[Hint] Check if 'ClusterTrainingRuntime' is installed in your cluster.")
if __name__ == "__main__":
    submit_job()