邮箱客户端实现 该Python代码实现了一个2925.com邮箱客户端类,主要功能包括: 发送邮件: 支持纯文本和HTML格式邮件 使用SMTP_SSL协议通过465端口发送 包含发件人、收件人和主题设置 接收邮件: 通过IMAP4_SSL协议从993端口获取 提供两种获取方式:UID命令(更可靠)和序号获取(备用) 解析邮件主题、发件人、日期和正文内容 支持限制获取邮件数量 其他特性: 邮件正文预览功能(截取前200字符) 完善的错误处理和日志输出 支持中文字符解码 该客户端类封装了完整的
importsmtplibimportimaplibimportemailfromemail.mime.textimportMIMETextfromemail.mime.multipartimportMIMEMultipartfromemail.headerimportdecode_headerimportosclassEmailClient2925:def__init__(self,email_address,password):""" 初始化2925.com邮箱客户端 参数: email_address: 邮箱地址 password: 邮箱密码 """self.email_address=email_address self.password=password# 2925.com服务器配置self.smtp_server="smtp.2925.com"self.smtp_port=465# SSL端口 465self.imap_server="imap.2925.com"self.imap_port=993defsend_email(self,to_email,subject,body,is_html=False):""" 发送邮件 参数: to_email: 收件人邮箱 subject: 邮件主题 body: 邮件正文 is_html: 是否为HTML格式 """try:# 创建邮件msg=MIMEMultipart()msg['From']=self.email_address msg['To']=to_email msg['Subject']=subject# 添加邮件正文ifis_html:msg.attach(MIMEText(body,'html'))else:msg.attach(MIMEText(body,'plain'))# 连接SMTP服务器并发送print(f"正在连接到SMTP服务器:{self.smtp_server}:{self.smtp_port}")# 使用SSL连接server=smtplib.SMTP_SSL(self.smtp_server,self.smtp_port)server.login(self.email_address,self.password)server.send_message(msg)server.quit()print(f"邮件发送成功!收件人:{to_email}")returnTrueexceptExceptionase:print(f"发送邮件失败:{str(e)}")returnFalsedefreceive_emails(self,limit=10):""" 接收邮件 - 改进版本,使用UID命令避免SEARCH命令问题 参数: limit: 最多获取的邮件数量 """emails=[]try:print(f"正在连接到IMAP服务器:{self.imap_server}:{self.imap_port}")# 连接到IMAP服务器mail=imaplib.IMAP4_SSL(self.imap_server,self.imap_port)mail.login(self.email_address,self.password)# 选择收件箱status,select_info=mail.select('INBOX')ifstatus!='OK':print(f"选择收件箱失败:{select_info}")return[]# 获取邮件总数total_msgs=int(select_info[0].decode())print(f"收件箱中共有{total_msgs}封邮件")iftotal_msgs==0:print("收件箱为空")mail.logout()return[]# 方法1: 使用UID命令获取邮件(更可靠)print("尝试使用UID命令获取邮件...")# 先获取所有邮件的UIDstatus,uid_data=mail.uid('SEARCH',None,'ALL')ifstatus!='OK'ornotuid_data[0]:print("UID SEARCH失败,尝试方法2...")# 方法2: 直接按序号获取邮件returnself._receive_by_sequence(mail,total_msgs,limit)# 获取UID列表uids=uid_data[0].split()print(f"找到{len(uids)}封邮件的UID")# 反转列表,获取最新的邮件uids=uids[::-1]# 限制获取数量uids=uids[:limit]fori,uidinenumerate(uids):try:# 使用UID获取邮件status,msg_data=mail.uid('FETCH',uid,'(RFC822)')ifstatus=='OK'andmsg_data[0]isnotNone:# 解析邮件ifisinstance(msg_data[0],tuple):raw_email=msg_data[0][1]else:raw_email=msg_data[0]msg=email.message_from_bytes(raw_email)# 解码主题subject="无主题"ifmsg["Subject"]:subject_info=decode_header(msg["Subject"])[0]subject_text=subject_info[0]encoding=subject_info[1]ifisinstance(subject_text,bytes):subject=subject_text.decode(encodingifencodingelse'utf-8',errors='ignore')else:subject=str(subject_text)# 获取发件人from_=msg.get("From","未知发件人")# 获取日期date_=msg.get("Date","未知日期")# 获取邮件正文body=self._extract_email_body(msg)emails.append({'id':i+1,'uid':uid.decode(),'subject':subject,'from':from_,'date':date_,'body':body,'body_preview':body[:200]+"..."iflen(body)>200elsebody})print(f"邮件{i+1}:")print(f" UID:{uid.decode()}")print(f" 主题:{subject}")print(f" 发件人:{from_}")print(f" 日期:{date_}")print(f" 预览:{body[:100]}...")print("-"*50)else:print(f"获取邮件UID{uid}失败")exceptExceptionase:print(f"处理邮件UID{uid}时出错:{str(e)}")continue# 关闭连接mail.close()mail.logout()exceptimaplib.IMAP4.errorase:print(f"IMAP协议错误:{str(e)}")# 如果UID命令也失败,尝试最基本的FETCH方法returnself._try_basic_fetch()exceptExceptionase:print(f"接收邮件失败:{str(e)}")returnemailsdef_receive_by_sequence(self,mail,total_msgs,limit):"""方法2: 按序号直接获取邮件"""emails=[]# 计算起始和结束位置start=max(1,total_msgs-limit+1)forseq_numinrange(total_msgs,start-1,-1):try:status,msg_data=mail.fetch(str(seq_num).encode(),'(RFC822)')ifstatus=='OK'andmsg_data[0]isnotNone:ifisinstance(msg_data[0],tuple):raw_email=msg_data[0][1]else:raw_email=msg_data[0]msg=email.message_from_bytes(raw_email)# 解码主题subject="无主题"ifmsg["Subject"]:subject_info=decode_header(msg["Subject"])[0]subject_text=subject_info[0]encoding=subject_info[1]ifisinstance(subject_text,bytes):subject=subject_text.decode(encodingifencodingelse'utf-8',errors='ignore')else:subject=str(subject_text)# 获取邮件正文body=self._extract_email_body(msg)emails.append({'id':total_msgs-seq_num+1,'seq':seq_num,'subject':subject,'from':msg.get("From","未知发件人"),'date':msg.get("Date","未知日期"),'body_preview':body[:200]+"..."iflen(body)>200elsebody})print(f"邮件{total_msgs-seq_num+1}(序号:{seq_num}):{subject}")exceptExceptionase:print(f"处理邮件序号{seq_num}时出错:{str(e)}")continuemail.logout()returnemailsdef_extract_email_body(self,msg):"""提取邮件正文内容"""body=""ifmsg.is_multipart():forpartinmsg.walk():content_type=part.get_content_type()content_disposition=str(part.get("Content-Disposition"))# 跳过附件if"attachment"incontent_disposition:continueifcontent_typein["text/plain","text/html"]:try:body_bytes=part.get_payload(decode=True)ifbody_bytes:charset=part.get_content_charset()or'utf-8'try:body=body_bytes.decode(charset,errors='ignore')except:body=body_bytes.decode('utf-8',errors='ignore')else:body=part.get_payload()except:body=part.get_payload()# 优先使用纯文本正文ifcontent_type=="text/plain"andbody.strip():breakelse:try:body_bytes=msg.get_payload(decode=True)ifbody_bytes:charset=msg.get_content_charset()or'utf-8'try:body=body_bytes.decode(charset,errors='ignore')except:body=body_bytes.decode('utf-8',errors='ignore')else:body=msg.get_payload()except:body=msg.get_payload()returnbodydef_try_basic_fetch(self):"""方法3: 尝试最基本的连接和获取"""try:print("尝试最基本的邮件获取方法...")mail=imaplib.IMAP4_SSL(self.imap_server,self.imap_port)mail.login(self.email_address,self.password)# 直接尝试获取第一封邮件mail.select('INBOX')status,msg_data=mail.fetch('1','(RFC822)')ifstatus=='OK':print("成功获取到第一封邮件!")# 这里可以添加解析代码else:print("连最基本的FETCH也失败了")mail.logout()exceptExceptionase:print(f"基本方法也失败:{str(e)}")return[]defcheck_connection(self):"""测试服务器连接"""print("正在测试SMTP连接...")try:server=smtplib.SMTP_SSL(self.smtp_server,self.smtp_port)server.login(self.email_address,self.password)server.quit()print("SMTP连接成功!")exceptExceptionase:print(f"SMTP连接失败:{str(e)}")print("\n正在测试IMAP连接...")try:mail=imaplib.IMAP4_SSL(self.imap_server,self.imap_port)mail.login(self.email_address,self.password)# 测试是否能选择收件箱status,info=mail.select('INBOX')ifstatus=='OK':print(f"IMAP连接成功!收件箱邮件数:{info[0].decode()}")else:print(f"IMAP登录成功,但选择收件箱失败:{info}")mail.logout()exceptExceptionase:print(f"IMAP连接失败:{str(e)}")deftestSend1():list_=["第一个","第二个","第三个","第四个",]print("\n发送测试邮件...")foriinrange(200):client.send_email(to_email="laocooon@qq.com",# 替换为实际收件人subject=list_[i%len(list_)]+",测试邮件",body="这是一封来自2925.com的测试邮件。")# 使用示例if__name__=="__main__":# 您的邮箱信息EMAIL_ADDRESS="xxxxxx"EMAIL_PASSWORD="xxxxx"# 创建邮箱客户端client=EmailClient2925(EMAIL_ADDRESS,EMAIL_PASSWORD)# 测试连接client.check_connection()# 发送邮件示例# testSend1()print("\n发送测试邮件...")client.send_email(to_email="xxxx@xxx.xx",# 替换为实际收件人subject="测试邮件",body="这是一封来自xxx.com的测试邮件。")# 接收邮件示例print("\n接收最新邮件...")emails=client.receive_emails(limit=10)ifemails:print(emails)print(f"共收到{len(emails)}封邮件")else:print("没有收到邮件")