Python实战:百度慧眼数据爬取与坐标转换全流程解析
当我们需要分析城市人流分布时,百度慧眼提供的热力图数据是个不错的选择。但直接从API获取的数据往往需要经过一系列处理才能用于分析。本文将带你完整走通从数据获取到坐标转换的整个流程,使用Python和sklearn构建一个健壮的数据处理管道。
1. 准备工作与环境搭建
在开始之前,确保你已经安装了以下Python库:
pip install requests pandas scikit-learn这些库将分别用于:
- requests:发送HTTP请求获取数据
- pandas:数据处理和分析
- scikit-learn:机器学习模型(用于坐标转换)
此外,建议使用Jupyter Notebook进行交互式开发,方便调试和可视化中间结果。
2. 数据获取:从抓包到API请求
2.1 分析API接口
通过浏览器开发者工具(F12)分析百度慧眼的热力图请求,我们发现关键API端点:
https://huiyan.baidu.com/openapi/v1/heatmap/heatmapsearch这个接口需要两个关键参数:
cityId:城市代码(如深圳为440300)ak:百度开发者密钥
2.2 构建Python请求
使用requests库构建请求:
import requests headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)', 'Referer': 'http://huiyan.baidu.com/' } params = { 'cityId': '440300', # 深圳城市代码 'ak': '你的百度API密钥' # 替换为实际密钥 } response = requests.get( 'https://huiyan.baidu.com/openapi/v1/heatmap/heatmapsearch', headers=headers, params=params ) if response.status_code == 200: data = response.json() else: print(f"请求失败,状态码:{response.status_code}")2.3 数据解析
获取的数据格式通常如下:
"12679967_2573996_15|12667228_2576368_8|..."我们需要将其转换为结构化的DataFrame:
import pandas as pd def parse_heatmap_data(json_data): raw_str = json_data['result']['data'] records = [x.split('_') for x in raw_str.split('|') if x] df = pd.DataFrame(records, columns=['x', 'y', 'value']) df = df.apply(pd.to_numeric, errors='coerce').dropna() return df heatmap_df = parse_heatmap_data(data)3. 坐标转换:从墨卡托到经纬度
3.1 理解坐标系统
百度慧眼返回的是墨卡托坐标(bd09mc),而通常我们需要的是经纬度坐标(bd09ll)。虽然百度提供了官方转换API,但我们可以通过机器学习方法建立近似映射。
3.2 收集参考点
首先需要收集一组已知的墨卡托坐标和对应经纬度的参考点:
| 经度(lng) | 纬度(lat) | 墨卡托X(x) | 墨卡托Y(y) |
|---|---|---|---|
| 113.904579 | 22.656801 | 12679967 | 2573996 |
| 113.790251 | 22.676882 | 12667228 | 2576368 |
| ... | ... | ... | ... |
3.3 建立线性回归模型
使用sklearn的LinearRegression建立映射关系:
from sklearn.linear_model import LinearRegression # 准备训练数据 reference_df = pd.read_csv('reference_points.csv') # 上面表格保存为CSV X_train = reference_df[['x', 'y']] y_train = reference_df[['lng', 'lat']] # 训练模型 model = LinearRegression() model.fit(X_train, y_train) # 评估模型 print(f"模型R²分数:{model.score(X_train, y_train):.6f}")3.4 应用坐标转换
将模型应用到原始数据:
def convert_coordinates(df, model): coordinates = model.predict(df[['x', 'y']]) df['lng'] = coordinates[:, 0] df['lat'] = coordinates[:, 1] return df heatmap_df = convert_coordinates(heatmap_df, model)4. 工程化实现:构建完整数据处理管道
4.1 模块化设计
将上述步骤封装为可重用的函数:
class HeatmapProcessor: def __init__(self, api_key, city_code): self.api_key = api_key self.city_code = city_code self.model = None def load_reference_points(self, filepath): """加载参考点并训练模型""" ref_df = pd.read_csv(filepath) self.model = LinearRegression() self.model.fit(ref_df[['x', 'y']], ref_df[['lng', 'lat']]) def fetch_data(self): """获取原始热力图数据""" # 实现请求逻辑... pass def process(self): """完整处理流程""" raw_data = self.fetch_data() df = parse_heatmap_data(raw_data) if self.model: df = convert_coordinates(df, self.model) return df4.2 错误处理与日志
添加健壮的错误处理和日志记录:
import logging from datetime import datetime logging.basicConfig( filename='heatmap.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class HeatmapProcessor: # ... 其他代码 ... def fetch_data(self): try: response = requests.get(self.api_url, headers=self.headers, params=self.params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logging.error(f"API请求失败: {str(e)}") raise4.3 定时任务集成
使用schedule库实现定时获取:
import schedule import time def job(): try: processor = HeatmapProcessor(API_KEY, CITY_CODE) processor.load_reference_points('ref_points.csv') df = processor.process() df.to_csv(f'data/heatmap_{datetime.now().strftime("%Y%m%d_%H%M")}.csv') except Exception as e: logging.error(f"任务执行失败: {str(e)}") # 每小时执行一次 schedule.every().hour.do(job) while True: schedule.run_pending() time.sleep(60)5. 数据可视化与应用
5.1 使用Pyecharts可视化
from pyecharts.charts import Geo from pyecharts import options as opts def visualize_heatmap(df): geo = Geo() geo.add_schema(maptype="深圳") # 添加数据点 for _, row in df.iterrows(): geo.add_coordinate( f"point_{row.name}", row['lng'], row['lat'] ) geo.add( "", [(f"point_{row.name}", row['value'])], type_="scatter", symbol_size=5 ) geo.set_global_opts( title_opts=opts.TitleOpts(title="深圳人流热力图"), visualmap_opts=opts.VisualMapOpts(max_=df['value'].max()) ) return geo heatmap_chart = visualize_heatmap(heatmap_df) heatmap_chart.render("heatmap.html")5.2 数据分析应用示例
计算各区域人流密度:
# 使用KMeans聚类识别热点区域 from sklearn.cluster import KMeans coordinates = heatmap_df[['lng', 'lat']].values kmeans = KMeans(n_clusters=10) heatmap_df['cluster'] = kmeans.fit_predict(coordinates) # 计算每个聚类的人流总量 cluster_stats = heatmap_df.groupby('cluster')['value'].agg(['sum', 'count']) print("人流热点区域统计:") print(cluster_stats.sort_values('sum', ascending=False))6. 性能优化与扩展
6.1 批量处理与并行化
对于大规模数据,可以使用多线程/进程:
from concurrent.futures import ThreadPoolExecutor def process_city(city_code): processor = HeatmapProcessor(API_KEY, city_code) return processor.process() city_codes = ['440300', '310000'] # 深圳、上海 with ThreadPoolExecutor() as executor: results = list(executor.map(process_city, city_codes))6.2 模型优化
尝试更复杂的回归模型提高坐标转换精度:
from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import cross_val_score # 使用随机森林回归 rf_model = RandomForestRegressor(n_estimators=100) scores = cross_val_score(rf_model, X_train, y_train, cv=5) print(f"交叉验证R²: {scores.mean():.4f} (±{scores.std():.4f})")6.3 数据持久化
使用SQLite存储历史数据:
import sqlite3 from contextlib import closing def save_to_db(df, db_path='heatmap.db'): with closing(sqlite3.connect(db_path)) as conn: df.to_sql('heatmap_data', conn, if_exists='append', index=False)