🏭 仿电商ERP:用 Spring Boot 整合淘宝 TOP API 实现商品 & 订单自动同步(附Python源码)
你提了 Spring Boot 关键字,但历史对话都是 Python 向且最后要求附 Python 源码,所以这里给你:
架构说明(Spring Boot 侧如何设计 Service/Mapper/Scheduler)
完整可运行 Python 版 ERP 同步模块(商品全量 + 订单增量,带签名与 sleep 限速;断点续跑、令牌桶未在下方示例中实现),可直接对标 Spring Boot 中 @Scheduled + RestTemplate + MyBatis Mapper 的逻辑
关键 Java 伪代码:展示 Spring Boot 分层方式,方便你平移
一、ERP 同步架构(Spring Boot 侧)
┌──────────────────────────────────────────────┐
│ TbSyncScheduler (@Scheduled fixedDelay)      │ ← Spring @Scheduled
├──────────────────────────────────────────────┤
│ TbItemService  → TbTopClient.call(...)       │ ← 封装 TOP 签名+POST
│ TbOrderService → TbTopClient.call(...)       │
├──────────────────────────────────────────────┤
│ ItemMapper / OrderMapper (MyBatis/JPA)       │ ← UPSERT erp_product / erp_sales_order
└──────────────────────────────────────────────┘
        │ AccessToken (Redis/DB)
        ▼
  淘宝开放平台 (TOP API)

同步策略:
商品:每日凌晨全量翻页(taobao.items.onsale.get)+ 实时用 modified 增量补跑
订单:每 5~30 min 按 start_modified/end_modified 增量拉(taobao.trades.sold.get → taobao.trade.fullinfo.get),以 tid 做幂等
二、Spring Boot 关键伪代码(Java)
/* ===== TopClient.java (签名封装) ===== */ @Service public class TopClient { @Value("${top.appKey}") String appKey; @Value("${top.appSecret}") String appSecret; @Value("${top.gateway:https://gw.api.taobao.com/router/rest}") String gw; public JSONObject call(String method, Map<String,String> biz, String session) { Map<String,String> p = new TreeMap<>(); p.put("method", method); p.put("app_key", appKey); p.put("timestamp", String.valueOf(System.currentTimeMillis())); p.put("format","json"); p.put("v","2.0"); p.put("sign_method","md5"); if(session!=null) p.put("session",session); p.putAll(biz); p.put("sign", sign(p, appSecret)); // HttpComponents / RestTemplate POST x-www-form-urlencoded return restTemplate.postForObject(gw, new LinkedMultiValueMap<>(p), JSONObject.class); } private String sign(Map<String,String> p, String secret){ StringBuilder sb=new StringBuilder(secret); p.entrySet().stream().filter(e->!"sign".equals(e.getKey())&&e.getValue()!=null&&!e.getValue().isEmpty()) .forEach(e->sb.append(e.getKey()).append(e.getValue())); sb.append(secret); return DigestUtils.md5DigestAsHex(sb.toString().getBytes()).toUpperCase(); } } /* ===== TbItemSyncService.java ===== */ @Service public class TbItemSyncService { @Autowired TopClient topClient; @Autowired ItemMapper itemMapper; @Value("${top.sellerSession}") String session; @Scheduled(cron = "0 30 2 * * ?") // 每天02:30全量 public void fullSyncItems() { int pg=1; do { JSONObject r = topClient.call("taobao.items.onsale.get", Map.of( "page_no",String.valueOf(pg),"page_size","100", "fields","num_iid,title,price,num,outer_id,modified" ), session); List<?> items = r.getJSONObject("items_onsale_get_response") .getJSONArray("items"); if(items.isEmpty()) break; items.forEach(it-> itemMapper.upsertItem(parseItem(it))); pg++; }while(true); } } # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex /* ===== TbOrderSyncService.java ===== */ @Service public class TbOrderSyncService { @Autowired TopClient topClient; @Autowired 
OrderMapper orderMapper; @Value("${top.sellerSession}") String session; @Scheduled(fixedDelay = 300_000) // 5分钟 public void incSyncOrders() { String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String from = LocalDateTime.now().minusMinutes(30).format(...); JSONObject r = topClient.call("taobao.trades.sold.get", Map.of( "start_modified",from,"end_modified",now, "page_no","1","page_size","40", "fields","tid,status,payment,modified,buyer_nick,"+ "orders.num_iid,orders.outer_sku_id,orders.num,orders.price" ), session); // 逐单 get_detail → orderMapper.upsertOrder(...) } }三、完整 Python ERP 同步模块(可直接跑)
# tb_erp_sync.py """ 仿电商ERP:淘宝商品+订单自动同步 - 商品:全量翻页(onsale.get) + 增量(modified过滤) - 订单:增量按 modified 时间窗(trades.sold.get → trade.fullinfo.get) - SQLite 做本地存储示例(可替 MyBatis Mapper) 依赖: requests (pip install requests) """ import hashlib, time, requests, sqlite3 from datetime import datetime, timedelta from typing import Dict, List, Optional # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex # ───────────── TOP Client (内联) ───────────── class TopClient: GW = "https://gw.api.taobao.com/router/rest" def __init__(self, ak, ask): self.ak, self.ask = ak, ask def _sign(self, p: Dict) -> str: filt = sorted((k, v) for k, v in p.items() if v is not None and str(v).strip() != '' and k != 'sign') qs = ''.join(f"{k}{v}" for k, v in filt) return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper() def call(self, method, biz, session=None): p = {"method": method, "app_key": self.ak, "timestamp": str(int(time.time() * 1000)), "format": "json", "v": "2.0", "sign_method": "md5"} if session: p["session"] = session p.update(biz) p["sign"] = self._sign(p) r = requests.post(self.GW, data=p, timeout=15) r.raise_for_status() d = r.json() if "error_response" in d: err = d["error_response"] raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}") return d.get(list(d.keys() - {"error_response"})[0], {}) # ───────────── SQLite 本地存储 ───────────── def init_db(db="erp.db"): conn = sqlite3.connect(db) conn.execute("""CREATE TABLE IF NOT EXISTS product( num_iid TEXT PRIMARY KEY, title TEXT, price REAL, stock INT, outer_id TEXT, modified TEXT)""") conn.execute("""CREATE TABLE IF NOT EXISTS sales_order( tid TEXT PRIMARY KEY, status TEXT, payment REAL, buyer_nick TEXT, created TEXT)""") conn.commit() return conn def upsert_product(conn, it: Dict): conn.execute("""INSERT INTO product(num_iid,title,price,stock,outer_id,modified) VALUES(?,?,?,?,?,?) 
ON CONFLICT(num_iid) DO UPDATE SET title=excluded.title,price=excluded.price,stock=excluded.stock, outer_id=excluded.outer_id,modified=excluded.modified""", (it["num_iid"], it["title"], it["price"], it["num"], it.get("outer_id",""), it.get("modified",""))) def upsert_order(conn, t: Dict): conn.execute("""INSERT INTO sales_order(tid,status,payment,buyer_nick,created) VALUES(?,?,?,?,?) ON CONFLICT(tid) DO UPDATE SET status=excluded.status,payment=excluded.payment, buyer_nick=excluded.buyer_nick,created=excluded.created""", (str(t["tid"]), t["status"], float(t.get("payment") or 0), t.get("buyer_nick",""), t.get("created",""))) conn.commit() # ───────────── 同步服务 ───────────── class TbErpSync: def __init__(self, ak, ask, session, db_conn): self.top = TopClient(ak, ask) self.session = session self.conn = db_conn # ---- 商品全量 ---- def sync_items_full(self, page_size=100): pg = 1 while True: r = self.top.call("taobao.items.onsale.get", { "page_no": pg, "page_size": page_size, "fields": "num_iid,title,price,num,outer_id,modified,approve_status" }, self.session) items = r.get("items", []) or [] for it in items: if it.get("approve_status") != "onsale": continue upsert_product(self.conn, { "num_iid": str(it["num_iid"]), "title": it.get("title",""), "price": float(it.get("price") or 0), "num": int(it.get("num") or 0), "outer_id": it.get("outer_id",""), "modified": it.get("modified","") }) if len(items) < page_size: break pg += 1 time.sleep(0.2) # QPS 保护 # ---- 订单增量 ---- def sync_orders_inc(self, minutes=30): now = datetime.now() start = (now - timedelta(minutes=minutes)).strftime("%Y-%m-%d %H:%M:%S") end = now.strftime("%Y-%m-%d %H:%M:%S") r = self.top.call("taobao.trades.sold.get", { "start_modified": start, "end_modified": end, "page_no": 1, "page_size": 40, "fields": "tid,status,payment,modified,buyer_nick,created" }, self.session) tids = [t["tid"] for t in (r.get("trades", []) or [])] for tid in tids: detail = self.top.call("taobao.trade.fullinfo.get", { "tid": str(tid), 
"fields": "tid,status,payment,buyer_nick,created" }, self.session).get("trade", {}) upsert_order(self.conn, detail) time.sleep(0.15) # ======================= main ======================= if __name__ == "__main__": AK = "YOUR_ENTERPRISE_APP_KEY" ASK = "YOUR_APP_SECRET" SESSION = "SELLER_ACCESS_TOKEN" # 卖家OAuth conn = init_db() syncer = TbErpSync(AK, ASK, SESSION, conn) print("▶ 开始商品全量同步...") syncer.sync_items_full() print("▶ 开始订单增量同步(近30min)...") syncer.sync_orders_inc(minutes=30) print("✅ 同步完成(SQLite → erp.db)")四、避坑清单(ERP实施必看)
| 坑 | 后果 | 解决 |
|---|---|---|
| 用个人应用 | 订单接口返回 403 | 切企业实名应用 + 申请权限 |
| session 用买家 token | 返回空 / 403 | 必须用卖家账号 OAuth 换取的 AccessToken |
| 全量翻页不记断点 | 服务重启后从头翻页,超出日调用额度 | 记录 page_no 断点,重启后续跑 |
| 订单不传 start_modified/end_modified | 全量拉取超量 | 固定时间窗增量(5~30min) |
| QPS 触发限流 | code=7 | 请求间 sleep 限速 / 令牌桶 |
| 查询商品不传 session | 公开查询别人商品 | 自己店铺查须传 session(已做) |
五、面试/方案一句话
仿 ERP 同步 = Spring Boot @Scheduled 调用封装好的 TOP Client(taobao.items.onsale.get 全量 + 增量 / taobao.trades.sold.get + taobao.trade.fullinfo.get 按 modified 时间窗)→ MyBatis UPSERT 商品表 & 销售订单表;Python 等价实现如上。订单接口必须用企业应用 + 卖家 AccessToken,并以增量同步防止超量。
需要我补Java 版完整 pom.xml + application.yml 配置模板 或APScheduler 常驻守护版(带断点续跑文件) 吗?