1. 项目概述:这不是一次简单的SageMaker演示,而是一场MLOps实践者的压力测试
“Data Acquisition & Exploration: Exploring 5 Key MLOps Questions using AWS SageMaker”——这个标题里藏着的不是五个待回答的理论问题,而是五道横亘在数据科学家和机器学习工程师日常工作中、几乎每天都要面对的现实关卡。我带过十几支从零搭建MLOps流水线的团队,最常听到的抱怨不是“模型不准”,而是“数据又断了”“特征版本对不上”“昨天还跑通的pipeline今天报错说S3路径不存在”“业务方说线上效果下滑,但根本找不到是哪次数据变更导致的”。这五个问题,就是这些抱怨背后最核心的骨架。它们分别是:数据从哪儿来、数据是否可信、数据如何被追踪、数据怎样支撑迭代、数据如何与模型生命周期对齐。AWS SageMaker本身不直接回答这些问题,但它提供了一套可编程、可审计、可自动化的基础设施层,让这五个问题从“靠人盯、靠经验、靠Excel表格管理”的混沌状态,变成可以用代码定义、用日志验证、用API调用的确定性流程。你不需要是AWS认证专家,但必须理解SageMaker中FeatureStore、ProcessingJob、Experiment、Trial和Lineage这几个核心组件之间的咬合逻辑——它们不是孤立的功能按钮,而是一套精密的齿轮组。比如,一个ProcessingJob的输出,会自动成为FeatureStore中某个FeatureGroup的输入源;而这个FeatureGroup的每一次写入,又会触发Lineage图谱中一条新的DataSet节点生成,并关联到下游某个TrainingJob的TrialComponent。这种自动化的血缘关系,才是解决“数据怎么来的”和“模型用的是哪批数据”这两个问题的技术底座。这篇文章,就是我用真实项目复盘的方式,把这套齿轮组拆开、上油、再装回去的过程。它不讲SageMaker控制台怎么点,只讲在JupyterLab里敲下每一行Python SDK代码时,背后发生了什么、为什么必须这么写、以及如果写错了,系统会在哪个环节给你一个明确的错误提示(而不是让你在CloudWatch日志里翻三小时)。适合正在为数据混乱头疼的ML工程师,也适合想把第一个模型真正推上线的数据科学家——因为上线之后,90%的维护成本,都藏在这五个问题的答案里。
2. 核心问题拆解与SageMaker能力映射:为什么这五个问题必须被结构化回答
2.1 问题一:数据从哪儿来?——打破“数据黑箱”,建立可追溯的数据供应链
在传统工作流里,“数据从哪儿来”往往是一句模糊的口头承诺:“找数仓同事要一份ODS层的用户行为表”“爬虫组昨天更新了商品库”。这种依赖人工沟通的数据获取方式,在MLOps语境下是灾难性的。它直接导致两个后果:第一,无法自动化;第二,无法回溯。当模型效果异常时,你无法快速确认“上周五训练用的数据,是不是和前天A/B测试用的数据,来自同一个ETL任务的同一轮输出”。SageMaker对此的解法,不是提供一个更漂亮的数据库连接界面,而是强制将“数据来源”这个概念,编码进每一个计算任务的元数据中。
具体来说,SageMaker通过ProcessingJob的AppSpecification和InputDataConfig两个参数,把数据来源变成了一个可声明、可验证的对象。InputDataConfig里定义的S3Input,其S3Uri字段不仅是一个路径,更是一个指向S3对象版本(Version ID)的精确指针。这意味着,当你在代码里写下:
from sagemaker.sklearn.processing import SKLearnProcessor processor = SKLearnProcessor( framework_version='0.23-1', role=role, instance_type='ml.m5.xlarge', instance_count=1 ) processor.run( code='preprocess.py', inputs=[ ProcessingInput( source='s3://my-bucket/raw-data/2024-06-01/', destination='/opt/ml/processing/input/raw', s3_data_type='S3Prefix', s3_input_mode='File' ) ], outputs=[ ProcessingOutput( output_name='train', source='/opt/ml/processing/output/train/', destination='s3://my-bucket/processed-data/train-20240601/' ) ] )这段代码执行后,SageMaker不仅会启动一个EC2实例运行你的preprocess.py,还会在后台自动生成一条ProcessingJob的元数据记录,其中inputs[0].source字段被精确地记录为's3://my-bucket/raw-data/2024-06-01/',并且这个值会被持久化存储在SageMaker的元数据服务中,而非仅仅存在于你的本地Jupyter Notebook里。更重要的是,如果你的S3桶启用了版本控制(这是强烈建议的),SageMaker会自动捕获该路径下所有对象的版本ID,并将其作为Lineage图谱中DataSet节点的version属性。这就意味着,三个月后你想复现当时的训练过程,你不需要去翻邮件问同事“那天的数据放哪儿了”,你只需要查询这个ProcessingJob的元数据,就能拿到一个精确到字节的S3对象版本列表。我见过太多团队因为没启用S3版本控制,导致在一次误删操作后,只能眼睁睁看着价值百万的标注数据永远丢失。所以,回答“数据从哪儿来”这个问题的第一步,不是写SQL,而是检查你的S3桶配置——aws s3api get-bucket-versioning --bucket my-bucket,确保返回的是{"Status": "Enabled"}。这是整个数据供应链可追溯性的物理基石,没有它,后面所有的自动化都是空中楼阁。
2.2 问题二:数据是否可信?——用特征工程的“出厂检验”替代人工抽查
“数据是否可信”是数据科学家最常被业务方灵魂拷问的问题。一句“我们数据没问题”毫无说服力,而一份长达50页的《数据质量报告》又没人愿意看。SageMaker的解法,是把数据质量验证,变成特征工程流水线中一个不可跳过的、有明确成功/失败信号的标准化步骤。这个步骤的核心,是FeatureStore的IngestionJob和OfflineStore的DataQuality监控能力。
FeatureStore不是一个简单的键值存储。当你创建一个FeatureGroup时,你必须明确定义每个FeatureDefinition的FeatureType(Integral、Fractional或String)和ValueType(例如Integer、Float、String)。这个定义本身,就是一份轻量级的数据契约(Data Contract)。当IngestionJob开始向FeatureGroup写入数据时,SageMaker会实时执行类型校验。如果某条记录的user_age字段传入了一个字符串"N/A",而你在FeatureDefinition里定义的是Integral,那么这条记录会被直接拒绝,并在IngestionJob的日志中留下清晰的错误信息:ValueError: Expected integral type for feature 'user_age', got 'N/A'。这比任何事后的数据质量扫描都更早、更精准地拦截了问题。
但这还不够。类型校验只能保证“格式正确”,不能保证“业务合理”。比如,user_age字段可以是整数,但值为200显然不合理。这时就需要OfflineStore的DataQuality监控。SageMaker允许你为FeatureGroup的OfflineStore配置一个DataQualityMonitoringSchedule,它会定期(例如每天凌晨2点)扫描OfflineStore中的Parquet文件,并计算一系列统计指标:空值率、唯一值数量、数值型字段的均值/标准差/分位数、字符串型字段的长度分布等。关键在于,这些指标的计算结果,会被写入一个你指定的S3位置,并且SageMaker会自动生成一个MonitoringExecution资源,其状态(Completed、Failed、Aborted)就是数据质量的最终判决书。你可以把这个状态,直接集成到你的CI/CD流水线中。例如,在Jenkins或CodePipeline里,添加一个DescribeMonitoringExecution的API调用步骤,如果返回的状态是Failed,就立即中断后续的模型训练任务,并触发告警。我曾经在一个电商推荐项目里,把item_price字段的p95值设为监控阈值。当某次上游ERP系统故障,导致一批商品价格被错误地同步为0.01元时,DataQuality监控在30分钟内就检测到p95值骤降,并自动阻断了当天的特征更新和模型重训,避免了一场可能影响数百万用户的资损事故。所以,“数据是否可信”的答案,不应该是一份静态报告,而应该是一个动态的、能驱动行动的布尔值:True表示可以继续,False表示必须停止并调查。SageMaker提供的,正是这个布尔值的生成引擎。
2.3 问题三:数据怎样被追踪?——构建端到端的血缘图谱,让每一次变更都有迹可循
当模型在生产环境出现偏差(Bias)或漂移(Drift)时,最耗时的环节,从来不是修复模型,而是定位问题根源。是上游数据源的Schema变了?是特征工程脚本里一个隐藏的fillna(0)逻辑被悄悄修改了?还是TrainingJob用的超参数配置文件,其实引用了一个早已过期的FeatureGroup版本?在缺乏血缘追踪的系统里,这个问题的排查,往往需要召集数据工程师、算法工程师、运维工程师开一场长达数小时的“破案会议”。SageMaker的Lineage(血缘)功能,就是为终结这种低效协作而生的。
Lineage的本质,是一个由Artifact(制品)、Action(动作)和Context(上下文)三类节点构成的有向无环图(DAG)。Artifact代表一切有状态的实体:一个S3路径、一个FeatureGroup、一个ModelPackage、甚至是一段Python代码的Git Commit Hash。Action代表一切改变状态的操作:一个ProcessingJob、一个TrainingJob、一个TransformJob。Context则代表环境和意图:一个Experiment(实验)、一个Trial(试验)、一个Project(项目)。SageMaker SDK在你调用estimator.fit()或processor.run()时,会自动为你创建对应的Action节点,并尝试将输入的Artifact(如S3路径)和输出的Artifact(如训练好的模型S3路径)与之关联。但这个“自动”是有前提的:你必须使用SageMaker原生的SDK对象(如SKLearnProcessor,Estimator),而不是自己用boto3手动调用create_training_job。后者虽然也能完成任务,但会完全绕过Lineage系统,让你的血缘图谱变成一片空白。
真正的威力,在于Lineage的可编程性。你可以用boto3的sagemaker.lineage客户端,编写脚本来查询任意一个Artifact的完整上游依赖链。例如,要找出影响某个生产模型的所有数据源,你可以这样写:
from sagemaker.lineage import Artifact, Action, Context # 首先,根据模型的S3 URI,找到它的Artifact model_artifact = Artifact.load( artifact_arn="arn:aws:sagemaker:us-east-1:123456789012:artifact/abc123-model-20240601" ) # 然后,向上遍历所有关联的Action upstream_actions = model_artifact.parents() for action in upstream_actions: # 找出这个Action的输入Artifact input_artifacts = action.inputs() for input_art in input_artifacts: print(f"Input Artifact: {input_art.artifact_name} ({input_art.source.uri})") # 如果这个输入Artifact本身也是一个FeatureGroup,继续向上查 if 'feature-group' in input_art.source.uri: fg_artifact = Artifact.load(artifact_arn=input_art.artifact_arn) fg_upstream = fg_artifact.parents() for fg_action in fg_upstream: print(f" -> FeatureGroup upstream: {fg_action.action_name}")这段代码的输出,会是一张清晰的树状图,从最顶层的原始S3数据桶,一直贯穿到最终的模型文件。它不会告诉你“可能”是哪里出了问题,而是用确凿的ARN(Amazon Resource Name)告诉你“一定”是哪些资源参与了这次构建。我在一个金融风控模型的迭代中,就用这个方法,在5分钟内定位到问题:一个新加入的ProcessingJob,其输入S3路径指向了一个尚未经过DataQuality监控的临时数据集,导致模型学习到了大量异常的credit_score值。没有Lineage,这个排查至少需要两天。所以,“数据怎样被追踪”的答案,不是一张静态的架构图,而是一个随时可以执行的、返回精确ARN列表的Python函数。这才是现代MLOps团队应有的响应速度。
2.4 问题四:数据如何支撑迭代?——用FeatureStore实现特征的“即插即用”与“版本共存”
模型迭代的瓶颈,常常不在算法本身,而在特征。一个新想法,比如“用户过去7天的平均点击率”,需要经历:数据工程师开发ETL脚本、测试、上线;数据科学家等待数据就绪、下载、清洗、建模;最后还要确保线上服务能实时计算这个新特征。整个周期动辄数周。FeatureStore的设计哲学,就是把特征从“一次性计算产物”,变成“可复用、可版本化、可在线/离线统一访问的服务”。
FeatureStore的核心是FeatureGroup。一个FeatureGroup就像一个数据库表,但它有三个独特之处:第一,它同时拥有OnlineStore和OfflineStore。OnlineStore是一个毫秒级响应的NoSQL存储(基于DynamoDB),用于实时推理;OfflineStore是一个低成本、高吞吐的S3数据湖,用于批量训练。第二,FeatureGroup支持RecordIdentifierFeatureName和EventTimeFeatureName,这使得它可以天然地处理时间序列数据和事件驱动的更新。第三,也是最关键的,FeatureGroup支持FeatureGroupName的版本化。你不能直接修改一个已存在的FeatureGroup的Schema,但你可以创建一个名为user_features_v2的新FeatureGroup,它与user_features_v1共享大部分特征,但新增了7d_avg_click_rate。这两个FeatureGroup可以并存,互不影响。
这种设计带来的迭代效率提升是颠覆性的。数据科学家在JupyterLab里,只需几行代码,就能为自己的实验加载不同版本的特征:
from sagemaker.feature_store.feature_group import FeatureGroup # 加载v1版本,用于基线模型 fg_v1 = FeatureGroup(name="user_features_v1", sagemaker_session=sess) training_data_v1 = fg_v1.athena_query().to_dataframe() # 加载v2版本,用于新模型实验 fg_v2 = FeatureGroup(name="user_features_v2", sagemaker_session=sess) training_data_v2 = fg_v2.athena_query().to_dataframe() # 直接比较两个版本在相同评估集上的AUC baseline_auc = evaluate_model(training_data_v1, test_data) new_auc = evaluate_model(training_data_v2, test_data) print(f"v1 AUC: {baseline_auc:.4f}, v2 AUC: {new_auc:.4f}")而线上服务,也只需修改一行配置,就能切换到新特征:
# 在推理端点的预处理代码中 def model_fn(model_dir): # ... 加载模型 # 切换FeatureGroup名称即可 feature_group_name = "user_features_v2" # 从 "user_features_v1" 改为 "user_features_v2" return model def input_fn(request_body, request_content_type): # ... 解析请求 # 使用v2的FeatureGroup进行实时特征查找 record = feature_store.get_record( feature_group_name="user_features_v2", record_identifier_value_as_string=user_id ) return process_features(record)整个过程,无需等待ETL上线,无需修改任何数据管道,甚至不需要重启线上服务(得益于Lambda或容器的热更新能力)。这就是“数据如何支撑迭代”的终极答案:不是更快地造轮子,而是让轮子本身具备无限组合和无缝切换的能力。FeatureStore,就是那个承载所有轮子的标准化底盘。
2.5 问题五:数据如何与模型生命周期对齐?——用Experiment & Trial实现“谁在什么时候,用什么数据,训练了什么模型”的原子化记录
最后一个,也是最根本的问题:“数据如何与模型生命周期对齐?”它直指MLOps的核心矛盾:数据是持续流动的,模型是离散发布的。一次TrainingJob的启动,本质上是一次对特定时刻、特定版本数据的快照(Snapshot)。如果这个快照没有被精确地、不可篡改地记录下来,那么“模型”就只是一个孤立的二进制文件,失去了其全部业务意义。SageMaker的Experiment和Trial机制,就是为了解决这个快照记录问题而设计的。
Experiment是一个逻辑容器,代表一个宏观的研究目标,比如“Q2用户留存率提升项目”。Trial则是Experiment下的一个具体实验实例,代表一次独立的、可重复的模型训练尝试。而TrialComponent,是Trial中最细粒度的单元,它精确地记录了一次TrainingJob或ProcessingJob的全部输入、输出、参数和指标。当你调用estimator.fit()时,SageMaker SDK会自动为你创建一个TrialComponent,并将其与当前的Trial和Experiment关联。这个TrialComponent的元数据,包含了:
input_artifacts: 一个字典,键是输入名称(如'training-dataset'),值是S3 URI;output_artifacts: 一个字典,键是输出名称(如'model-artifact'),值是S3 URI;parameters: 一个字典,包含了所有传递给训练脚本的超参数(--learning-rate,--batch-size等);metrics: 一个字典,包含了训练过程中自动捕获的指标(validation:auc,train:loss等)。
这个结构,完美地封装了“谁在什么时候,用什么数据,训练了什么模型”这四个要素。who是Trial的CreatedBy字段(通常是IAM Role ARN);when是TrialComponent的CreationTime;what data是input_artifacts;what model是output_artifacts['model-artifact']。更重要的是,TrialComponent是不可变的。一旦创建,其内容无法被修改。如果你想调整超参数重新训练,SDK会创建一个新的TrialComponent,而不是覆盖旧的。这保证了历史记录的绝对可信。
我曾在一个医疗影像诊断项目中,严格遵循这个模式。每次模型迭代,我们都创建一个新的Trial,并为其命名,如"lung-cancer-detection-v3.2.1-20240601"。在Trial内部,我们会创建多个TrialComponent,分别对应数据预处理、模型训练、模型评估。当监管机构要求提供“模型v3.2.1的完整训练证据链”时,我们只需提供这个Trial的ARN,他们就可以在SageMaker Studio里,点开Trial,看到所有TrialComponent的详细日志、输入输出S3路径、超参数和评估指标。整个过程,不需要我们整理任何额外文档,所有证据都内生于SageMaker的元数据系统。所以,“数据如何与模型生命周期对齐”的答案,就是放弃用Excel表格管理模型版本,转而用Trial这个原生的、受AWS IAM策略保护的、可审计的云原生对象,来承载每一次模型诞生的全部上下文。这是MLOps从“手工作坊”走向“现代工厂”的标志性一步。
3. 实操全流程:从零构建一个可验证的MLOps数据探索流水线
3.1 环境准备与权限配置:安全不是事后补救,而是架构起点
在SageMaker上构建任何MLOps流水线,第一步永远不是写代码,而是配置权限。我见过太多团队,因为一开始用了过于宽泛的AdministratorAccess策略,导致后期在生产环境加固时,不得不推倒重来,浪费数周时间。正确的做法,是从最小权限原则(Principle of Least Privilege)出发,为流水线的每一个组件,精确地授予它所需的、仅够用的权限。
首先,你需要一个专门的IAM Role,我们称之为SageMakerMLOpsRole。这个Role的信任策略(Trust Policy)必须允许sagemaker.amazonaws.com代入。然后,附加以下三条自定义策略(Policy),每一条都对应一个核心能力:
策略一:S3数据访问策略(sagemaker-s3-access-policy)
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::my-mlops-bucket", "arn:aws:s3:::my-mlops-bucket/*" ] } ] }注意,这里ListBucket的资源是桶名本身(无/*),而GetObject和PutObject的资源是桶名加/*。这是S3权限的常见陷阱:ListBucket操作的资源必须是桶,否则会静默失败。
策略二:FeatureStore访问策略(sagemaker-featurestore-policy)
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sagemaker:CreateFeatureGroup", "sagemaker:DeleteFeatureGroup", "sagemaker:DescribeFeatureGroup", "sagemaker:PutRecord", "sagemaker:GetRecord", "sagemaker:StartIngestionJob", "sagemaker:DescribeIngestionJob" ], "Resource": "arn:aws:sagemaker:*:*:feature-group/my-feature-group-*" } ] }这里的关键是Resource的ARN模式。my-feature-group-*允许你创建以my-feature-group-开头的所有FeatureGroup,这为版本化命名(my-feature-group-v1,my-feature-group-v2)提供了灵活性,同时又避免了*带来的过度授权。
策略三:Lineage与Experiment管理策略(sagemaker-lineage-policy)
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sagemaker:CreateExperiment", "sagemaker:CreateTrial", "sagemaker:CreateTrialComponent", "sagemaker:AssociateTrialComponent", "sagemaker:DescribeTrialComponent", "sagemaker:ListArtifacts", "sagemaker:ListActions", "sagemaker:ListContexts" ], "Resource": "*" } ] }这个策略的Resource是*,因为Lineage相关的API大多不支持资源级权限(Resource-level permissions)。但这是安全的,因为这些API本身不涉及数据读写,只涉及元数据的创建和查询。
配置完Role后,在SageMaker Studio中,你需要为你的User Profile(用户配置文件)显式地关联这个SageMakerMLOpsRole。这一步至关重要,因为Studio的Notebook Kernel默认使用的是User Profile的Role,而不是你在代码里assume_role的Role。很多初学者的Lineage图谱为空,原因就是Kernel没有权限写入Lineage元数据。你可以通过在Notebook里运行!aws sts get-caller-identity来验证当前Kernel使用的Role ARN是否正确。
提示:在生产环境中,我强烈建议为
SageMakerMLOpsRole启用AWS CloudTrail日志记录,并设置一个CloudWatch告警,当该Role被用于CreateTrainingJob或CreateProcessingJob时,发送通知。这能让你第一时间感知到任何非预期的模型训练活动,是安全审计的第一道防线。
3.2 数据获取与初步探索:用ProcessingJob构建可复现的数据快照
数据获取阶段的目标,不是“把数据弄进来”,而是“把数据的上下文和契约一起弄进来”。我们以一个经典的电商用户行为数据集为例,原始数据存放在S3路径s3://my-raw-bucket/user-behavior/2024-06-01/下,格式为JSON Lines(JSONL)。我们的ProcessingJob不仅要解析JSON,还要做三件事:第一,为每条记录打上一个精确的event_time(事件时间戳);第二,对关键字段(如user_id,item_id)进行非空校验;第三,将处理后的数据,以Parquet格式,写入一个带有日期后缀的S3路径,作为本次数据快照的唯一标识。
以下是完整的preprocess.py脚本,它将被SKLearnProcessor调用:
import sys import json import pandas as pd from datetime import datetime import logging logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) def main(): # 1. 从命令行参数获取输入/输出路径 input_path = sys.argv[1] # /opt/ml/processing/input/raw/ output_path = sys.argv[2] # /opt/ml/processing/output/train/ logger.info(f"Reading raw data from {input_path}") # 2. 读取所有JSONL文件 all_records = [] for file_path in Path(input_path).rglob("*.jsonl"): with open(file_path, 'r') as f: for line_num, line in enumerate(f, 1): try: record = json.loads(line.strip()) # 3. 强制添加event_time,使用当前处理时间,确保一致性 record['event_time'] = datetime.utcnow().isoformat() + 'Z' all_records.append(record) except json.JSONDecodeError as e: logger.warning(f"Invalid JSON on line {line_num} of {file_path}: {e}") if not all_records: raise ValueError("No valid records found in input data") # 4. 转为DataFrame,并进行基础清洗 df = pd.DataFrame(all_records) logger.info(f"Loaded {len(df)} records") # 5. 关键字段非空校验 required_fields = ['user_id', 'item_id', 'event_type'] missing_fields = df[required_fields].isnull().any(axis=1) if missing_fields.any(): logger.warning(f"Found {missing_fields.sum()} records with missing required fields") df = df[~missing_fields].copy() # 6. 类型转换与标准化 df['user_id'] = df['user_id'].astype(str) df['item_id'] = df['item_id'].astype(str) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s', errors='coerce') df = df.dropna(subset=['timestamp']) # 7. 写入Parquet,分区存储 # 按照event_type分区,便于后续按行为类型高效查询 df.to_parquet( f"{output_path}/data/", partition_cols=['event_type'], index=False, compression='snappy' ) logger.info(f"Processed data written to {output_path}") if __name__ == "__main__": main()现在,我们在Notebook中启动这个ProcessingJob:
from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from pathlib import Path # 定义处理器 processor = SKLearnProcessor( framework_version='0.23-1', role='arn:aws:iam::123456789012:role/SageMakerMLOpsRole', instance_type='ml.m5.xlarge', instance_count=1, volume_size_in_gb=30, max_runtime_in_seconds=3600 ) # 启动处理任务 processor.run( code='preprocess.py', # 输入:指向原始数据的S3路径,包含日期,形成快照 inputs=[ ProcessingInput( source='s3://my-raw-bucket/user-behavior/2024-06-01/', destination='/opt/ml/processing/input/raw', s3_data_type='S3Prefix', s3_input_mode='File' ) ], # 输出:写入一个带有相同日期后缀的路径,确保可追溯 outputs=[ ProcessingOutput( output_name='processed-data', source='/opt/ml/processing/output/train/', destination='s3://my-mlops-bucket/processed-data/2024-06-01/' ) ], # 传递参数给preprocess.py arguments=['/opt/ml/processing/input/raw/', '/opt/ml/processing/output/train/'] ) # 获取ProcessingJob的ARN,用于后续Lineage关联 processing_job = processor.jobs[-1] print(f"ProcessingJob ARN: {processing_job.describe()['ProcessingJobArn']}")这个ProcessingJob的成功执行,标志着我们完成了第一个可验证的数据快照。它的输出路径s3://my-mlops-bucket/processed-data/2024-06-01/,就是一个精确的、不可变的数据版本。任何后续的分析、训练,都必须基于这个路径,而不是一个模糊的latest别名。这是MLOps可靠性的第一块基石。
3.3 特征工程与FeatureStore注入:从数据快照到可复用的特征服务
有了ProcessingJob生成的Parquet数据,下一步是将其注入FeatureStore,使其从“静态数据”变为“活的特征”。这个过程分为两步:首先,定义FeatureGroup的Schema;其次,启动IngestionJob进行数据灌入。
Step 1: 定义FeatureGroup Schema
我们为用户行为数据定义一个名为user_behavior_features_v1的FeatureGroup。其Schema如下:
from sagemaker.feature_store.feature_definition import ( FeatureDefinition, FeatureTypeEnum ) feature_definitions = [ FeatureDefinition(feature_name="user_id", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="item_id", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="event_type", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="timestamp", feature_type=FeatureTypeEnum.FRACTIONAL), FeatureDefinition(feature_name="event_time", feature_type=FeatureTypeEnum.STRING), # 作为EventTimeFeatureName FeatureDefinition(feature_name="session_id", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="duration_sec", feature_type=FeatureTypeEnum.INTEGRAL), ] # 创建FeatureGroup feature_group = FeatureGroup( name="user_behavior_features_v1", feature_definitions=feature_definitions, record_identifier_name="user_id", event_time_feature_name="event_time", role_arn="arn:aws:iam::123456789012:role/SageMakerMLOpsRole", sagemaker_session=sess ) # 创建FeatureGroup,指定OnlineStore和OfflineStore feature_group.create( s3_uri="s3://my-mlops-bucket/feature-store-offline/", record_identifier_name="user_id", event_time_feature_name="event_time", enable_online_store=True, online_store_security_config={ "KmsKeyId": "arn:aws:kms:us-east-1:123456789012:key/abc123-def456" } )这里的关键点是record_identifier_name和event_time_feature_name。user_id作为主键,意味着OnlineStore会以user_id为DynamoDB的Partition Key;event_time作为事件时间,SageMaker会自动为每条记录生成一个event_time索引,用于高效的时序查询。
Step 2: 启动IngestionJob
IngestionJob不是一次性任务,而是一个长期运行的、从S3批量读取数据并写入FeatureStore的作业。我们需要告诉它从哪里读,以及如何映射字段。
# 从ProcessingJob的输出路径读取Parquet数据 ingestion_job = feature_group.ingest( data_source_uri="s3://my-mlops-bucket/processed-data/2024-06-01/", # 字段映射:S3 Parquet中的列名 -> FeatureGroup中的feature_name # 这里假设Parquet文件的列名与FeatureGroup的feature_name完全一致 # 如果不一致,需要在这里做显式映射 # feature_name_to_column_map={"user_id": "user_id", "item_id": "item_id", ...} wait=True, # 等待作业完成再返回 timeout=3600 ) print(f"IngestionJob completed. Status: {ingestion_job.describe()['IngestionJobStatus']}")IngestionJob完成后,FeatureStore的OfflineStore中就会有这批数据的副本,而OnlineStore也会被填充。此时,你可以立即进行两种查询:
- 离线查询(用于训练):
# 使用Athena查询OfflineStore query = feature_group.athena_query() query.run( sql=f"SELECT * FROM \"{query.table_name}\" WHERE user_id = 'u123' LIMIT 10", output_location="s3://my-mlops-bucket/athena-results/" ) df = query.as_dataframe()- 在线查询(用于实时推理):
# 查询单个用户的最新特征 record = feature_store.get_record( feature_group_name="user_behavior_features_v1", record_identifier_value_as_string="u123" ) print(record)这个过程,将数据获取、清洗、特征化、存储、服务化,全部串联成一个闭环。FeatureGroup的v1版本,就是我们为本次数据探索所定义的第一个、可被所有下游任务引用的“特征契约”。
3.4 构建Experiment与Trial:为每一次模型训练打上精确的时间戳
现在,我们拥有了一个干净的、可追溯的FeatureGroup。接下来,我们要用它来训练一个简单的XGBoost模型,并将整个过程,精确地记录在Experiment和Trial中。
首先,创建Experiment和Trial: