news 2026/3/6 13:15:24

采集室内空气质量数据。(甲醛,pm2.5)超标时,自动启动空气净化器,净化达标后自动关闭。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
采集室内空气质量数据。(甲醛,pm2.5)超标时,自动启动空气净化器,净化达标后自动关闭。

智能室内空气质量监控与净化系统

一、实际应用场景描述

场景背景

在现代煤矿智能化开采的办公环境中,由于煤矿设备维护区域、实验室、会议室等封闭空间可能存在甲醛(来自新设备、装修材料)和PM2.5(来自外部空气、设备运行)污染。长期暴露在这样的环境中会影响员工健康和工作效率。

痛点分析

1. 健康风险:甲醛是致癌物,PM2.5可引发呼吸道疾病

2. 被动响应:传统方式依赖人工发现污染并手动开启净化设备

3. 能源浪费:净化器常开造成电力浪费,设备损耗

4. 监管困难:缺乏实时数据记录和分析,难以优化空气质量

二、核心逻辑讲解

系统架构

传感器数据采集 → 数据处理分析 → 阈值判断 → 设备控制 → 状态反馈

控制逻辑

1. 实时监测甲醛和PM2.5浓度

2. 当任一指标超标时,自动启动净化器

3. 持续监测直至两项指标均达标

4. 达标后延迟关闭,防止频繁启停

5. 记录所有数据用于分析和优化

三、代码实现

项目结构

air_quality_system/

├── main.py # 主程序

├── sensors/

│ ├── __init__.py

│ ├── air_quality_sensor.py # 传感器模拟/接口

│ └── sensor_interface.py # 传感器抽象接口

├── controllers/

│ ├── __init__.py

│ └── purifier_controller.py # 净化器控制器

├── config/

│ ├── __init__.py

│ └── settings.py # 配置文件

├── utils/

│ ├── __init__.py

│ ├── logger.py # 日志模块

│ └── data_handler.py # 数据处理

├── requirements.txt # 依赖包

└── README.md # 说明文档

1. 配置文件 (config/settings.py)

"""

系统配置文件

定义空气质量标准和系统参数

"""

class AirQualityStandard:

"""空气质量标准类 (基于GB/T 18883-2022室内空气质量标准)"""

# 甲醛浓度标准 (mg/m³)

FORMALDEHYDE = {

'excellent': 0.03, # 优级

'good': 0.05, # 良好

'limit': 0.08, # 国家标准限值

'warning': 0.10, # 警告阈值

'danger': 0.20 # 危险阈值

}

# PM2.5浓度标准 (μg/m³)

PM25 = {

'excellent': 15, # 优级

'good': 35, # 良好

'limit': 50, # 国家标准限值

'warning': 75, # 警告阈值

'danger': 150 # 危险阈值

}

class SystemConfig:

"""系统运行参数配置"""

# 监测间隔(秒)

MONITOR_INTERVAL = 5

# 净化器控制参数

PURIFIER_DELAY_OFF = 60 # 达标后延迟关闭时间(秒)

PURIFIER_MIN_RUN_TIME = 300 # 净化器最小运行时间(秒)

# 报警参数

ENABLE_ALERT = True

ALERT_INTERVAL = 300 # 报警间隔(秒)

# 数据记录

LOG_ENABLED = True

DATA_SAVE_INTERVAL = 60 # 数据保存间隔(秒)

2. 传感器接口模块 (sensors/sensor_interface.py)

"""

传感器抽象接口模块

定义统一的传感器接口规范

"""

from abc import ABC, abstractmethod

import time

from datetime import datetime

class AirQualitySensor(ABC):

"""空气质量传感器抽象基类"""

def __init__(self, sensor_id: str, location: str = "unknown"):

"""

初始化传感器

参数:

sensor_id: 传感器唯一标识

location: 传感器位置描述

"""

self.sensor_id = sensor_id

self.location = location

self.status = "disconnected"

self.last_update = None

@abstractmethod

def connect(self) -> bool:

"""连接传感器设备"""

pass

@abstractmethod

def disconnect(self) -> bool:

"""断开传感器连接"""

pass

@abstractmethod

def read_data(self) -> dict:

"""

读取传感器数据

返回:

包含空气质量数据的字典

"""

pass

def get_status(self) -> dict:

"""获取传感器状态"""

return {

"sensor_id": self.sensor_id,

"location": self.location,

"status": self.status,

"last_update": self.last_update

}

class SensorData:

"""传感器数据封装类"""

def __init__(self, formaldehyde: float, pm25: float, temperature: float = None,

humidity: float = None, co2: float = None):

"""

初始化传感器数据

参数:

formaldehyde: 甲醛浓度 (mg/m³)

pm25: PM2.5浓度 (μg/m³)

temperature: 温度 (°C)

humidity: 湿度 (%)

co2: 二氧化碳浓度 (ppm)

"""

self.formaldehyde = formaldehyde

self.pm25 = pm25

self.temperature = temperature

self.humidity = humidity

self.co2 = co2

self.timestamp = datetime.now()

def to_dict(self) -> dict:

"""转换为字典格式"""

return {

"timestamp": self.timestamp.isoformat(),

"formaldehyde": self.formaldehyde,

"pm25": self.pm25,

"temperature": self.temperature,

"humidity": self.humidity,

"co2": self.co2

}

def __str__(self) -> str:

"""字符串表示"""

return (f"时间: {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}, "

f"甲醛: {self.formaldehyde:.3f} mg/m³, "

f"PM2.5: {self.pm25:.1f} μg/m³")

3. 传感器模拟实现 (sensors/air_quality_sensor.py)

"""

空气质量传感器实现模块

包含真实传感器接口和模拟传感器

"""

import random

import time

from typing import Optional

from .sensor_interface import AirQualitySensor, SensorData

class SimulatedAirQualitySensor(AirQualitySensor):

"""模拟空气质量传感器(用于开发和测试)"""

def __init__(self, sensor_id: str, location: str = "模拟传感器",

base_levels: Optional[dict] = None):

"""

初始化模拟传感器

参数:

sensor_id: 传感器ID

location: 传感器位置

base_levels: 基础污染物水平

"""

super().__init__(sensor_id, location)

# 基础污染物水平

self.base_levels = base_levels or {

"formaldehyde": 0.02, # 基础甲醛水平

"pm25": 20, # 基础PM2.5水平

}

# 污染事件参数

self.pollution_event = False

self.event_start_time = None

self.event_duration = 0

def connect(self) -> bool:

"""连接模拟传感器"""

print(f"[INFO] 连接模拟传感器 {self.sensor_id} 在 {self.location}")

self.status = "connected"

return True

def disconnect(self) -> bool:

"""断开模拟传感器"""

print(f"[INFO] 断开模拟传感器 {self.sensor_id}")

self.status = "disconnected"

return True

def _simulate_pollution_event(self) -> tuple:

"""模拟污染事件"""

current_time = time.time()

# 随机触发污染事件

if not self.pollution_event and random.random() < 0.05: # 5%概率触发

self.pollution_event = True

self.event_start_time = current_time

self.event_duration = random.randint(300, 1800) # 5-30分钟

# 处理污染事件

if self.pollution_event:

event_progress = (current_time - self.event_start_time) / self.event_duration

if event_progress >= 1: # 事件结束

self.pollution_event = False

return 0, 0

else:

# 污染峰值在事件中期

pollution_factor = 4 * event_progress * (1 - event_progress)

return pollution_factor * 0.3, pollution_factor * 200 # 甲醛和PM2.5增量

return 0, 0

def read_data(self) -> dict:

"""

读取模拟传感器数据

返回:

传感器数据字典

"""

if self.status != "connected":

raise ConnectionError("传感器未连接")

# 获取基础水平

formaldehyde_base = self.base_levels["formaldehyde"]

pm25_base = self.base_levels["pm25"]

# 模拟日常波动

hour = time.localtime().tm_hour

time_factor = 0.5 + 0.5 * abs(12 - hour) / 12 # 中午最高

# 模拟随机波动

formaldehyde_variation = random.uniform(-0.01, 0.01) * time_factor

pm25_variation = random.uniform(-5, 5) * time_factor

# 模拟污染事件

formaldehyde_event, pm25_event = self._simulate_pollution_event()

# 计算最终值

formaldehyde = max(0.01, formaldehyde_base + formaldehyde_variation + formaldehyde_event)

pm25 = max(5, pm25_base + pm25_variation + pm25_event)

# 添加温湿度模拟

temperature = 20 + random.uniform(-2, 2) + (hour - 12) * 0.5

humidity = 50 + random.uniform(-10, 10)

# 创建数据对象

sensor_data = SensorData(

formaldehyde=formaldehyde,

pm25=pm25,

temperature=round(temperature, 1),

humidity=round(humidity, 1)

)

self.last_update = time.time()

return sensor_data.to_dict()

class RealAirQualitySensor(AirQualitySensor):

"""真实空气质量传感器接口(示例,需根据实际硬件实现)"""

def __init__(self, sensor_id: str, location: str, port: str, baudrate: int = 9600):

"""

初始化真实传感器

参数:

sensor_id: 传感器ID

location: 传感器位置

port: 串口端口

baudrate: 波特率

"""

super().__init__(sensor_id, location)

self.port = port

self.baudrate = baudrate

self.serial_conn = None

def connect(self) -> bool:

"""连接真实传感器"""

try:

import serial

self.serial_conn = serial.Serial(

port=self.port,

baudrate=self.baudrate,

timeout=2

)

self.status = "connected"

print(f"[INFO] 成功连接传感器 {self.sensor_id} 在端口 {self.port}")

return True

except Exception as e:

print(f"[ERROR] 连接传感器失败: {e}")

self.status = "disconnected"

return False

def disconnect(self) -> bool:

"""断开传感器连接"""

if self.serial_conn and self.serial_conn.is_open:

self.serial_conn.close()

self.status = "disconnected"

return True

def _parse_sensor_data(self, raw_data: bytes) -> Optional[dict]:

"""

解析传感器原始数据

注意:此函数需要根据实际传感器协议实现

"""

# 示例解析逻辑(需根据实际传感器调整)

try:

# 假设传感器数据格式: "HCHO:0.05,PM2.5:35,TEMP:25.0,HUM:50"

data_str = raw_data.decode('utf-8').strip()

data_dict = {}

for item in data_str.split(','):

if ':' in item:

key, value = item.split(':')

data_dict[key.strip()] = float(value)

# 映射到标准字段

formaldehyde = data_dict.get('HCHO', 0.0)

pm25 = data_dict.get('PM2.5', 0.0)

temperature = data_dict.get('TEMP', None)

humidity = data_dict.get('HUM', None)

sensor_data = SensorData(

formaldehyde=formaldehyde,

pm25=pm25,

temperature=temperature,

humidity=humidity

)

return sensor_data.to_dict()

except Exception as e:

print(f"[ERROR] 解析传感器数据失败: {e}")

return None

def read_data(self) -> dict:

"""从真实传感器读取数据"""

if not self.serial_conn or not self.serial_conn.is_open:

raise ConnectionError("传感器未连接")

try:

# 发送读取命令(根据实际传感器协议)

self.serial_conn.write(b'READ\r\n')

time.sleep(0.1)

# 读取返回数据

raw_data = self.serial_conn.readline()

# 解析数据

data = self._parse_sensor_data(raw_data)

if data:

self.last_update = time.time()

return data

else:

raise ValueError("无效的传感器数据")

except Exception as e:

print(f"[ERROR] 读取传感器数据失败: {e}")

raise

4. 净化器控制器 (controllers/purifier_controller.py)

"""

空气净化器控制器模块

控制净化器的开启和关闭

"""

import time

from abc import ABC, abstractmethod

from datetime import datetime, timedelta

class AirPurifierController(ABC):

"""空气净化器控制器抽象基类"""

def __init__(self, device_id: str, location: str = "unknown"):

"""

初始化净化器控制器

参数:

device_id: 设备ID

location: 设备位置

"""

self.device_id = device_id

self.location = location

self.status = "off" # "off", "on", "error"

self.start_time = None

self.total_runtime = timedelta(0)

self.energy_consumption = 0 # 千瓦时

@abstractmethod

def turn_on(self) -> bool:

"""开启净化器"""

pass

@abstractmethod

def turn_off(self) -> bool:

"""关闭净化器"""

pass

@abstractmethod

def get_status(self) -> dict:

"""获取净化器状态"""

pass

def update_runtime(self):

"""更新运行时间统计"""

if self.status == "on" and self.start_time:

runtime = datetime.now() - self.start_time

self.total_runtime += runtime

# 假设功率为50W

self.energy_consumption += 0.05 * runtime.total_seconds() / 3600

self.start_time = datetime.now()

class SimulatedPurifierController(AirPurifierController):

"""模拟空气净化器控制器(用于开发和测试)"""

def __init__(self, device_id: str, location: str = "模拟净化器"):

"""初始化模拟净化器控制器"""

super().__init__(device_id, location)

self.power_consumption = 0.05 # 千瓦

self.filter_life = 100 # 过滤器寿命百分比

self.air_flow_level = 2 # 1-3档

def turn_on(self) -> bool:

"""开启模拟净化器"""

if self.status != "on":

print(f"[INFO] 开启净化器 {self.device_id} 在 {self.location}")

self.status = "on"

self.start_time = datetime.now()

# 模拟过滤器损耗

if self.filter_life > 0:

self.filter_life -= 0.01

return True

def turn_off(self) -> bool:

"""关闭模拟净化器"""

if self.status == "on":

print(f"[INFO] 关闭净化器 {self.device_id}")

self.update_runtime()

self.status = "off"

self.start_time = None

return True

def get_status(self) -> dict:

"""获取净化器状态"""

self.update_runtime()

return {

"device_id": self.device_id,

"location": self.location,

"status": self.status,

"total_runtime_hours": round(self.total_runtime.total_seconds() / 3600, 2),

"energy_consumption_kwh": round(self.energy_consumption, 2),

"filter_life_percent": round(self.filter_life, 1),

"air_flow_level": self.air_flow_level

}

class SmartPurifierController(AirPurifierController):

"""智能净化器控制器(支持多档位和模式)"""

def __init__(self, device_id: str, location: str, power: float = 0.05):

"""

初始化智能净化器

参数:

device_id: 设备ID

location: 设备位置

power: 额定功率(千瓦)

"""

super().__init__(device_id, location)

self.power = power

self.mode = "auto" # auto, manual, sleep

self.fan_speed = 0 # 0-3

self.air_quality_history = []

self.max_history_size = 100

def turn_on(self, speed: int = 2) -> bool:

"""

开启净化器

参数:

speed: 风扇速度 (1-3)

返回:

操作是否成功

"""

if self.status != "on":

print(f"[INFO] 开启智能净化器 {self.device_id},风速{speed}档")

self.status = "on"

self.fan_speed = max(1, min(3, speed))

self.start_time = datetime.now()

return True

def turn_off(self) -> bool:

"""关闭净化器"""

if self.status == "on":

print(f"[INFO] 关闭智能净化器 {self.device_id}")

self.update_runtime()

self.status = "off"

self.fan_speed = 0

self.start_time = None

return True

def set_mode(self, mode: str) -> bool:

"""

设置净化器模式

参数:

mode: 模式 (auto, manual, sleep)

返回:

操作是否成功

"""

valid_modes = ["auto", "manual", "sleep"]

if mode in valid_modes:

self.mode = mode

print(f"[INFO] 设置净化器 {self.device_id} 为 {mode} 模式")

if mode == "sleep":

self.fan_speed = 1

elif mode == "auto" and self.status == "on":

# 根据历史数据调整风速

self._adjust_speed_auto()

return True

return False

def _adjust_speed_auto(self):

"""根据空气质量自动调整风速"""

if not self.air_quality_history:

return

# 获取最近的平均空气质量

recent_data = self.air_quality_history[-10:] # 最近10个数据点

avg_formaldehyde = sum(d["formaldehyde"] for d in recent_data) / len(recent_data)

avg_pm25 = sum(d["pm25"] for d in recent_data) / len(recent_data)

# 根据污染物水平调整风速

if avg_formaldehyde > 0.15 or avg_pm25 > 150:

self.fan_speed = 3

elif avg_formaldehyde > 0.08 or avg_pm25 > 75:

self.fan_speed = 2

else:

self.fan_speed = 1

def update_air_quality(self, formaldehyde: float, pm25: float):

"""

更新空气质量数据(用于自动模式)

参数:

formaldehyde: 甲醛浓度

pm25: PM2.5浓度

"""

data_point = {

"timestamp": datetime.now(),

"formaldehyde": formaldehyde,

"pm25": pm25

}

self.air_quality_history.append(data_point)

# 保持历史数据大小

if len(self.air_quality_history) > self.max_history_size:

self.air_quality_history.pop(0)

# 自动模式下调整风速

if self.mode == "auto" and self.status == "on":

self._adjust_speed_auto()

def get_status(self) -> dict:

"""获取净化器详细状态"""

self.update_runtime()

status = {

"device_id": self.device_id,

"location": self.location,

"status": self.status,

"mode": self.mode,

"fan_speed": self.fan_speed,

"total_runtime_hours": round(self.total_runtime.total_seconds() / 3600, 2),

"energy_consumption_kwh": round(self.energy_consumption, 2),

"power_kw": self.power

}

if self.air_quality_history:

latest = self.air_quality_history[-1]

status["latest_formaldehyde"] = latest["formaldehyde"]

status["latest_pm25"] = latest["pm25"]

return status

5. 数据处理器 (utils/data_handler.py)

"""

数据处理模块

处理、分析和保存空气质量数据

"""

import json

import csv

from datetime import datetime, timedelta

from typing import List, Dict, Optional

import statistics

class AirQualityDataHandler:

"""空气质量数据处理类"""

def __init__(self, data_file: str = "air_quality_data.json"):

"""

初始化数据处理器

参数:

data_file: 数据存储文件路径

"""

self.data_file = data_file

self.history_data = []

self._load_data()

def _load_data(self):

"""从文件加载历史数据"""

try:

with open(self.data_file, 'r', encoding='utf-8') as f:

self.history_data = json.load(f)

print(f"[INFO] 从 {self.data_file} 加载了 {len(self.history_data)} 条历史数据")

except (FileNotFoundError, json.JSONDecodeError):

self.history_data = []

print(f"[INFO] 未找到历史数据文件,创建新的数据存储")

def save_data(self, data: dict):

"""

保存单条数据

参数:

data: 空气质量数据字典

"""

# 添加时间戳

if "timestamp" not in data:

data["timestamp"] = datetime.now().isoformat()

self.history_data.append(data)

# 定期保存到文件

if len(self.history_data) % 10 == 0:

self._save_to_file()

def _save_to_file(self):

"""保存数据到文件"""

try:

with open(self.data_file, 'w', encoding='utf-8') as f:

json.dump(self.history_data, f, ensure_ascii=False, indent=2)

except Exception as e:

print(f"[ERROR] 保存数据失败: {e}")

def get_recent_data(self, hours: int = 24) -> List[dict]:

"""

获取最近指定小时的数据

参数:

hours: 小时数

返回:

最近的数据列表

"""

cutoff_time = datetime.now() - timedelta(hours=hours)

recent_data = []

for data_point in self.history_data:

try:

data_time = datetime.fromisoformat(data_point["timestamp"])

if data_time >= cutoff_time:

recent_data.append(data_point)

except (KeyError, ValueError):

continue

return recent_data

def analyze_air_quality(self, hours: int = 24) -> dict:

"""

分析空气质量数据

参数:

hours: 分析的时间范围(小时)

返回:

分析结果字典

"""

recent_data = s

如果你觉得这个工具好用,欢迎关注我

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/4 18:51:51

Proteus 8 Professional下载包解析:适配32/64位Windows系统

从零搭建高效电子仿真环境&#xff1a;深入解析 Proteus 8 Professional 的双系统适配与实战部署 在嵌入式开发的世界里&#xff0c;调试往往比编码更耗时。你是否经历过这样的场景&#xff1a;代码写完、烧录进板子&#xff0c;结果LED不亮、串口无输出&#xff0c;排查半天才…

作者头像 李华
网站建设 2026/2/23 15:39:42

Fritzing支持下的创客教育模式:全面讲解

让电路“活”起来&#xff1a;用Fritzing点燃创客教育的实践之火 你有没有见过这样的场景&#xff1f;一个初一学生皱着眉头&#xff0c;手握万用表&#xff0c;在一堆杂乱的杜邦线中寻找哪根接错了&#xff1b;或者一位老师在PPT里贴了一张模糊的手绘电路图&#xff0c;台下学…

作者头像 李华
网站建设 2026/3/2 5:20:20

野马数据:AI如何重构大数据分析流程

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个基于野马数据的AI辅助分析工具&#xff0c;能够自动完成以下功能&#xff1a;1. 数据清洗与预处理&#xff08;处理缺失值、异常值&#xff09;2. 智能特征工程&#xff0…

作者头像 李华
网站建设 2026/2/25 20:00:05

Keil生成Bin文件在Bootloader烧录中的应用指南

Keil生成Bin文件在Bootloader烧录中的实战全解析你有没有遇到过这样的场景&#xff1a;辛辛苦苦写完代码&#xff0c;Keil编译通过&#xff0c;AXF文件也生成了——但当你把固件交给生产部门或准备做远程升级时&#xff0c;对方却说&#xff1a;“我们要的是.bin文件。”这时候…

作者头像 李华
网站建设 2026/3/4 5:15:48

Hunyuan-MT-7B对缩写词、专有名词的翻译策略解析

Hunyuan-MT-7B对缩写词、专有名词的翻译策略解析 在当今全球信息高速流动的时代&#xff0c;跨语言沟通早已不再是简单的“字面转换”。一个企业名称、技术术语或地名的微小偏差&#xff0c;可能引发误解甚至影响国际形象。尤其当文本中频繁出现诸如“AI”、“GDP”、“UNESCO”…

作者头像 李华
网站建设 2026/2/27 11:46:06

膝点迁移动态多目标优化算法【附代码】

✅ 博主简介&#xff1a;擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导&#xff0c;毕业论文、期刊论文经验交流。(1) 基于膝点预测的动态环境响应机制 动态多目标优化问题&#xff08;DMOP&#xff09;的难点在于Pareto前沿随时间或环境变化&#xff0c;…

作者头像 李华