news 2026/2/15 10:25:21

编写店铺客流热力分析程序,基于监控视频提取客流数据,生成店内热力图,识别高峰时段与热力区域,提供货架摆放优化建议。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
编写店铺客流热力分析程序,基于监控视频提取客流数据,生成店内热力图,识别高峰时段与热力区域,提供货架摆放优化建议。

我将为您设计一个基于创新思维与战略管理的店铺客流热力分析程序。这个项目将商业智能分析与战略决策支持相结合。

项目结构

store_heatmap_analysis/

├── main.py # 主程序入口

├── video_processor.py # 视频处理模块

├── heatmap_generator.py # 热力图生成模块

├── analytics_engine.py # 数据分析引擎

├── strategic_advisor.py # 战略建议模块

├── data/

│ ├── sample_video.mp4

│ ├── customer_data.csv

│ └── store_layout.json

├── output/

│ ├── heatmaps/

│ ├── reports/

│ └── recommendations/

├── utils/

│ ├── config.py

│ └── helpers.py

├── README.md

└── requirements.txt

1. 依赖文件 (requirements.txt)

opencv-python==4.8.1.78

numpy==1.24.3

matplotlib==3.7.2

seaborn==0.12.2

pandas==2.0.3

pillow==10.0.1

scikit-learn==1.3.0

plotly==5.15.0

scipy==1.11.2

2. 配置文件 (utils/config.py)

"""

配置管理模块

基于战略管理中的标准化原则,统一管理项目配置

"""

import os

from datetime import timedelta

class Config:
    """Central configuration for the store heatmap analysis project.

    Standardizes filesystem paths, video-processing parameters, heatmap
    rendering options and business-strategy weights in one place.
    """

    # --- base paths ----------------------------------------------------
    # This file lives in utils/, so the project root is one directory up.
    # (Using dirname(__file__) directly would anchor data/ and output/
    # inside the utils/ package, contradicting the documented layout.)
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    DATA_DIR = os.path.join(BASE_DIR, 'data')
    OUTPUT_DIR = os.path.join(BASE_DIR, 'output')
    HEATMAP_DIR = os.path.join(OUTPUT_DIR, 'heatmaps')
    REPORT_DIR = os.path.join(OUTPUT_DIR, 'reports')
    RECOMMENDATION_DIR = os.path.join(OUTPUT_DIR, 'recommendations')

    # --- video processing ----------------------------------------------
    FRAME_SAMPLE_RATE = 30       # sample one frame out of every 30
    DETECTION_CONFIDENCE = 0.6
    MIN_DETECTION_AREA = 500     # minimum detected region, in pixels

    # --- heatmap rendering ---------------------------------------------
    GRID_SIZE = (20, 15)         # store partitioned into 20 rows x 15 cols
    HEATMAP_COLORS = ['#00FF00', '#FFFF00', '#FF0000']  # green -> yellow -> red
    OPACITY = 0.7

    # --- time analysis --------------------------------------------------
    PEAK_HOUR_THRESHOLD = 0.8    # >= 80% of average traffic counts as peak
    TIME_SLOT_MINUTES = 60       # granularity of time-slot partitioning

    # --- business strategy ----------------------------------------------
    HIGH_TRAFFIC_ZONE_RATIO = 0.3  # share of cells considered high-traffic
    PRODUCT_CATEGORY_WEIGHTS = {
        'premium': 2.0,    # premium products
        'popular': 1.5,    # best sellers
        'regular': 1.0,    # regular products
        'clearance': 0.8,  # clearance products
    }

    # innovation analysis dimensions
    INNOVATION_METRICS = [
        'customer_flow_efficiency',
        'space_utilization_rate',
        'conversion_opportunity_index',
        'strategic_positioning_score',
    ]

    @classmethod
    def create_directories(cls):
        """Create every data/output directory the pipeline writes to."""
        directories = [cls.DATA_DIR, cls.HEATMAP_DIR, cls.REPORT_DIR,
                       cls.RECOMMENDATION_DIR]
        for directory in directories:
            os.makedirs(directory, exist_ok=True)

    @classmethod
    def get_time_slots(cls, start_hour=9, end_hour=22):
        """Return hourly slot labels like '09:00-10:00' for the opening hours.

        The step is derived from TIME_SLOT_MINUTES but clamped to at least
        one hour, so a sub-hour setting can never make the step 0 and loop
        forever (the original would with e.g. TIME_SLOT_MINUTES = 30).
        """
        step = max(1, cls.TIME_SLOT_MINUTES // 60)
        slots = []
        current_time = start_hour
        while current_time < end_hour:
            slot_end = min(current_time + step, end_hour)
            slots.append(f"{current_time:02d}:00-{slot_end:02d}:00")
            current_time = slot_end
        return slots

3. 工具函数 (utils/helpers.py)

"""

辅助函数模块

提供通用的工具函数,体现模块化设计思维

"""

import cv2

import numpy as np

import pandas as pd

from datetime import datetime, timedelta

import json

import matplotlib.pyplot as plt

from scipy.spatial.distance import cdist

class DataProcessor:
    """Static helpers for frame preprocessing, motion detection and gridding."""

    @staticmethod
    def preprocess_frame(frame):
        """Convert a BGR frame into a denoised grayscale image.

        Grayscale conversion followed by Gaussian blur suppresses sensor
        noise before frame differencing.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        # Background subtraction (simple version); a real deployment could
        # plug a proper background model (e.g. MOG2) in here.
        return blurred

    @staticmethod
    def detect_motion_regions(prev_frame, curr_frame, threshold=25, min_area=500):
        """Detect moving regions between two consecutive preprocessed frames.

        Uses frame differencing plus morphological cleanup; each region is
        returned as (center_x, center_y, area).  `min_area` filters
        small-noise contours — previously a hard-coded 500 even though
        Config.MIN_DETECTION_AREA exists; 500 stays the default for
        backward compatibility.
        """
        if prev_frame is None or curr_frame is None:
            return []
        frame_diff = cv2.absdiff(prev_frame, curr_frame)
        thresh = cv2.threshold(frame_diff, threshold, 255, cv2.THRESH_BINARY)[1]
        # Morphological close-then-open removes speckle noise.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        motion_regions = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > min_area:
                x, y, w, h = cv2.boundingRect(contour)
                motion_regions.append((x + w // 2, y + h // 2, area))
        return motion_regions

    @staticmethod
    def create_store_grid(frame_shape, grid_size):
        """Partition the frame into a rows x cols grid of cells.

        Each cell is a dict with pixel `bounds` (x1, y1, x2, y2), a string
        `grid_id` of the form "row_col", and its integer `coordinates`.
        """
        height, width = frame_shape[:2]
        rows, cols = grid_size
        cell_width = width / cols
        cell_height = height / rows
        grid_cells = []
        for i in range(rows):
            row_cells = []
            for j in range(cols):
                x1 = int(j * cell_width)
                y1 = int(i * cell_height)
                x2 = int((j + 1) * cell_width)
                y2 = int((i + 1) * cell_height)
                row_cells.append({
                    'bounds': (x1, y1, x2, y2),
                    'grid_id': f"{i}_{j}",
                    'coordinates': (i, j),
                })
            grid_cells.append(row_cells)
        return grid_cells

    @staticmethod
    def calculate_distance_matrix(points):
        """Return the pairwise Euclidean distance matrix for `points`.

        Used to analyse customer movement paths.  Returns an empty array
        when fewer than two points are supplied.
        """
        if len(points) < 2:
            return np.array([])
        points_array = np.array(points)
        return cdist(points_array, points_array, metric='euclidean')

class ReportGenerator:
    """Builds JSON summary reports from compiled analysis data."""

    @staticmethod
    def generate_summary_report(analysis_data, output_path):
        """Write a JSON summary of the analysis and return it as a dict.

        `analysis_data['hourly_stats']` may map each hour either to a dict
        with a 'count' key, or — as VideoProcessor actually produces — to a
        per-grid {grid_id: count} dict; both shapes are handled (the
        original unconditionally read stats['count'] and raised KeyError
        on VideoProcessor output).  The report is written UTF-8 to
        `output_path` and also returned.
        """
        report = {
            'report_generated': datetime.now().isoformat(),
            'summary_statistics': {},
            'peak_analysis': {},
            'strategic_recommendations': [],
            'innovation_insights': {},
        }

        def hour_count(stats):
            # Accept {'count': n}, {grid_id: n, ...} or a bare number.
            if isinstance(stats, dict):
                if 'count' in stats:
                    return stats['count']
                return sum(stats.values())
            return stats

        if 'hourly_stats' in analysis_data:
            hourly_data = analysis_data['hourly_stats']
            total_customers = sum(hour_count(s) for s in hourly_data.values())
            avg_customers = total_customers / len(hourly_data) if hourly_data else 0
            report['summary_statistics'] = {
                'total_customers': total_customers,
                'average_customers_per_hour': round(avg_customers, 2),
                'peak_hour': max(hourly_data.items(),
                                 key=lambda x: hour_count(x[1]),
                                 default=('N/A', {}))[0],
                'analysis_period': f"{len(hourly_data)} hours",
            }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        return report

4. 视频处理模块 (video_processor.py)

"""

视频处理模块

基于创新思维的多层次分析方法,从原始视频中提取有价值的客流数据

"""

import cv2

import numpy as np

import pandas as pd

from datetime import datetime, timedelta

from collections import defaultdict, Counter

import os

from utils.config import Config

from utils.helpers import DataProcessor

class VideoProcessor:
    """Extracts customer-flow data from store surveillance footage.

    Frames are sampled at a fixed interval, motion regions are detected by
    frame differencing, and every detection is binned into an hour bucket
    and a store-grid cell for later heatmap generation.
    """

    def __init__(self, video_path=None):
        self.video_path = video_path
        self.cap = None            # cv2.VideoCapture handle, set by initialize_video
        self.frame_count = 0
        self.processed_frames = 0
        self.store_grid = None     # cells from DataProcessor.create_store_grid
        # "HH:MM" -> list of (x, y) detection centers
        self.customer_trajectories = defaultdict(list)
        # "HH:00" -> grid_id -> detection count
        self.hourly_data = defaultdict(lambda: defaultdict(int))
        # innovation metric accumulators (filled by downstream analysis)
        self.innovation_metrics = {
            'flow_efficiency': [],
            'space_utilization': [],
            'conversion_zones': [],
        }

    def initialize_video(self, video_path=None):
        """Open the video file and build the store grid.

        Raises ValueError when the path is missing or the file cannot be
        opened.  Returns True on success.
        """
        if video_path:
            self.video_path = video_path
        if not self.video_path or not os.path.exists(self.video_path):
            raise ValueError(f"视频文件不存在: {self.video_path}")
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            raise ValueError("无法打开视频文件")
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        # Some containers report FPS as 0; fall back to a sane default so
        # the timestamp arithmetic below can never divide by zero.
        if not self.fps or self.fps <= 0:
            self.fps = 30.0
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"视频信息: {self.width}x{self.height}, {self.fps} FPS, {self.frame_count} 帧")
        self.store_grid = DataProcessor.create_store_grid(
            (self.height, self.width), Config.GRID_SIZE
        )
        return True

    def process_video(self, progress_callback=None):
        """Sample frames, detect motion and aggregate flow statistics.

        `progress_callback`, when given, periodically receives a
        percentage (0-100).  Returns the compiled analysis data dict.
        """
        if not self.cap:
            self.initialize_video()
        prev_frame = None
        frame_interval = Config.FRAME_SAMPLE_RATE
        print("开始处理视频...")
        for frame_num in range(0, self.frame_count, frame_interval):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = self.cap.read()
            if not ret:
                continue
            if progress_callback and frame_num % (frame_interval * 10) == 0:
                progress_callback((frame_num / self.frame_count) * 100)
            processed_frame = DataProcessor.preprocess_frame(frame)
            motion_regions = DataProcessor.detect_motion_regions(prev_frame, processed_frame)
            if motion_regions:
                # Map the frame position onto a nominal business day
                # starting at 09:00 (assumed opening time — TODO confirm).
                timestamp = frame_num / self.fps
                dt_object = datetime(2024, 1, 1, 9, 0) + timedelta(seconds=timestamp)
                hour_key = dt_object.strftime("%H:00")
                self._update_grid_data(motion_regions, hour_key, dt_object)
                self._record_trajectories(motion_regions, dt_object)
            prev_frame = processed_frame
            self.processed_frames += 1
        print(f"视频处理完成,共处理 {self.processed_frames} 个采样帧")
        return self._compile_analysis_data()

    def _update_grid_data(self, regions, hour_key, timestamp):
        """Increment the per-hour counter of the cell each region falls in."""
        cell_h = self.height / Config.GRID_SIZE[0]
        cell_w = self.width / Config.GRID_SIZE[1]
        for x, y, _area in regions:
            # Clamp to the last row/col so edge pixels stay in range.
            grid_i = min(int(y / cell_h), Config.GRID_SIZE[0] - 1)
            grid_j = min(int(x / cell_w), Config.GRID_SIZE[1] - 1)
            self.hourly_data[hour_key][f"{grid_i}_{grid_j}"] += 1

    def _record_trajectories(self, regions, timestamp):
        """Append each region's center to the minute-keyed trajectory log."""
        time_key = timestamp.strftime("%H:%M")
        for x, y, _area in regions:
            self.customer_trajectories[time_key].append((x, y))

    def _compile_analysis_data(self):
        """Assemble the analysis dict returned by process_video."""
        return {
            'video_info': {
                'path': self.video_path,
                'duration': self.frame_count / self.fps,
                'resolution': f"{self.width}x{self.height}",
                'processed_frames': self.processed_frames,
            },
            'hourly_stats': dict(self.hourly_data),
            'trajectory_summary': {
                'total_time_points': len(self.customer_trajectories),
                'peak_traffic_minute': max(self.customer_trajectories.items(),
                                           key=lambda x: len(x[1]),
                                           default=('N/A', []))[0],
            },
            'grid_statistics': self._calculate_grid_statistics(),
        }

    def _calculate_grid_statistics(self):
        """Per-hour totals, hottest cell and average count per cell."""
        grid_stats = {}
        for hour, grid_data in self.hourly_data.items():
            total_count = sum(grid_data.values())
            if total_count > 0:
                hottest_grid = max(grid_data.items(), key=lambda x: x[1])
                grid_stats[hour] = {
                    'total_customers': total_count,
                    'hottest_grid': hottest_grid[0],
                    'hottest_count': hottest_grid[1],
                    'avg_per_grid': total_count / len(grid_data),
                }
        return grid_stats

    def identify_peak_hours(self, threshold_ratio=None):
        """Return hours whose traffic >= threshold_ratio * average traffic.

        Hours are sorted busiest-first.  `threshold_ratio` defaults to
        Config.PEAK_HOUR_THRESHOLD.
        """
        if threshold_ratio is None:
            threshold_ratio = Config.PEAK_HOUR_THRESHOLD
        if not self.hourly_data:
            return []
        hourly_totals = {hour: sum(grid.values())
                         for hour, grid in self.hourly_data.items()}
        avg_traffic = sum(hourly_totals.values()) / len(hourly_totals)
        threshold = avg_traffic * threshold_ratio
        peak_hours = [h for h, count in hourly_totals.items() if count >= threshold]
        return sorted(peak_hours, key=lambda h: hourly_totals[h], reverse=True)

    def get_popular_zones(self, top_n=5):
        """Return the top_n (grid_id, total_count) pairs across all hours."""
        zone_counts = defaultdict(int)
        for hour_data in self.hourly_data.values():
            for grid_id, count in hour_data.items():
                zone_counts[grid_id] += count
        return sorted(zone_counts.items(), key=lambda kv: kv[1], reverse=True)[:top_n]

    def close(self):
        """Release the video capture handle (safe to call repeatedly)."""
        if self.cap:
            self.cap.release()
            self.cap = None  # drop the stale handle so close() is idempotent

5. 热力图生成模块 (heatmap_generator.py)

"""

热力图生成模块

基于数据分析结果生成直观的可视化热力图

体现创新思维的数据可视化呈现

"""

import cv2

import numpy as np

import matplotlib.pyplot as plt

import seaborn as sns

from matplotlib.colors import LinearSegmentedColormap

import plotly.graph_objects as go

import plotly.express as px

from datetime import datetime

from utils.config import Config

from utils.helpers import DataProcessor

class HeatmapGenerator:

def __init__(self):

self.heatmap_data = None

self.store_layout = None

self.time_series_data = []

def generate_static_heatmap(self, analysis_data, time_slot=None, save_path=None):
    """Render a static seaborn heatmap of customer density.

    When `time_slot` names an hour present in the hourly stats, only that
    hour is plotted; otherwise all hours are aggregated.  Returns the
    matplotlib figure, or None when there is no data to plot.
    """
    hourly_stats = analysis_data.get('hourly_stats', {})
    if time_slot and time_slot in hourly_stats:
        cell_counts = hourly_stats[time_slot]                 # one time slot
    else:
        cell_counts = self._aggregate_all_data(analysis_data)  # whole period
    if not cell_counts:
        print("没有数据可生成热力图")
        return None

    density = self._create_heatmap_matrix(cell_counts)

    figure, axis = plt.subplots(figsize=(12, 8))
    # Custom green -> yellow -> red color ramp from the project config.
    palette = LinearSegmentedColormap.from_list(
        "custom_heatmap", Config.HEATMAP_COLORS, N=256)
    sns.heatmap(density, annot=True, fmt='.0f', cmap=palette,
                ax=axis, cbar_kws={'label': '客流密度'})

    suffix = " - " + time_slot if time_slot else ""
    axis.set_title(f'店铺客流热力图{suffix}', fontsize=16, fontweight='bold')
    axis.set_xlabel('店铺宽度方向', fontsize=12)
    axis.set_ylabel('店铺深度方向', fontsize=12)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"热力图已保存到: {save_path}")
    return figure

def generate_interactive_heatmap(self, analysis_data, save_path=None):
    """Render an interactive Plotly heatmap of aggregate customer density.

    Returns the Plotly figure, or None when there is no grid data.  The
    figure is additionally written to `save_path` as HTML when given.
    """
    # pandas is only needed here and the module never imports it, so the
    # original raised NameError on `pd` at call time; import locally.
    import pandas as pd

    grid_data = self._prepare_interactive_data(analysis_data)
    if not grid_data:
        return None
    df = pd.DataFrame(grid_data)
    # Pivot the flat row/col/density records back into a matrix for imshow.
    fig = px.imshow(df.pivot(index='row', columns='col', values='density'),
                    title='交互式店铺客流热力图',
                    color_continuous_scale=['green', 'yellow', 'red'],
                    labels={'color': '客流密度'})
    fig.update_layout(width=800, height=600)
    if save_path:
        fig.write_html(save_path)
        print(f"交互式热力图已保存到: {save_path}")
    return fig

def generate_timeline_heatmap(self, analysis_data, save_path=None):
    """Plot a time x grid-cell heatmap showing how traffic shifts by hour.

    Returns the matplotlib figure, or None when no hourly stats exist.
    """
    hourly_stats = analysis_data.get('hourly_stats', {})
    if not hourly_stats:
        return None

    hours = sorted(hourly_stats.keys())
    # Union of every grid id seen in any hour, in sorted order.
    cells = sorted({cell for counts in hourly_stats.values() for cell in counts})

    # Build the (time x cell) density matrix row by row; missing cells -> 0.
    density = np.array([[hourly_stats[hour].get(cell, 0) for cell in cells]
                        for hour in hours], dtype=float)

    figure, axis = plt.subplots(figsize=(15, 8))
    image = axis.imshow(density, cmap='YlOrRd', aspect='auto')

    axis.set_xticks(range(len(cells)))
    axis.set_xticklabels(cells, rotation=45)
    axis.set_yticks(range(len(hours)))
    axis.set_yticklabels(hours)
    axis.set_title('客流时间轴热力图', fontsize=16, fontweight='bold')
    axis.set_xlabel('店铺网格', fontsize=12)
    axis.set_ylabel('时间段', fontsize=12)
    plt.colorbar(image, ax=axis, label='客流数量')
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"时间轴热力图已保存到: {save_path}")
    return figure

def _aggregate_all_data(self, analysis_data):

"""聚合所有时间的数据"""

aggregated = defaultdict(int)

for hour_data in analysis_data.get('hourly_stats', {}).values():

for grid_id, count in hour_data.items():

aggregated[grid_id] += count

return dict(aggregated)

def _create_heatmap_matrix(self, grid_data):

"""创建热力图矩阵"""

rows, cols = Config.GRID_SIZE

matrix = np.zeros((rows, cols))

for grid_id, count in grid_data.items():

try:

i, j = map(int, grid_id.split('_'))

if 0 <= i < rows and 0 <= j < cols:

matrix[i, j] = count

except (ValueError, IndexError):

continue

return matrix

def _prepare_interactive_data(self, analysis_data):

"""准备交互式数据"""

grid_data = self._aggregate_all_data(analysis_data)

interactive_data = []

for grid_id, density in grid_data.items():

try:

i, j = map(int, grid_id.split('_'))

interactive_data.append({

'row': i,

'col': j,

'grid_id': grid_id,

'density': density

})

except (ValueError, IndexError):

continue

return interactive_data

def generate_comparison_heatmap(self, before_data, after_data, save_path=None):

"""

生成对比

关注我,有更多实用程序等着你!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/7 3:18:23

解决LLM返回JSON格式错误的3大技巧

怎么处理LLM 返回json格式不正确的问题 这种问题是设置的并发太多了,需要适当降低并发访问LLM API的数量,一般 gemini 也就是10个就是上线了; 最下面给出代码实现,包括怎么调用 gemini API 函数 extract_valid_llm_data 功能实现详细解析 步骤1:初始化默认返回值 val…

作者头像 李华
网站建设 2026/2/14 2:42:25

使用Miniconda-Python3.9搭建BERT文本分类PyTorch实验环境

使用Miniconda-Python3.9搭建BERT文本分类PyTorch实验环境 在自然语言处理（NLP）研究日益深入的今天，越来越多的研究者和工程师面临一个共同挑战：如何快速、稳定地复现BERT等预训练模型在具体任务上的表现。尤其是在情感分析、新闻…

作者头像 李华
网站建设 2026/2/14 22:09:20

Miniconda-Python3.9环境下实现PyTorch模型蓝绿部署流程

Miniconda-Python3.9环境下实现PyTorch模型蓝绿部署流程 在AI系统从实验室走向生产环境的过程中，一个常被低估但至关重要的问题浮现出来：如何在不中断服务的前提下安全地更新深度学习模型？ 设想这样一个场景：你正在维护一个基于Re…

作者头像 李华
网站建设 2026/2/11 9:41:11

Miniconda-Python3.9环境下实现PyTorch模型安全沙箱运行

Miniconda-Python3.9环境下实现PyTorch模型安全沙箱运行 在高校实验室和AI初创团队中，你是否经历过这样的场景：刚接手一个项目，pip install -r requirements.txt 之后，系统Python环境直接崩溃？或者同事跑通的模型…

作者头像 李华
网站建设 2026/2/13 20:42:36

Miniconda-Python3.9环境下使用Hydra管理PyTorch配置文件

Miniconda-Python3.9环境下使用Hydra管理PyTorch配置文件 在深度学习项目中，你有没有遇到过这样的场景：昨天还能复现的实验结果，今天却因为某个参数被悄悄修改而再也跑不出来？或者团队协作时，别人运行你的代码总是报错…

作者头像 李华