news 2026/7/4 7:05:41

CANN算子白盒用例合并扩展

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
CANN算子白盒用例合并扩展

Step 5c:合并 + 空 tensor 补全 + data_range 展开

【免费下载链接】cannbot-skillsCANNBot 是面向 CANN 开发的用于提升开发效率的系列智能体,本仓库为其提供可复用的 Skills 模块。项目地址: https://gitcode.com/cann/cannbot-skills

前置条件:Step 5b 完成(S5_mapped_cases_network.json 已写入)


概述

输入文件输出文件
S5_mapped_cases_path.jsonS5_mapped_cases_network.jsonS5_merge_expand.pyS5_mapped_cases_low.jsonS5_mapped_cases_high.json

通过S5_merge_expand.py依次完成:合并 path + network + 空 tensor → low.json(all normal)→ data_range 交叉展开 → high.json。


S5_merge_expand.py

脚本分为通用部分(照搬)和算子专属部分supplement_empty_cases,见 §supplement_empty_cases 策略)。子 agent 先照搬通用部分,再根据算子 param_type 生成supplement_empty_cases并插入脚本,最后按 §执行 中的步骤依次运行。

"""S5_merge_expand.py — 合并 + 空 tensor 补全 + 元素数过滤 + data_range 展开""" import json, copy, math, os, re, sys out_dir = os.path.dirname(os.path.abspath(__file__)) # === 共享工具函数 === NON_NORMAL = ["zero", "extreme", "negative", "tiny_pos", "all_ones", "near_zero", "with_inf", "with_nan"] _DOMAIN_EXCLUDE = { "positive": {"zero", "negative", "near_zero", "with_nan"}, "non_negative": {"negative", "near_zero", "with_nan"}, "non_zero": {"zero", "near_zero", "with_nan"}, } def _allowed_ranges(value_domain): """根据 value_domain 返回允许的 NON_NORMAL 标签列表。""" if not value_domain: return list(NON_NORMAL) vd_type = value_domain["type"] if vd_type == "range": lo, hi = value_domain.get("min"), value_domain.get("max") excluded = {"extreme", "with_nan", "with_inf"} if lo is not None and lo > 0: excluded.add("zero") excluded.add("negative") elif lo is not None and lo == 0: excluded.add("negative") elif hi is not None and hi < 0: excluded.add("zero") if hi is not None and hi < 1e-7: excluded.add("tiny_pos") if (lo is not None and lo > 1) or (hi is not None and hi < 1): excluded.add("all_ones") return [dr for dr in NON_NORMAL if dr not in excluded] excluded = _DOMAIN_EXCLUDE.get(vd_type, set()) return [dr for dr in NON_NORMAL if dr not in excluded] def set_all_data_range(case, dr): for spec in case["tensors"]["inputs"].values(): if spec is None: continue if isinstance(spec, list): for sub in spec: sub["_data_range"] = dr else: spec["_data_range"] = dr case.pop("_data_range", None) return case def expand_high(cases): """one-hot + 全统一展开。按 _value_domain 过滤不兼容 data_range。""" if not cases: return [] expanded = [] input_names = [n for n, s in cases[0]["tensors"]["inputs"].items() if s is not None] for c in cases: nc = copy.deepcopy(c) nc["id"] = f"{c['id']}_all_normal" set_all_data_range(nc, "normal") expanded.append(nc) all_allowed = set(NON_NORMAL) for spec in c["tensors"]["inputs"].values(): if spec is None: continue s = spec[0] if isinstance(spec, list) else spec all_allowed &= set(_allowed_ranges(s.get("_value_domain"))) for dr in sorted(all_allowed): nc = copy.deepcopy(c) nc["id"] = f"{c['id']}_all_{dr}" set_all_data_range(nc, dr) expanded.append(nc) for inp in input_names: s = c["tensors"]["inputs"][inp] s0 = s[0] if isinstance(s, list) else s allowed = _allowed_ranges(s0.get("_value_domain")) for dr in allowed: nc = copy.deepcopy(c) nc["id"] = f"{c['id']}_{inp}_{dr}" for name, spec in nc["tensors"]["inputs"].items(): if spec is None: continue target_dr = dr if name == inp else "normal" if isinstance(spec, list): for sub in spec: sub["_data_range"] = target_dr else: spec["_data_range"] = target_dr expanded.append(nc) return expanded _COMPACT_KEYS = ("shape", "dtype", "id", "_group", "_data_range", "_value_domain") def _compact_json(text): for key in _COMPACT_KEYS: text = re.sub( rf'("{key}"): \[\s*\n((?:\s+[^\n]+,\n)*\s+[^\n]+\n\s*)\]', lambda m: f'{m.group(1)}: [{", ".join(l.strip().rstrip(",") for l in m.group(2).strip().splitlines() if l.strip())}]', text, flags=re.MULTILINE ) text = re.sub( rf'("{key}"): ("[^"]*"|\d+(?:\.\d+)?(?:e[+-]?\d+)?)', rf'\1: \2', text, flags=re.MULTILINE ) return text # === 算子专属:supplement_empty_cases(根据 param_type 生成,见下方 §策略) === def supplement_empty_cases(mapped_path_cases, seed=42): """从 1 个随机 path case 模板生成空 tensor 变体。 REQUIRED 和 DYNAMIC 分支实现不同,子 agent 根据算子 param_type 选择对应模板生成。 """ pass # 子 agent 根据 §supplement_empty_cases 策略 替换此实现 # === 5c 入口:合并 + 空 tensor + 过滤 → low === def _max_numel(spec): """计算 tensor spec 的最大元素数(DYNAMIC 取各子 tensor 最大值)""" if spec is None: return 0 if isinstance(spec, list): return max((math.prod(sub["shape"]) for sub in spec), default=0) return math.prod(spec["shape"]) def merge_low(): with open(os.path.join(out_dir, "S5_mapped_cases_path.json")) as f: path_cases = json.load(f)["cases"] with open(os.path.join(out_dir, "S5_mapped_cases_network.json")) as f: net_cases = json.load(f)["cases"] path_cases = [c for c in path_cases if all(_max_numel(v) <= 100_000_000 for v in c["tensors"]["inputs"].values())] net_cases = [c for c in net_cases if all(_max_numel(v) <= 100_000_000 for v in c["tensors"]["inputs"].values())] empty_cases = supplement_empty_cases(path_cases) vd_map = {} model_path = os.path.join(out_dir, "S2P1_operator_model.json") if os.path.exists(model_path): with open(model_path) as f: op_model = json.load(f) for inp in op_model.get("inputs", []): vd = inp.get("value_domain") if vd: vd_map[inp["name"]] = vd def _attach_vd(case): for name, spec in case["tensors"]["inputs"].items(): if spec is None: continue vd = vd_map.get(name) if not vd: continue if isinstance(spec, list): for sub in spec: sub["_value_domain"] = vd else: spec["_value_domain"] = vd return case combined = path_cases + net_cases + empty_cases low = [] for c in combined: c = set_all_data_range(copy.deepcopy(c), "normal") _attach_vd(c) low.append(c) with open(os.path.join(out_dir, "S5_mapped_cases_low.json"), "w") as f: raw = json.dumps({"cases": low}, indent=2, ensure_ascii=False) f.write(_compact_json(raw)) # === 5d 入口:data_range 展开 → high === def expand_high_main(): with open(os.path.join(out_dir, "S5_mapped_cases_low.json")) as f: low_cases = json.load(f)["cases"] def _is_empty(c): cid = c.get("id", "") return cid.endswith("_empty") empty_cases = [c for c in low_cases if _is_empty(c)] path_cases = [c for c in low_cases if not _is_empty(c) and c["params"].get("_group") != "network"] net_cases = [c for c in low_cases if c["params"].get("_group") == "network"] high = expand_high(path_cases) + empty_cases + expand_high(net_cases) with open(os.path.join(out_dir, "S5_mapped_cases_high.json"), "w") as f: json.dump({"cases": high}, f, ensure_ascii=False) if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "5d": expand_high_main() else: merge_low()

逻辑说明

过滤规则

剔除任意输入 tensor 元素数 > 1 亿的 path/network case(规则相同)。DYNAMIC(list[dict])取各子 tensor 元素数最大值。

supplement_empty_cases 策略

函数supplement_empty_cases定义在 S5_merge_expand.py 中(算子专属部分)。 从 1 个随机 path case 模板生成空 tensor 变体。

通用约束

  • 签名:supplement_empty_cases(mapped_path_cases, seed=42)
  • 返回:list[dict],格式同 mapped_path_cases 元素(含 id/params/tensors)
  • 每个空 case 的_data_range = "normal"
  • ID 后缀:REQUIRED 统一为_empty(含全零兜底);DYNAMIC 为_empty(位置变体)或_all_empty(全零兜底)。_is_empty通过endswith("_empty")检测,兼容两种
REQUIRED 分支(单 tensor)

REQUIRED 空 tensor 实现高度算子专属,无通用代码模板。子 agent 根据 S5_mapping_spec.md 和 operator_model 生成具体实现。

策略

Phase 1 — 可空化维度识别:从 S5_mapping_spec.md §shape 构造参数 中,读取每个 shape 参数的控制关系(参数名 → 控制的 tensor → 控制的 dim 索引),确定哪些 input tensor 的哪些维度可以被置零。const input(值和 shape 均固定的 tensor,不随 case 参数变化)不参与空化。

Phase 2 — 变体生成:

  • 对每个可空化的(tensor, dim)组合,从 1 个随机 path case 模板生成 1 个变体:仅将该维度置 0,其余维度保持原值
  • 跨变体去重(按所有 input shape 签名去重)
  • 追加 1 个全零兜底变体:仅将 Phase 1 识别的可空化 tensor 全维度置 0,const input 保持原值
  • 按 sync_with 传播零化到关联 tensor,更新 outputs
  • 将 params 中受 shape 变更影响的字段同步为与修改后 shape 一致的值

不变约束

  • 签名:supplement_empty_cases(mapped_path_cases, seed=42)
  • 返回:list[dict],格式同 mapped_path_cases 元素
  • 每个空 case 的_data_range = "normal"
  • 设置params["_group"](如"{affected_key}_empty"
  • ID 后缀统一为_empty(含全零兜底),供_is_empty检测
DYNAMIC 分支(TensorList)

DYNAMIC 空 tensor 不逐子 tensor 逐维度枚举(子 tensor 数量可达 50,逐维度枚举导致场景爆炸),改为按子 tensor 在列表中的位置采样。

策略(从 1 个随机 path case 模板生成变体):

  • 筛选 ndim>0 的子 tensor 索引(跳过 scalar,scalar 无法置零单个维度)
  • first/middle/last:取筛选后索引列表的首/中/末位置各 1 个变体,将该子 tensor 全维度置 0
  • partial:取 ndim 最高的子 tensor,将其最大维度值置 0(保留其余维度结构,仅 ndim≥2 时生成)
  • all_empty:所有子 tensor 全维度置 0(兜底,ID 后缀_all_empty
  • 位置去重:若首/中/末指向同一索引(仅 1 个非 scalar 子 tensor),只生成 1 个变体
  • sync_with 传播:全维度置 0 → 目标对应位置全维度置 0;partial → 目标同维度置 0
  • _group:DYNAMIC 空 case 不设置 _group(继承模板值),空 tensor 检测依赖 ID 后缀(见通用约束),不依赖 _group

DYNAMIC 代码模板(子 agent 替换name/out变量值(L226-227 引号内的 {name}/{out})+{target}/{target_out}文本替换(L272 等引号内的占位符)):

输出推导假设:以下模板假设输出 shape 规则为same_as_input(02 §4)。若算子输出规则为derivedfixed,子 agent 需修改模板中 outputs 更新逻辑。

def supplement_empty_cases(mapped_path_cases, seed=42): if not mapped_path_cases: return [] import random, copy rng = random.Random(seed) template = rng.choice(mapped_path_cases) name = "{name}" out = "{out}" max_id = 0 for c in mapped_path_cases: cid = c.get("id", "case00000") num = int(cid.replace("case", "")) if num > max_id: max_id = num # Phase 1:筛选 ndim>0 的子 tensor 索引 valid_indices = [i for i, sub in enumerate(template["tensors"]["inputs"][name]) if len(sub["shape"]) > 0] positions = [] seen = set() for label, pos in [("first_empty", valid_indices[0] if valid_indices else -1), ("middle_empty", valid_indices[len(valid_indices) // 2] if valid_indices else -1), ("last_empty", valid_indices[-1] if valid_indices else -1)]: if pos >= 0 and pos not in seen: positions.append((label, pos, "full")) seen.add(pos) if valid_indices: partial_idx = max(valid_indices, key=lambda i: len(template["tensors"]["inputs"][name][i]["shape"])) partial_ndim = len(template["tensors"]["inputs"][name][partial_idx]["shape"]) if partial_ndim >= 2: positions.append(("partial_empty", partial_idx, "partial")) # Phase 2:变体生成 empty_cases = [] idx = 0 for label, tensor_idx, mode in positions: variant = copy.deepcopy(template) sub = variant["tensors"]["inputs"][name][tensor_idx] if mode == "full": sub["shape"] = tuple([0] * len(sub["shape"])) else: # partial: 最大维度值置零 shape = list(sub["shape"]) max_dim = shape.index(max(shape)) shape[max_dim] = 0 sub["shape"] = tuple(shape) # 更新 outputs(same_as_input 同步) variant["tensors"]["outputs"][out][tensor_idx]["shape"] = sub["shape"] # sync_with 传播(无 sync_with 时删除此段) if "{target}" in variant["tensors"]["inputs"]: target_sub = variant["tensors"]["inputs"]["{target}"][tensor_idx] if mode == "full": target_sub["shape"] = tuple([0] * len(target_sub["shape"])) else: target_shape = list(target_sub["shape"]) if len(target_shape) > max_dim: target_shape[max_dim] = 0 target_sub["shape"] = tuple(target_shape) if "{target_out}" in variant["tensors"]["outputs"]: variant["tensors"]["outputs"]["{target_out}"][tensor_idx]["shape"] = target_sub["shape"] variant["id"] = f"case{max_id+1+idx:05d}_{name}_{label}" variant["_data_range"] = "normal" idx += 1 empty_cases.append(variant) # all_empty 兜底 all_empty = copy.deepcopy(template) for sub in all_empty["tensors"]["inputs"][name]: sub["shape"] = tuple([0] * len(sub["shape"])) for sub in all_empty["tensors"]["outputs"][out]: sub["shape"] = tuple([0] * len(sub["shape"])) if "{target}" in all_empty["tensors"]["inputs"]: for sub in all_empty["tensors"]["inputs"]["{target}"]: sub["shape"] = tuple([0] * len(sub["shape"])) if "{target_out}" in all_empty["tensors"]["outputs"]: for sub in all_empty["tensors"]["outputs"]["{target_out}"]: sub["shape"] = tuple([0] * len(sub["shape"])) all_empty["id"] = f"case{max_id+1+idx:05d}_{name}_all_empty" all_empty["_data_range"] = "normal" empty_cases.append(all_empty) return empty_cases

空 tensor 在 low / high 中的传递

  • low:由supplement_empty_cases生成后合并,同其他 case 一样设_data_range: "normal"
  • high:按id后缀(_empty)从 low 中分离,直接保留(不经过expand_high

low 档位:全 normal

所有输入 tensor_data_range统一设为"normal"

输出格式

文件格式
S5_mapped_cases_low.json缩进 + 选择性压缩(_compact_json压缩 shape/dtype/id/_group/_data_range)
S5_mapped_cases_high.json紧凑格式,无缩进

输出示例

以下示例展示_compact_json()的实际压缩效果,供子 agent 生成脚本时参考。

示例:S5_mapped_cases_low.json 压缩前 vs 压缩后

_compact_json()将 5 个 key(shapedtypeid_group_data_range)从多行缩进展平为单行。

压缩前(JSON indent=2 展开形式):

{ "cases": [ { "id": "case00000", "params": { "_group": "{group_id}", "{param_1}": "value_1", "{param_2}": "value_2" }, "tensors": { "inputs": { "{tensor_a}": { "shape": [ 4, 8, 16 ], "dtype": "float16" } }, "outputs": { "{tensor_out}": { "shape": [ 4, 8, 16 ], "dtype": "float16" } } } } ] }

压缩后(low.json 实际写入格式;5 个 key 展平为单行,其他结构不变):

{ "cases": [ { "id": "case00000", "params": { "_group": "{group_id}", "{param_1}": "value_1", "{param_2}": "value_2" }, "tensors": { "inputs": { "{tensor_a}": {"shape": [4, 8, 16], "dtype": "float16"} }, "outputs": { "{tensor_out}": {"shape": [4, 8, 16], "dtype": "float16"} } } } ] }

压缩规则

  • 被压缩的 key:shapedtypeid_group_data_range
  • 不被压缩的 key(如paramstensors、各 tensor 名)保持原有缩进
  • S5_mapped_cases_low.json中所有 case 的_data_range均为"normal"

约束

  • _data_range存储在tensors.inputs.{name}._data_range,per-input 独立(set_all_data_range统一写入,同时清除 5a 遗留的顶层_data_range
  • DYNAMIC 输入(list[dict])的每个子 tensor 均写入_data_range,同一个 TensorList 内所有子 tensor 的值相同(语义上共享同一个 data_range)
  • 空 tensor 不参与 data_range 交叉,固定_data_range: "normal"
  • 空 tensor 通过id后缀(_empty)检测,不依赖_group字段
  • 每个输入 tensor 元素数 ≤ 100,000,000(1 亿)。超出的 path/network case 直接丢弃

执行

子 agent 须按以下顺序完成:

  1. 将上方通用部分写入{whitebox}/S5_merge_expand.py
  2. 根据算子 param_type 生成supplement_empty_cases(REQUIRED 或 DYNAMIC 分支),替换脚本中的pass占位
  3. 运行python S5_merge_expand.pyS5_mapped_cases_low.json
  4. 运行python S5_merge_expand.py 5dS5_mapped_cases_high.json

检查清单

  • S5_merge_expand.py已写入{whitebox}/
  • supplement_empty_cases已根据算子 param_type 正确生成(REQUIRED 或 DYNAMIC 分支)
  • python S5_merge_expand.py退出码 0
  • S5_mapped_cases_low.json已写入(缩进 + 压缩格式)
  • python S5_merge_expand.py 5d退出码 0
  • S5_mapped_cases_high.json已写入(紧凑格式)
  • 空 tensor 变体数量 > 0

【免费下载链接】cannbot-skillsCANNBot 是面向 CANN 开发的用于提升开发效率的系列智能体,本仓库为其提供可复用的 Skills 模块。项目地址: https://gitcode.com/cann/cannbot-skills

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/4 7:04:37

Obsidian知识图谱可视化:Obsidian-zola的图形功能深度解析

Obsidian知识图谱可视化&#xff1a;Obsidian-zola的图形功能深度解析 【免费下载链接】obsidian-zola A no-brainer solution to turning your Obsidian PKM into a Zola site. 项目地址: https://gitcode.com/gh_mirrors/ob/obsidian-zola Obsidian-zola是一款将Obsid…

作者头像 李华
网站建设 2026/7/4 7:04:16

Statsig Status Page代码解析:JavaScript模板引擎实现原理

Statsig Status Page代码解析&#xff1a;JavaScript模板引擎实现原理 【免费下载链接】statuspage A simple, zero-dependency, pure js/html status page based on GitHub Pages and Actions. 项目地址: https://gitcode.com/gh_mirrors/sta/statuspage Statsig Statu…

作者头像 李华
网站建设 2026/7/4 7:02:17

CANN/ops-math Roll循环位移算子

Roll 【免费下载链接】ops-math 本项目是CANN提供的数学类基础计算算子库&#xff0c;实现网络在NPU上加速计算。 项目地址: https://gitcode.com/cann/ops-math 贡献说明 贡献者贡献方贡献算子贡献时间贡献内容boxw987个人开发者Roll2026/06Roll 算子适配开源仓 支持…

作者头像 李华
网站建设 2026/7/4 7:01:32

CANN/ops-tensor MX量化分组矩阵乘法Block组件

Block Mmad Qgmm Mx 【免费下载链接】ops-tensor ops-tensor 是 CANN &#xff08;Compute Architecture for Neural Networks&#xff09;算子库中提供张量类计算的基础算子库&#xff0c;采用模块化设计&#xff0c;支持灵活的算子开发和管理。 项目地址: https://gitcode.…

作者头像 李华