微信公众号文章转语音程序 - 盲人无障碍收听助手
一、实际应用场景与痛点
应用场景
视障用户小张每天都会浏览微信公众号文章获取资讯,但由于视觉障碍,他无法直接阅读屏幕上的文字内容。虽然有些手机有读屏功能,但在微信公众号环境下体验不佳,且无法离线收听。他需要一款能自动将文章转换为清晰语音文件的工具,方便在通勤、休息时通过耳机收听。
核心痛点
1. 屏幕阅读器兼容性差:微信公众号特殊排版常导致读屏软件识别错误
2. 网络依赖强:在线收听需要稳定网络,流量消耗大
3. 无法离线使用:没有网络时无法获取内容
4. 语音质量差:系统TTS机械感强,缺乏自然度
5. 操作复杂:多步骤操作对视障用户不友好
二、核心逻辑设计
1. 输入微信公众号文章URL
2. 爬取文章内容并清洗(移除广告、无关元素)
3. 提取正文、标题、作者信息
4. 智能分段处理(保持语义连贯性)
5. 调用高质量TTS引擎转换为语音
6. 添加导语和结语(提升体验)
7. 保存为MP3文件并添加ID3标签
8. 提供多种输出和分享方式
三、模块化代码实现
主程序文件:wechat_tts_converter.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
微信公众号文章转语音转换器
为视障用户提供无障碍阅读体验
作者:无障碍智能助手
版本:1.0.0
"""
import os
import re
import json
import time
import requests
from datetime import datetime
from bs4 import BeautifulSoup
import html2text
from urllib.parse import urlparse, parse_qs
import hashlib
from typing import Optional, Tuple, List, Dict
import warnings
warnings.filterwarnings('ignore')
# 语音合成模块(可根据需要切换不同引擎)
try:
from TTS.api import TTS
TTS_AVAILABLE = True
except ImportError:
TTS_AVAILABLE = False
print("提示: 高级TTS引擎未安装,将使用pyttsx3作为后备方案")
try:
import pyttsx3
PYTTSX3_AVAILABLE = True
except ImportError:
PYTTSX3_AVAILABLE = False
try:
from pydub import AudioSegment
from pydub.effects import normalize
AUDIO_PROCESSING_AVAILABLE = True
except ImportError:
AUDIO_PROCESSING_AVAILABLE = False
class WeChatArticleFetcher:
"""微信公众号文章获取器"""
def __init__(self, timeout=30):
"""
初始化文章获取器
Args:
timeout: 请求超时时间(秒)
"""
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
})
def fetch_article(self, url: str) -> Optional[Dict]:
"""
获取微信公众号文章内容
Args:
url: 文章URL
Returns:
包含文章信息的字典,或None(失败时)
"""
try:
print(f"正在获取文章: {url}")
response = self.session.get(url, timeout=self.timeout)
response.encoding = 'utf-8'
if response.status_code != 200:
print(f"请求失败,状态码: {response.status_code}")
return None
return self._parse_html(response.text, url)
except Exception as e:
print(f"获取文章失败: {str(e)}")
return None
def _parse_html(self, html: str, url: str) -> Dict:
"""
解析HTML,提取文章内容
Args:
html: HTML内容
url: 文章URL
Returns:
文章信息字典
"""
soup = BeautifulSoup(html, 'html.parser')
# 提取标题
title_elem = soup.find('h1', class_='rich_media_title') or soup.find('h1', id='activity-name')
title = title_elem.get_text().strip() if title_elem else "未知标题"
# 提取作者
author_elem = soup.find('span', class_='rich_media_meta rich_media_meta_text')
author = author_elem.get_text().strip() if author_elem else "未知作者"
# 提取正文
content_elem = soup.find('div', class_='rich_media_content')
if not content_elem:
# 备用选择器
content_elem = soup.find('div', id='js_content')
if not content_elem:
return {
'title': title,
'author': author,
'content': "无法提取正文内容",
'url': url,
'success': False
}
# 清理不需要的元素
for elem in content_elem.find_all(['script', 'style', 'iframe', 'ins', 'ads']):
elem.decompose()
# 移除特定类名的元素(通常是广告)
for class_name in ['ad-slot', 'ad-wrap', 'advertisement', 'ad_', 'adsbygoogle']:
for elem in content_elem.find_all(class_=re.compile(class_name)):
elem.decompose()
# 转换HTML为纯文本
h2t = html2text.HTML2Text()
h2t.ignore_links = False
h2t.ignore_images = True
h2t.ignore_emphasis = False
h2t.body_width = 0
content_text = h2t.handle(str(content_elem))
# 清理文本
content_text = self._clean_content(content_text)
return {
'title': title,
'author': author,
'content': content_text,
'url': url,
'success': True,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
def _clean_content(self, text: str) -> str:
"""
清理文本内容
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除多余空行
text = re.sub(r'\n\s*\n', '\n\n', text)
# 移除特定模式(如图片描述)
text = re.sub(r'!\[.*?\]\(.*?\)', '', text)
# 移除网址(但保留文字链接)
text = re.sub(r'http[s]?://\S+', '', text)
# 标准化标点
text = text.replace('’', "'").replace('—', '—')
return text.strip()
class ArticleProcessor:
"""文章内容处理器"""
def __init__(self, max_chunk_length=1000):
"""
初始化处理器
Args:
max_chunk_length: 每个语音片段的最大字符数
"""
self.max_chunk_length = max_chunk_length
def add_audio_intro(self, article: Dict, user_name: str = "用户") -> str:
"""
为文章添加语音导语
Args:
article: 文章信息字典
user_name: 用户名称
Returns:
带导语的完整文本
"""
intro = f"【微信公众号文章语音版】\n\n"
intro += f"您好{user_name},现在是 {datetime.now().strftime('%Y年%m月%d日 %H点%M分')}。\n\n"
intro += f"接下来为您播报微信公众号文章:{article['title']}。\n"
intro += f"作者:{article['author']}。\n\n"
intro += "以下是正文内容:\n\n"
outro = f"\n\n【文章播报结束】\n感谢您的收听,再见。"
return intro + article['content'] + outro
def smart_split(self, text: str) -> List[str]:
"""
智能分段,保持语义完整性
Args:
text: 完整文本
Returns:
分段后的文本列表
"""
if len(text) <= self.max_chunk_length:
return [text]
# 按段落分割
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs:
# 如果段落本身很长,需要进一步分割
if len(para) > self.max_chunk_length:
if current_chunk:
chunks.append(current_chunk)
current_chunk = ""
# 按句子分割长段落
sentences = re.split(r'(?<=[。!?.?!])', para)
temp_chunk = ""
for sentence in sentences:
if len(temp_chunk) + len(sentence) <= self.max_chunk_length:
temp_chunk += sentence
else:
if temp_chunk:
chunks.append(temp_chunk)
temp_chunk = sentence
if temp_chunk:
chunks.append(temp_chunk)
else:
if len(current_chunk) + len(para) + 2 <= self.max_chunk_length:
if current_chunk:
current_chunk += '\n\n' + para
else:
current_chunk = para
else:
chunks.append(current_chunk)
current_chunk = para
if current_chunk:
chunks.append(current_chunk)
return chunks
class TTSEngine:
"""语音合成引擎"""
def __init__(self, engine_type='pyttsx3', voice_speed=1.0, output_dir='output'):
"""
初始化TTS引擎
Args:
engine_type: 引擎类型 ('pyttsx3', 'coqui', 'edge')
voice_speed: 语速 (0.5-2.0)
output_dir: 输出目录
"""
self.engine_type = engine_type
self.voice_speed = voice_speed
self.output_dir = output_dir
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# 初始化引擎
self.engine = None
self._init_engine()
def _init_engine(self):
"""初始化TTS引擎"""
if self.engine_type == 'coqui' and TTS_AVAILABLE:
try:
# 使用Coqui TTS(高质量开源TTS)
self.engine = TTS("tts_models/zh-CN/baker/tacotron2-DDC-GST")
print("已加载Coqui TTS引擎(高质量)")
except:
print("Coqui TTS加载失败,回退到pyttsx3")
self.engine_type = 'pyttsx3'
if self.engine_type == 'pyttsx3' and PYTTSX3_AVAILABLE:
try:
self.engine = pyttsx3.init()
# 设置语音属性
self.engine.setProperty('rate', 180 * self.voice_speed)
# 尝试获取中文语音
voices = self.engine.getProperty('voices')
for voice in voices:
if 'chinese' in voice.name.lower() or 'zh' in voice.id.lower():
self.engine.setProperty('voice', voice.id)
break
print("已加载pyttsx3引擎")
except Exception as e:
print(f"pyttsx3初始化失败: {str(e)}")
self.engine = None
def text_to_speech(self, text: str, filename: str) -> bool:
"""
文本转语音
Args:
text: 要转换的文本
filename: 输出文件名(不含扩展名)
Returns:
是否成功
"""
if not self.engine:
print("没有可用的TTS引擎")
return False
try:
output_path = os.path.join(self.output_dir, f"{filename}.mp3")
if self.engine_type == 'coqui':
# Coqui TTS
self.engine.tts_to_file(text=text, file_path=output_path)
else:
# pyttsx3
temp_path = os.path.join(self.output_dir, f"{filename}_temp.wav")
self.engine.save_to_file(text, temp_path)
self.engine.runAndWait()
# 转换为MP3
if AUDIO_PROCESSING_AVAILABLE:
audio = AudioSegment.from_wav(temp_path)
audio.export(output_path, format="mp3", bitrate="64k")
os.remove(temp_path)
else:
os.rename(temp_path, output_path.replace('.mp3', '.wav'))
print(f"语音文件已保存: {output_path}")
return True
except Exception as e:
print(f"语音合成失败: {str(e)}")
return False
def batch_text_to_speech(self, text_chunks: List[str], base_filename: str) -> List[str]:
"""
批量文本转语音
Args:
text_chunks: 文本分块列表
base_filename: 基础文件名
Returns:
生成的音频文件路径列表
"""
audio_files = []
for i, chunk in enumerate(text_chunks):
print(f"正在合成第 {i+1}/{len(text_chunks)} 段...")
chunk_filename = f"{base_filename}_part{i+1:02d}"
if self.text_to_speech(chunk, chunk_filename):
audio_files.append(os.path.join(self.output_dir, f"{chunk_filename}.mp3"))
return audio_files
class AudioMerger:
"""音频合并器"""
def __init__(self, output_dir='output'):
"""
初始化音频合并器
Args:
output_dir: 输出目录
"""
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
def merge_audio_files(self, audio_files: List[str], output_filename: str) -> Optional[str]:
"""
合并多个音频文件
Args:
audio_files: 音频文件路径列表
output_filename: 输出文件名
Returns:
合并后的文件路径,或None(失败时)
"""
if not AUDIO_PROCESSING_AVAILABLE:
print("警告: pydub未安装,无法合并音频文件")
return None
if not audio_files:
print("没有音频文件可合并")
return None
try:
if len(audio_files) == 1:
# 只有一个文件,直接重命名
output_path = os.path.join(self.output_dir, f"{output_filename}.mp3")
os.rename(audio_files[0], output_path)
return output_path
# 合并多个文件
combined = AudioSegment.empty()
for i, audio_file in enumerate(audio_files):
print(f"正在合并第 {i+1}/{len(audio_files)} 个文件...")
audio = AudioSegment.from_file(audio_file)
# 添加短暂静音(除了第一个文件)
if i > 0:
combined += AudioSegment.silent(duration=500)
combined += audio
# 标准化音频
combined = normalize(combined)
# 保存文件
output_path = os.path.join(self.output_dir, f"{output_filename}.mp3")
combined.export(output_path, format="mp3", bitrate="64k", tags={
'title': output_filename,
'artist': '微信公众号语音转换器',
'album': '无障碍阅读',
'date': datetime.now().strftime('%Y')
})
print(f"音频合并完成: {output_path}")
return output_path
except Exception as e:
print(f"音频合并失败: {str(e)}")
return None
class WeChatTTSConverter:
"""微信公众号文章转语音主类"""
def __init__(self, output_dir='output', tts_engine='pyttsx3'):
"""
初始化转换器
Args:
output_dir: 输出目录
tts_engine: TTS引擎类型
"""
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
# 初始化各模块
self.fetcher = WeChatArticleFetcher()
self.processor = ArticleProcessor()
self.tts_engine = TTSEngine(engine_type=tts_engine, output_dir=output_dir)
self.merger = AudioMerger(output_dir=output_dir)
# 创建日志目录
self.log_dir = os.path.join(output_dir, 'logs')
os.makedirs(self.log_dir, exist_ok=True)
def convert(self, url: str, user_name: str = "用户") -> Dict:
"""
转换微信公众号文章为语音
Args:
url: 文章URL
user_name: 用户名称(用于个性化问候)
Returns:
转换结果字典
"""
result = {
'success': False,
'message': '',
'article_info': {},
'audio_file': '',
'log_file': ''
}
try:
# 1. 获取文章
print("步骤1/4: 获取文章内容...")
article = self.fetcher.fetch_article(url)
if not article or not article.get('success', False):
result['message'] = '获取文章失败'
return result
result['article_info'] = {
'title': article['title'],
'author': article['author'],
'url': url,
'timestamp': article.get('timestamp', '')
}
# 2. 处理文章内容
print("步骤2/4: 处理文章内容...")
full_text = self.processor.add_audio_intro(article, user_name)
text_chunks = self.processor.smart_split(full_text)
print(f"文章处理完成,共分为 {len(text_chunks)} 段")
# 3. 生成唯一文件名
url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
safe_title = re.sub(r'[^\w\u4e00-\u9fff]+', '_', article['title'])[:50]
base_filename = f"{safe_title}_{url_hash}"
# 4. 文本转语音
print("步骤3/4: 文本转语音合成...")
audio_files = self.tts_engine.batch_text_to_speech(text_chunks, base_filename)
if not audio_files:
result['message'] = '语音合成失败'
return result
# 5. 合并音频文件
print("步骤4/4: 合并音频文件...")
final_audio = self.merger.merge_audio_files(audio_files, base_filename)
if not final_audio:
# 如果没有合并,使用第一个音频文件
final_audio = audio_files[0]
# 6. 清理临时文件
for audio_file in audio_files:
if audio_file != final_audio and os.path.exists(audio_file):
os.remove(audio_file)
# 7. 记录日志
log_data = {
'timestamp': datetime.now().isoformat(),
'url': url,
'title': article['title'],
'audio_file': final_audio,
'chunks_count': len(text_chunks)
}
log_file = os.path.join(self.log_dir, f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{url_hash}.json")
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
result.update({
'success': True,
'message': '转换成功',
'audio_file': final_audio,
'log_file': log_file
})
print(f"转换完成!音频文件: {final_audio}")
except Exception as e:
result['message'] = f'转换过程中发生错误: {str(e)}'
print(f"错误: {str(e)}")
return result
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='微信公众号文章转语音转换器')
parser.add_argument('url', help='微信公众号文章URL')
parser.add_argument('--name', default='用户', help='用户名称(用于个性化问候)')
parser.add_argument('--output', default='output', help='输出目录')
parser.add_argument('--engine', default='pyttsx3', choices=['pyttsx3', 'coqui'],
help='TTS引擎类型 (pyttsx3 或 coqui)')
args = parser.parse_args()
# 检查必要库
print("=" * 60)
print("微信公众号文章转语音转换器 - 无障碍阅读助手")
print("=" * 60)
# 创建转换器实例
converter = WeChatTTSConverter(output_dir=args.output, tts_engine=args.engine)
# 执行转换
result = converter.convert(args.url, args.name)
# 显示结果
print("\n" + "=" * 60)
if result['success']:
print("✓ 转换成功!")
print(f" 标题: {result['article_info'].get('title', '未知')}")
print(f" 作者: {result['article_info'].get('author', '未知')}")
print(f" 音频文件: {result['audio_file']}")
print(f" 日志文件: {result['log_file']}")
else:
print("✗ 转换失败")
print(f" 错误: {result.get('message', '未知错误')}")
print("=" * 60)
return 0 if result['success'] else 1
if __name__ == "__main__":
# 示例用法
if len(os.sys.argv) == 1:
print("使用方法: python wechat_tts_converter.py <文章URL> [--name 用户名] [--output 输出目录]")
print("示例: python wechat_tts_converter.py https://mp.weixin.qq.com/s/xxx --name 张三")
# 测试模式
test_url = input("\n请输入测试文章URL(直接回车使用示例URL): ").strip()
if not test_url:
# 使用一个示例URL(实际使用时需要替换)
test_url = "https://mp.weixin.qq.com/s/示例文章ID"
print(f"使用示例URL: {test_url}")
if test_url and "weixin.qq.com" in test_url:
os.sys.argv = [os.sys.argv[0], test_url, "--name", "测试用户"]
exit(main())
配置文件:config.json
{
"app_name": "微信公众号文章转语音转换器",
"version": "1.0.0",
"settings": {
"output_dir": "output",
"default_tts_engine": "pyttsx3",
"voice_speed": 1.0,
"chunk_size": 1000,
"add_intro": true,
"add_outro": true,
"auto_open_folder": false
},
"accessibility": {
"high_contrast": false,
"screen_reader": true,
"large_font": false
},
"user": {
"name": "用户",
"remember_setting
如果你觉得这个工具好用,欢迎关注我!