计费系统设计:精准追踪每个用户的Token消耗
在AI服务逐渐成为基础设施的今天,如何公平、准确地衡量资源使用,已经成为平台能否可持续运营的核心命题。尤其是大语言模型(LLM)广泛应用后,“按Token计费”几乎成了行业默认标准——但看似简单的“数Token”,背后却藏着复杂的工程挑战。
想象这样一个场景:一个企业用户通过API调用生成一份报告,输入500个Token,模型返回了1200个Token的内容。这笔请求该收多少钱?如果系统把输入算成480、输出误判为1300,长期累积下来不仅账单不准,还可能引发客户争议。更糟糕的是,若在高并发下出现数据丢失或重复统计,整个计费信誉就崩塌了。
这正是我们需要一套可信赖、低侵入、高精度的Token计量体系的原因。而实现这一点的关键,并不在于从零开发一个计费引擎,而是要借力成熟的机器学习框架本身的能力。比如TensorFlow,它不仅是跑模型的容器,更是天然的“行为记录仪”。
为什么选TensorFlow作为计量底座?
很多人认为计费系统应该独立于模型运行环境,以免影响性能。但实际上,最准确的数据来源恰恰是推理执行现场。TensorFlow之所以适合作为计量基础,是因为它提供了几个不可替代的特性:
首先是它的生产级稳定性。这套框架在Google内部支撑着搜索、翻译、广告等核心业务多年,经历过极端流量考验。这意味着基于它构建的计费逻辑不会因为偶发崩溃导致事件漏报。
其次是其强大的可观测性支持。TensorFlow不仅允许你在tf.function中插入自定义逻辑,还能通过TensorBoard实时监控每一步张量变化。你可以清楚看到输入张量的形状、输出序列长度,甚至内存占用趋势——这些全是计费所需的关键元数据。
再者是生态整合能力。借助TensorFlow Serving,你可以将模型打包部署为gRPC服务,同时在预处理/后处理阶段无缝注入计量代码;配合SavedModel格式,还能统一管理不同版本模型对应的分词规则和计费策略。
更重要的是,它原生支持批处理与异步执行。这对于计费来说意义重大:上报动作完全可以脱离主推理路径,走后台线程或消息队列,避免阻塞响应。这样一来,即便写数据库慢一点,也不会拖累API延迟。
如何在推理流程中“无感”捕获Token消耗?
真正的难点不是“能不能统计”,而是“怎么统计才不影响性能”。理想的做法是:对模型本身零修改,对用户体验零感知,但又能拿到每一笔请求的完整上下文。
下面这个模式在多个生产系统中被验证有效:
import tensorflow as tf from transformers import T5Tokenizer import time # 全局共享组件 tokenizer = T5Tokenizer.from_pretrained("t5-small") model = tf.saved_model.load("path/to/saved_model") @tf.function def predict_with_metering(inputs): # 输入编码 → 获取input_ids input_tokens = tokenizer(inputs, return_tensors="tf", padding=True, truncation=True) # 关键:直接从张量维度获取输入Token数 input_token_count = tf.shape(input_tokens["input_ids"])[1] # 批次中最大长度 start_time = time.time() outputs = model(**input_tokens) end_time = time.time() # 输出解码 → 统计实际生成Token generated_text = tokenizer.batch_decode(outputs, skip_special_tokens=True) output_token_count = sum(len(tokenizer.encode(text)) for text in generated_text) # 异步上报(模拟) log_billing_event( user_id="user_123", model_name="t5-small", input_tokens=int(input_token_count), output_tokens=output_token_count, latency_ms=int((end_time - start_time) * 1000) ) return outputs def log_billing_event(user_id, model_name, input_tokens, output_tokens, latency_ms): """异步上报至Kafka或时序数据库""" total_tokens = input_tokens + output_tokens print(f"[BILLING] User: {user_id}, Model: {model_name}") print(f" Input: {input_tokens}, Output: {output_tokens}, Total: {total_tokens}") print(f" Latency: {latency_ms}ms")这段代码的关键在于三点:
使用与训练一致的分词器
很多团队为了省事用字符串长度估算Token数,结果发现中文“你好”算两个字符,实际被BPE拆成四个子词。只有用Hugging Face官方Tokenizer才能保证一致性。利用
tf.shape()动态获取张量尺寸
不依赖Python层面的len()操作,而是在图内直接提取input_ids第二维大小,既高效又兼容批量推理。解码后重新编码统计输出Token
模型输出的是ID序列,但最终返回给用户的可能是纯文本。中间可能经过截断、过滤EOS等处理,所以必须以最终呈现内容为准重新计算Token数。
更重要的是,log_billing_event可以包装成非阻塞调用,比如提交到线程池或发布到Kafka主题,完全不影响主流程响应速度。
实际架构如何落地?
在一个典型的AI服务平台中,我们不会让每个模型都自己上报日志,而是通过分层设计实现统一管控:
+------------------+ +---------------------+ | 用户请求入口 | ----> | API网关 / 认证层 | +------------------+ +----------+----------+ | +-------------v-------------+ | 请求预处理与分词 | | (获取input_tokens) | +-------------+-------------+ | +-----------------------v------------------------+ | TensorFlow推理服务 | | - 模型加载 | | - 执行预测 | | - 输出解码 → 获取output_tokens | +-----------------------+------------------------+ | +---------------v------------------+ | 计费事件收集与上报 | | (异步写入Kafka / 数据库) | +---------------+------------------+ | +----------------v------------------+ | 计费数据聚合与账单生成 | | (按小时/天/用户维度汇总Token用量) | +------------------------------------+每一层都有明确职责:
- API网关负责身份识别、限流、缓存判断。如果是命中缓存的请求,可以直接跳过计量;
- 预处理模块统一进行分词并记录
input_tokens,避免各模型重复实现; - TensorFlow Serving实例承载模型运行,在输出阶段补充
output_tokens; - 上报层采用异步持久化机制,确保即使下游短暂故障也不丢事件;
- 聚合层则定时运行Spark或Flink任务,按用户、模型、时间段生成可用度量指标。
这种架构最大的好处是解耦清晰。比如换了一个新模型,只要遵循相同的输入输出规范,就能自动接入现有计费流水线,无需额外开发。
遇到过哪些坑?该怎么规避?
再完美的设计也会遇到现实冲击。我们在实践中总结出几个高频问题及其应对方案:
1. 分词不一致导致计费偏差
同一个句子,在客户端分词和服务器端结果不一样?很可能是Tokenizer版本差异。建议:
- 所有环境强制锁定transformers版本;
- 或干脆把分词逻辑封装成独立微服务,统一提供/tokenize接口。
2. 批处理掩盖真实消耗
TensorFlow Serving默认开启批处理,一次处理32个请求。如果只上报总输入长度而不拆分到单个用户,就会造成“集体买单”的混乱。解决方案是:
- 在请求头中携带唯一request_id;
- 上报时绑定该ID,确保后续能按粒度归因。
3. 模型异常导致空输出刷量
恶意用户发送无效提示(prompt),触发模型快速返回空串或极短文本。虽然耗时低,但如果按请求收费仍会造成不公平。对策包括:
- 设置最小输出Token阈值(如低于10个不计费);
- 引入“有效生成率”指标,用于识别可疑行为。
4. 冷启动延迟污染计费数据
首次加载模型可能耗时数秒,这部分不应计入用户请求延迟。建议:
- 在服务启动后预热模型,执行一次dummy推理;
- 或在前N分钟内标记为“调试期”,不上报正式账单。
5. 审计合规与隐私保护
原始文本不能出现在日志里!但我们又需要保留足够信息用于纠纷核查。折中做法是:
- 日志中仅保存Token数量、时间戳、用户ID哈希值;
- 原始请求体加密存储于独立审计库,访问需审批授权。
更进一步:不只是“计费”,更是“治理”
当你有了精确的Token消耗数据之后,它的用途远不止开发票。事实上,这类细粒度度量正在推动AI系统的精细化治理。
比如:
- 成本分摊:部门A用了多少GPT-4 Token,部门B调了多少次本地T5模型?有了数据就可以内部结算。
- QoS分级:高付费用户优先分配资源,低优先级请求进入排队通道——这一切都建立在可量化的使用基础上。
- 模型优化反馈:发现某个模型平均输出Token特别长,是否说明它啰嗦?可以据此调整解码参数。
- 容量规划:根据历史增长曲线预测下季度GPU需求,提前采购或扩容。
某种程度上说,计量系统其实是AI平台的“神经系统”——没有它,你根本不知道身体哪里疼、什么时候该休息。
结语
精准追踪Token消耗,听上去是个技术细节,实则是AI服务商业化的基石。而这条路的最佳实践,不是另起炉灶造轮子,而是站在像TensorFlow这样的工业级框架肩膀上,把模型执行过程变成可信的数据源。
未来随着小型化模型向边缘设备迁移,类似的计量逻辑也会延伸到手机、IoT终端上。那时我们可能不再依赖中心化服务,而是通过本地签名+离线汇总的方式完成去中心化计费。但无论形态如何演进,核心原则不变:可验证、低干扰、端到端闭环。
现在的每一次tf.shape(input_ids)[1],都是在为那个更智能、更透明的AI经济生态铺路。