
MobaXterm for Efficient Operations: Advanced Linux Ops Techniques and Automation Scripts in Depth


张小明

Front-end Development Engineer


Chapter 1: MobaXterm Core Advantages and Architecture

1.1 Why Choose MobaXterm for Linux Operations?

As a comprehensive remote-computing toolbox for Windows, MobaXterm offers Linux operations engineers a great deal of convenience:

Core advantages:

  • All-in-one integration: full support for SSH, X11, RDP, VNC, FTP, SFTP, and other protocols

  • Portability: a portable edition runs without installation

  • Built-in X server: Linux GUI applications display on the Windows desktop over X11 forwarding (see the quick example after this list)

  • Tabbed sessions: multiple sessions can be worked on in parallel

  • Plugin ecosystem: a rich set of extension plugins
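
Because MobaXterm ships with a built-in X server, a graphical Linux program can be shown on the Windows desktop simply by enabling X11 forwarding on the SSH connection. A minimal sketch from a MobaXterm local terminal (host and user are placeholders):

bash

# -X enables X11 forwarding; MobaXterm normally sets DISPLAY on the remote side automatically
ssh -X admin@192.168.1.100
# on the remote host, launch any GUI tool, e.g.:
xclock &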

1.2 MobaXterm Architecture

text

MobaXterm architecture layers:

┌─────────────────────────┐
│ GUI layer               │
│ - Tab management        │
│ - Session manager       │
│ - Tool collection       │
├─────────────────────────┤
│ Protocol layer          │
│ - SSH2                  │
│ - X11 forwarding        │
│ - SFTP/SCP              │
│ - RDP/VNC               │
├─────────────────────────┤
│ Network layer           │
│ - Encrypted transport   │
│ - Port forwarding       │
│ - Proxy support         │
└─────────────────────────┘
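
The network-layer features map onto standard OpenSSH client options that can be used directly from a MobaXterm local terminal. A sketch of port forwarding and a SOCKS proxy (host names and ports are placeholders):

bash

# forward local port 8080 to a service only reachable from the jump host
ssh -L 8080:internal-app:80 admin@bastion-host

# open a local SOCKS5 proxy on port 1080 that tunnels through the remote server
ssh -D 1080 admin@bastion-host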

Chapter 2: Efficient Connections and Session Management

2.1 Advanced SSH Configuration Techniques

2.1.1 Optimizing the SSH Config File

bash

# ~/.ssh/config - advanced configuration example
Host prod-*
    User admin
    IdentityFile ~/.ssh/prod_key
    Port 2222
    ServerAliveInterval 60
    ServerAliveCountMax 3
    Compression yes
    ControlMaster auto
    ControlPath ~/.ssh/ssh_mux_%h_%p_%r
    ControlPersist 4h

Host prod-web*
    # note: %h expands to the host name given on the command line
    Hostname 192.168.1.%h
    ProxyJump bastion-host

Host *
    AddKeysToAgent yes
    # UseKeychain is macOS-specific; remove it (or add "IgnoreUnknown UseKeychain") on other platforms
    UseKeychain yes
    TCPKeepAlive yes
    LogLevel VERBOSE
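
With ControlMaster/ControlPersist configured as above, the first connection opens a shared socket and later SSH/SFTP sessions reuse it, which noticeably speeds up repeated operations. The multiplexed connection can be managed with standard OpenSSH client options (the host alias is a placeholder matching the prod-* pattern):

bash

ssh -fN prod-web01        # start a background master connection
ssh -O check prod-web01   # report whether the master is running
ssh -O exit prod-web01    # tear down the shared connection
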
2.1.2 MobaXterm Session Templates

Create a reusable session template:

xml

<!-- Session configuration example -->
<MobaXterm>
  <Bookmark>
    <Type>0</Type>
    <Name>Production-Server</Name>
    <Host>192.168.1.100</Host>
    <Port>22</Port>
    <Username>admin</Username>
    <Key>C:\Users\user\.ssh\id_rsa.ppk</Key>
    <SSHConfig>Y29tcHJlc3Npb249eWVzCmxvZ2xldmVsPVZFUkJPU0U=</SSHConfig>
    <X11Forwarding>yes</X11Forwarding>
    <X11Display>localhost:10.0</X11Display>
    <StartupDir>/home/admin</StartupDir>
    <TerminalRows>40</TerminalRows>
    <TerminalCols>120</TerminalCols>
    <FontSize>11</FontSize>
    <FontName>Consolas</FontName>
    <Colorscheme>0</Colorscheme>
    <BackgroundColor>0,0,0</BackgroundColor>
    <TextColor>255,255,255</TextColor>
    <CursorColor>255,255,255</CursorColor>
  </Bookmark>
</MobaXterm>

2.2 Bulk Server Management

2.2.1 Server Grouping

bash

# servers.list - server inventory
# Format: name|ip|port|user|key_path|tags
web-server-01|192.168.1.101|22|admin|~/.ssh/web_key|web,prod
web-server-02|192.168.1.102|22|admin|~/.ssh/web_key|web,prod
db-master|192.168.1.201|22|dba|~/.ssh/db_key|db,prod,master
db-slave|192.168.1.202|22|dba|~/.ssh/db_key|db,prod,slave
redis-01|192.168.1.301|22|redis|~/.ssh/redis_key|cache,prod
2.2.2 Parallel Connection Script

bash

#!/bin/bash
# parallel_connect.sh - run a command on multiple servers in parallel

CONFIG_FILE="servers.list"
TIMEOUT=5
MAX_PARALLEL=10

# Read the server inventory
declare -A servers
while IFS='|' read -r name ip port user key tags; do
    [[ "$name" =~ ^#.* ]] && continue
    servers["$name"]="$ip|$port|$user|$key|$tags"
done < "$CONFIG_FILE"

# Select servers by tag, emitting "name|ip|port|user|key" records
filter_by_tag() {
    local tag="$1"
    for name in "${!servers[@]}"; do
        IFS='|' read -r ip port user key tags <<< "${servers[$name]}"
        if [[ $tags == *"$tag"* ]]; then
            echo "$name|$ip|$port|$user|$key"
        fi
    done
}

# Emit every server as a "name|ip|port|user|key" record
# (the original "all servers" branch only printed names, which breaks the parallel columns)
all_servers() {
    for name in "${!servers[@]}"; do
        IFS='|' read -r ip port user key tags <<< "${servers[$name]}"
        echo "$name|$ip|$port|$user|$key"
    done
}

# Run the command on every selected server via GNU parallel
parallel_execute() {
    local servers_list="$1"
    local command="$2"
    echo "Running command in parallel: $command"
    echo "====================================="
    echo "$servers_list" | parallel -j $MAX_PARALLEL --colsep '\|' \
        --tagstring "{1}:" \
        'timeout '"$TIMEOUT"' ssh -p {3} -i {5} {4}@{2} "'"$command"'" 2>&1'
}

# Interactive menu
menu() {
    echo "Server groups:"
    echo "1) All servers"
    echo "2) Web servers"
    echo "3) Database servers"
    echo "4) Cache servers"
    echo "5) Production"
    echo "6) Custom tag"
    read -p "Select a group: " choice
    case $choice in
        1) selected=$(all_servers) ;;
        2) selected=$(filter_by_tag "web") ;;
        3) selected=$(filter_by_tag "db") ;;
        4) selected=$(filter_by_tag "cache") ;;
        5) selected=$(filter_by_tag "prod") ;;
        6) read -p "Enter tag: " tag && selected=$(filter_by_tag "$tag") ;;
        *) echo "Invalid choice" && exit 1 ;;
    esac
    read -p "Command to run: " cmd
    parallel_execute "$selected" "$cmd"
}

menu
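
A usage sketch, assuming GNU parallel is installed and servers.list sits in the same directory as the script:

bash

sudo apt-get install -y parallel   # Debian/Ubuntu; the package name may differ on other distributions
chmod +x parallel_connect.sh
./parallel_connect.sh              # pick a group, then enter a command such as "uptime"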

Chapter 3: Advanced File Transfer and Synchronization

3.1 Advanced SFTP Usage

3.1.1 Batch SFTP Operations

bash

#!/bin/bash # sftp_batch_operations.sh BATCH_FILE="sftp_commands.txt" LOG_FILE="sftp_transfer_$(date +%Y%m%d_%H%M%S).log" REMOTE_HOST="server.example.com" REMOTE_USER="admin" SSH_KEY="~/.ssh/id_rsa" # 创建SFTP批处理文件 cat > "$BATCH_FILE" << 'EOF' # 批量SFTP命令 cd /var/log lcd ~/backups/logs get -r apache2/ get -r nginx/ cd /etc lcd ~/backups/configs get -r nginx/conf.d/ get mysql/my.cnf get -r ssh/ cd /home/admin/scripts lcd ~/backups/scripts get *.sh get *.py # 上传文件 cd /tmp lcd ~/uploads put deploy_package.tar.gz put config_update.json # 目录同步 mirror -R ~/local/webroot /var/www/html EOF # 执行SFTP批处理 sftp -b "$BATCH_FILE" -i "$SSH_KEY" "${REMOTE_USER}@${REMOTE_HOST}" 2>&1 | tee "$LOG_FILE" # 检查执行结果 if grep -q "Transfer complete" "$LOG_FILE"; then echo "SFTP批量操作完成" # 发送通知 echo "文件传输完成于 $(date)" | mail -s "SFTP批量传输完成" admin@example.com else echo "SFTP批量操作失败" # 发送错误通知 grep -E "error|failed|denied" "$LOG_FILE" | mail -s "SFTP批量传输错误" admin@example.com fi
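
One caveat about the batch file above: `mirror -R` is an lftp command rather than an OpenSSH sftp command, so that line will fail in sftp batch mode. If directory mirroring is needed, a separate lftp invocation (requires the lftp package) can cover it; a sketch reusing the same variables:

bash

# hypothetical lftp equivalent of the mirror step; key-based auth is taken from the SSH config
lftp -u "${REMOTE_USER}," "sftp://${REMOTE_HOST}" \
     -e "mirror -R ~/local/webroot /var/www/html; bye"
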
3.1.2 Real-Time File Sync Monitoring

bash

#!/bin/bash # realtime_sync_monitor.sh REMOTE_HOST="backup-server" REMOTE_PATH="/backup/files" LOCAL_PATH="/data/files" SYNC_INTERVAL=30 LOG_FILE="/var/log/realtime_sync.log" # 安装inotify-tools(如果未安装) install_inotify() { if ! command -v inotifywait &> /dev/null; then echo "安装inotify-tools..." apt-get update && apt-get install -y inotify-tools fi } # 同步函数 sync_files() { local event="$1" local file="$2" echo "[$(date '+%Y-%m-%d %H:%M:%S')] $event: $file" >> "$LOG_FILE" case $event in MODIFY|CREATE|CLOSE_WRITE) rsync -avz -e "ssh -i ~/.ssh/sync_key" \ "$LOCAL_PATH/$file" \ "$REMOTE_HOST:$REMOTE_PATH/$file" ;; DELETE|MOVED_FROM) ssh -i ~/.ssh/sync_key "$REMOTE_HOST" \ "rm -f '$REMOTE_PATH/$file'" ;; MOVED_TO) rsync -avz -e "ssh -i ~/.ssh/sync_key" \ "$LOCAL_PATH/$file" \ "$REMOTE_HOST:$REMOTE_PATH/$file" ;; esac } # 主监控循环 main_monitor() { install_inotify echo "开始实时文件同步监控..." | tee -a "$LOG_FILE" echo "本地目录: $LOCAL_PATH" | tee -a "$LOG_FILE" echo "远程目录: $REMOTE_HOST:$REMOTE_PATH" | tee -a "$LOG_FILE" # 初始同步 rsync -avz -e "ssh -i ~/.ssh/sync_key" \ "$LOCAL_PATH/" \ "$REMOTE_HOST:$REMOTE_PATH/" # 监控文件系统事件 inotifywait -m -r -e modify,create,delete,move \ --format '%e %w%f' "$LOCAL_PATH" | while read -r event file do # 获取相对路径 relative_path="${file#$LOCAL_PATH/}" # 如果是目录事件,跳过 if [ -d "$file" ]; then continue fi # 同步文件 sync_files "${event%%,*}" "$relative_path" done } # 启动监控 main_monitor

3.2 Advanced rsync Automation

bash

#!/bin/bash # advanced_rsync_backup.sh # 配置参数 CONFIG_FILE="$(dirname "$0")/rsync_backup.conf" LOCK_FILE="/tmp/rsync_backup.lock" LOG_DIR="/var/log/backup" RETENTION_DAYS=30 # 加载配置 source "$CONFIG_FILE" 2>/dev/null || { cat > "$CONFIG_FILE" << 'EOF' # Rsync备份配置 SOURCE_DIRS=( "/etc" "/home" "/var/www" "/var/log" "/opt/applications" ) EXCLUDE_PATTERNS=( "*.tmp" "*.log" "*.cache" "*.swp" ".git/" "node_modules/" "__pycache__/" ) REMOTE_BACKUP_HOST="backup-server" REMOTE_BACKUP_USER="backup" REMOTE_BACKUP_PATH="/backup/$(hostname)" SSH_KEY="/root/.ssh/backup_key" BANDWIDTH_LIMIT="5000" # KB/s COMPRESSION_LEVEL=6 EOF echo "配置文件已创建,请编辑: $CONFIG_FILE" exit 1 } # 防止重复执行 exec 200>"$LOCK_FILE" flock -n 200 || { echo "另一个备份进程正在运行" exit 1 } # 创建排除文件 create_exclude_file() { local exclude_file="/tmp/rsync_exclude_$(date +%s).txt" for pattern in "${EXCLUDE_PATTERNS[@]}"; do echo "$pattern" >> "$exclude_file" done # 添加系统特定排除 echo "/proc/*" >> "$exclude_file" echo "/sys/*" >> "$exclude_file" echo "/dev/*" >> "$exclude_file" echo "/tmp/*" >> "$exclude_file" echo "$exclude_file" } # 执行备份 perform_backup() { local timestamp=$(date '+%Y%m%d_%H%M%S') local log_file="$LOG_DIR/backup_$timestamp.log" local exclude_file=$(create_exclude_file) echo "开始备份: $(date)" | tee "$log_file" echo "=====================================" | tee -a "$log_file" # 检查远程目录 if ! ssh -i "$SSH_KEY" "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}" \ "test -d '$REMOTE_BACKUP_PATH'" 2>/dev/null; then ssh -i "$SSH_KEY" "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}" \ "mkdir -p '$REMOTE_BACKUP_PATH'" fi # 创建快照目录 local snapshot_dir="$REMOTE_BACKUP_PATH/snapshot_$timestamp" ssh -i "$SSH_KEY" "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}" \ "mkdir -p '$snapshot_dir'" # 备份每个目录 for source_dir in "${SOURCE_DIRS[@]}"; do if [ ! -d "$source_dir" ]; then echo "警告: 目录不存在 - $source_dir" | tee -a "$log_file" continue fi echo "备份: $source_dir" | tee -a "$log_file" # 使用rsync进行增量备份 rsync -avzh \ --progress \ --delete \ --bwlimit="$BANDWIDTH_LIMIT" \ --compress-level="$COMPRESSION_LEVEL" \ --link-dest="../latest" \ --exclude-from="$exclude_file" \ --log-file="$log_file" \ -e "ssh -i '$SSH_KEY'" \ "$source_dir/" \ "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}:$snapshot_dir${source_dir}" local rsync_status=$? if [ $rsync_status -eq 0 ]; then echo "成功: $source_dir" | tee -a "$log_file" else echo "失败: $source_dir (代码: $rsync_status)" | tee -a "$log_file" fi done # 更新latest符号链接 ssh -i "$SSH_KEY" "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}" \ "cd '$REMOTE_BACKUP_PATH' && rm -f latest && ln -s snapshot_$timestamp latest" # 清理旧备份 cleanup_old_backups # 清理临时文件 rm -f "$exclude_file" echo "备份完成: $(date)" | tee -a "$log_file" echo "日志文件: $log_file" | tee -a "$log_file" # 发送摘要 send_summary "$log_file" } # 清理旧备份 cleanup_old_backups() { echo "清理超过${RETENTION_DAYS}天的旧备份..." 
| tee -a "$log_file" ssh -i "$SSH_KEY" "${REMOTE_BACKUP_USER}@${REMOTE_BACKUP_HOST}" \ "find '$REMOTE_BACKUP_PATH' -name 'snapshot_*' -type d -mtime +$RETENTION_DAYS | while read dir; do echo \"删除: \$dir\"; rm -rf \"\$dir\"; done" } # 发送摘要 send_summary() { local log_file="$1" local summary_file="/tmp/backup_summary_$(date +%s).txt" # 生成摘要 { echo "备份摘要 - $(date)" echo "主机: $(hostname)" echo "时间: $(date)" echo "目标: $REMOTE_BACKUP_HOST:$REMOTE_BACKUP_PATH" echo "" echo "备份目录:" for source_dir in "${SOURCE_DIRS[@]}"; do echo " - $source_dir" done echo "" echo "排除模式:" for pattern in "${EXCLUDE_PATTERNS[@]}"; do echo " - $pattern" done echo "" echo "日志摘要:" grep -E "^(成功|失败|警告|备份:|总计)" "$log_file" | tail -20 } > "$summary_file" # 发送邮件(如果配置了邮件) if command -v mail &> /dev/null; then mail -s "备份完成 - $(hostname) - $(date)" \ admin@example.com < "$summary_file" fi rm -f "$summary_file" } # 主函数 main() { # 创建日志目录 mkdir -p "$LOG_DIR" # 检查网络连接 if ! ping -c 1 -W 2 "$REMOTE_BACKUP_HOST" &> /dev/null; then echo "错误: 无法连接到备份服务器" exit 1 fi # 检查SSH密钥 if [ ! -f "$SSH_KEY" ]; then echo "错误: SSH密钥不存在 - $SSH_KEY" exit 1 fi # 执行备份 perform_backup } main
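
To run the backup unattended, the script can be scheduled with cron. A sketch assuming it is installed at /opt/scripts/advanced_rsync_backup.sh (adjust the path to your layout):

bash

# /etc/cron.d/rsync-backup - daily incremental backup at 02:30
30 2 * * * root /opt/scripts/advanced_rsync_backup.sh >> /var/log/backup/cron.log 2>&1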

Chapter 4: Network Diagnostics and Monitoring

4.1 Advanced Network Diagnostics Script

bash

#!/bin/bash # network_diagnostics_suite.sh LOG_FILE="/tmp/network_diagnostics_$(date +%Y%m%d_%H%M%S).log" REPORT_FILE="/tmp/network_report_$(date +%Y%m%d_%H%M%S).html" # 颜色定义 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' # No Color # 日志函数 log() { local level="$1" local message="$2" local timestamp=$(date '+%Y-%m-%d %H:%M:%S') case $level in INFO) echo -e "${GREEN}[INFO]${NC} $message" ;; WARN) echo -e "${YELLOW}[WARN]${NC} $message" ;; ERROR) echo -e "${RED}[ERROR]${NC} $message" ;; *) echo -e "${BLUE}[$level]${NC} $message" ;; esac echo "[$timestamp] [$level] $message" >> "$LOG_FILE" } # 检查网络连接 check_connectivity() { log INFO "检查网络连接性..." # 测试网关 local gateway=$(ip route | grep default | awk '{print $3}') if [ -n "$gateway" ]; then ping -c 3 -W 1 "$gateway" &> /dev/null if [ $? -eq 0 ]; then log INFO "网关 $gateway 可达" else log ERROR "网关 $gateway 不可达" fi fi # 测试DNS if dig +short google.com &> /dev/null; then log INFO "DNS解析正常" else log ERROR "DNS解析失败" fi # 测试外部连接 local test_urls=("8.8.8.8" "cloudflare.com" "baidu.com") for url in "${test_urls[@]}"; do if ping -c 2 -W 1 "$url" &> /dev/null; then log INFO "$url 可达" else log WARN "$url 不可达" fi done } # 检查端口和服务 check_services() { log INFO "检查关键服务端口..." local services=( "SSH:22" "HTTP:80" "HTTPS:443" "MySQL:3306" "Redis:6379" "Nginx:80" "PostgreSQL:5432" ) for service in "${services[@]}"; do local name="${service%:*}" local port="${service#*:}" if ss -tln | grep -q ":$port "; then log INFO "$name ($port) 正在监听" else log WARN "$name ($port) 未监听" fi done } # 网络性能测试 network_performance() { log INFO "网络性能测试..." # 带宽测试(使用iperf3,如果可用) if command -v iperf3 &> /dev/null; then log INFO "执行带宽测试..." # 这里可以添加iperf3测试代码 fi # 延迟测试 log INFO "延迟测试..." ping -c 10 8.8.8.8 | tail -2 # MTU测试 log INFO "MTU测试..." ping -M do -s 1472 -c 2 8.8.8.8 &> /dev/null if [ $? -eq 0 ]; then log INFO "MTU 1500字节正常" else log WARN "MTU可能有问题" fi } # 路由追踪 trace_route() { log INFO "路由追踪..." local targets=("8.8.8.8" "cloudflare.com" "本地网关") for target in "${targets[@]}"; do log INFO "追踪到 $target 的路由:" traceroute -n -m 15 -w 1 "$target" 2>/dev/null | head -20 echo "" done } # 生成HTML报告 generate_html_report() { log INFO "生成HTML报告..." cat > "$REPORT_FILE" << EOF <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>网络诊断报告 - $(hostname) - $(date)</title> <style> body { font-family: Arial, sans-serif; margin: 40px; } .container { max-width: 1200px; margin: auto; } .header { background: #333; color: white; padding: 20px; } .section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; } .success { color: green; } .warning { color: orange; } .error { color: red; } pre { background: #f5f5f5; padding: 10px; overflow: auto; } </style> </head> <body> <div class="container"> <div class="header"> <h1>网络诊断报告</h1> <p>主机: $(hostname) | 时间: $(date)</p> </div> <div class="section"> <h2>系统信息</h2> <pre>$(uname -a)</pre> <pre>$(uptime)</pre> </div> <div class="section"> <h2>网络接口</h2> <pre>$(ip addr show)</pre> </div> <div class="section"> <h2>路由表</h2> <pre>$(ip route show)</pre> </div> <div class="section"> <h2>连接检查</h2> <pre>$(grep -E "(可达|失败|正常)" "$LOG_FILE")</pre> </div> <div class="section"> <h2>服务状态</h2> <pre>$(grep "正在监听\|未监听" "$LOG_FILE")</pre> </div> <div class="section"> <h2>诊断日志</h2> <pre>$(tail -50 "$LOG_FILE")</pre> </div> </div> </body> </html> EOF log INFO "HTML报告已生成: $REPORT_FILE" } # 主函数 main() { echo "开始网络诊断..." 
echo "日志文件: $LOG_FILE" echo "" # 执行所有检查 check_connectivity echo "" check_services echo "" network_performance echo "" trace_route echo "" # 生成报告 generate_html_report echo "诊断完成" echo "查看报告: $REPORT_FILE" echo "查看日志: $LOG_FILE" } main

4.2 Real-Time Network Monitoring Dashboard

python

#!/usr/bin/env python3 """ network_monitor_dashboard.py 实时网络监控面板 """ import psutil import socket import time import json import threading import subprocess from datetime import datetime from collections import deque import curses class NetworkMonitor: def __init__(self, update_interval=1): self.update_interval = update_interval self.network_stats = { 'interfaces': {}, 'connections': [], 'bandwidth': {}, 'latency': {} } self.history = deque(maxlen=100) def get_interface_stats(self): """获取网络接口统计信息""" stats = psutil.net_if_stats() io_counters = psutil.net_io_counters(pernic=True) for interface, info in stats.items(): self.network_stats['interfaces'][interface] = { 'isup': info.isup, 'duplex': info.duplex, 'speed': info.speed, 'mtu': info.mtu, 'bytes_sent': io_counters.get(interface, {}).bytes_sent, 'bytes_recv': io_counters.get(interface, {}).bytes_recv, 'packets_sent': io_counters.get(interface, {}).packets_sent, 'packets_recv': io_counters.get(interface, {}).packets_recv } def get_connections(self): """获取网络连接""" connections = [] for conn in psutil.net_connections(kind='inet'): try: connections.append({ 'fd': conn.fd, 'family': str(conn.family), 'type': str(conn.type), 'laddr': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else None, 'raddr': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else None, 'status': conn.status, 'pid': conn.pid }) except: continue self.network_stats['connections'] = connections def measure_latency(self, target="8.8.8.8"): """测量网络延迟""" try: start = time.time() socket.create_connection((target, 53), timeout=2) latency = (time.time() - start) * 1000 # 转换为毫秒 if target not in self.network_stats['latency']: self.network_stats['latency'][target] = deque(maxlen=10) self.network_stats['latency'][target].append(latency) except Exception as e: self.network_stats['latency'][target] = None def calculate_bandwidth(self): """计算带宽使用率""" for interface, stats in self.network_stats['interfaces'].items(): if interface not in self.network_stats['bandwidth']: self.network_stats['bandwidth'][interface] = { 'upload_speed': 0, 'download_speed': 0, 'history': deque(maxlen=60) } # 这里需要保存上一次的值来计算速度 # 简化实现,实际需要保存历史数据 def update(self): """更新所有统计数据""" self.get_interface_stats() self.get_connections() self.measure_latency() self.calculate_bandwidth() # 保存历史记录 self.history.append({ 'timestamp': datetime.now().isoformat(), 'stats': self.network_stats.copy() }) def run_monitor(self, stdscr): """运行监控界面""" curses.curs_set(0) stdscr.nodelay(1) while True: stdscr.clear() # 更新数据 self.update() # 显示标题 height, width = stdscr.getmaxyx() title = f"网络监控面板 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" stdscr.addstr(0, (width - len(title)) // 2, title, curses.A_BOLD) # 显示接口信息 row = 2 stdscr.addstr(row, 2, "网络接口:", curses.A_BOLD) row += 1 for interface, stats in self.network_stats['interfaces'].items(): if row >= height - 5: break status = "▲" if stats['isup'] else "▼" color = curses.color_pair(2) if stats['isup'] else curses.color_pair(1) info = f" {status} {interface}: " info += f"{stats['speed']}Mb/s | " info += f"↑{self.format_bytes(stats['bytes_sent'])} " info += f"↓{self.format_bytes(stats['bytes_recv'])}" stdscr.addstr(row, 2, info, color) row += 1 # 显示连接信息 row += 1 stdscr.addstr(row, 2, "活动连接:", curses.A_BOLD) row += 1 for conn in self.network_stats['connections'][:10]: # 只显示前10个 if row >= height - 2: break conn_str = f" {conn['laddr'] or 'N/A'} -> {conn['raddr'] or 'N/A'}" conn_str += f" [{conn['status']}]" if len(conn_str) > width - 4: conn_str = conn_str[:width-7] + "..." 
stdscr.addstr(row, 2, conn_str) row += 1 # 显示延迟 row += 1 stdscr.addstr(row, 2, "网络延迟:", curses.A_BOLD) row += 1 for target, latencies in self.network_stats['latency'].items(): if latencies: avg_latency = sum(latencies) / len(latencies) color = curses.color_pair(2) if avg_latency < 50 else \ curses.color_pair(3) if avg_latency < 100 else \ curses.color_pair(1) latency_str = f" {target}: {avg_latency:.1f}ms" stdscr.addstr(row, 2, latency_str, color) row += 1 # 刷新屏幕 stdscr.refresh() # 检查按键 key = stdscr.getch() if key == ord('q'): break time.sleep(self.update_interval) @staticmethod def format_bytes(bytes_size): """格式化字节大小""" for unit in ['B', 'KB', 'MB', 'GB']: if bytes_size < 1024.0: return f"{bytes_size:.1f}{unit}" bytes_size /= 1024.0 return f"{bytes_size:.1f}TB" def main(): # 初始化curses stdscr = curses.initscr() curses.start_color() curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) # 错误/关闭 curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # 正常/开启 curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # 警告 monitor = NetworkMonitor() try: monitor.run_monitor(stdscr) except KeyboardInterrupt: pass finally: curses.endwin() print("监控已停止") if __name__ == "__main__": main()
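
The dashboard depends on psutil and curses, so it is intended to run on the Linux host itself (for example inside a MobaXterm SSH session) rather than on Windows. Per the script's key handling, q exits:

bash

pip3 install psutil                     # the only third-party dependency
python3 network_monitor_dashboard.py    # press q to quit the curses UI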

Chapter 5: Automation Script Case Studies

5.1 Automated Server Health Checks

bash

#!/bin/bash # server_health_check.sh CONFIG_FILE="/etc/health_check.conf" REPORT_DIR="/var/log/health_check" ALERT_THRESHOLD=85 CHECK_INTERVAL=300 # 5分钟 # 加载配置 load_config() { if [ -f "$CONFIG_FILE" ]; then source "$CONFIG_FILE" else # 默认配置 CHECK_CPU=true CHECK_MEMORY=true CHECK_DISK=true CHECK_SERVICES=true CHECK_LOGS=true ALERT_EMAIL="admin@example.com" WARNING_THRESHOLD=70 CRITICAL_THRESHOLD=90 fi } # 检查CPU使用率 check_cpu() { local cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1) local load_avg=$(awk '{print $1,$2,$3}' /proc/loadavg) local cpu_cores=$(nproc) echo "CPU使用率: ${cpu_usage}%" echo "负载平均: $load_avg" echo "CPU核心数: $cpu_cores" if (( $(echo "$cpu_usage > $CRITICAL_THRESHOLD" | bc -l) )); then echo "状态: CRITICAL" send_alert "CPU" "$cpu_usage%" return 2 elif (( $(echo "$cpu_usage > $WARNING_THRESHOLD" | bc -l) )); then echo "状态: WARNING" return 1 else echo "状态: OK" return 0 fi } # 检查内存使用 check_memory() { local mem_info=$(free -m | awk 'NR==2{printf "%.2f", $3*100/$2}') local swap_info=$(free -m | awk 'NR==3{printf "%.2f", $3*100/$2}') local total_mem=$(free -m | awk 'NR==2{print $2}') local used_mem=$(free -m | awk 'NR==2{print $3}') echo "内存使用率: ${mem_info}%" echo "交换分区: ${swap_info}%" echo "总内存: ${total_mem}MB" echo "已使用: ${used_mem}MB" if (( $(echo "$mem_info > $CRITICAL_THRESHOLD" | bc -l) )); then echo "状态: CRITICAL" send_alert "内存" "$mem_info%" return 2 elif (( $(echo "$mem_info > $WARNING_THRESHOLD" | bc -l) )); then echo "状态: WARNING" return 1 else echo "状态: OK" return 0 fi } # 检查磁盘使用 check_disk() { echo "磁盘使用情况:" local disk_status=0 df -h | grep -E '^/dev/' | while read line; do local filesystem=$(echo $line | awk '{print $1}') local size=$(echo $line | awk '{print $2}') local used=$(echo $line | awk '{print $3}') local avail=$(echo $line | awk '{print $4}') local use_percent=$(echo $line | awk '{print $5}' | sed 's/%//') local mount=$(echo $line | awk '{print $6}') echo " $filesystem ($mount): $used/$size ($use_percent%)" if (( $(echo "$use_percent > $CRITICAL_THRESHOLD" | bc -l) )); then echo " 状态: CRITICAL" send_alert "磁盘" "$mount: $use_percent%" disk_status=2 elif (( $(echo "$use_percent > $WARNING_THRESHOLD" | bc -l) )); then echo " 状态: WARNING" [ $disk_status -eq 0 ] && disk_status=1 fi done # 检查inode使用 echo "" echo "Inode使用情况:" df -i | grep -E '^/dev/' | while read line; do local filesystem=$(echo $line | awk '{print $1}') local inode_use=$(echo $line | awk '{print $5}' | sed 's/%//') local mount=$(echo $line | awk '{print $6}') echo " $filesystem ($mount): $inode_use%" if (( $(echo "$inode_use > $CRITICAL_THRESHOLD" | bc -l) )); then echo " 状态: CRITICAL" send_alert "Inode" "$mount: $inode_use%" disk_status=2 fi done return $disk_status } # 检查服务状态 check_services() { echo "服务状态检查:" local services=( "sshd" "nginx" "mysql" "postgresql" "redis" "docker" ) local service_status=0 for service in "${services[@]}"; do if systemctl is-active --quiet "$service"; then echo " ✓ $service: 运行中" else echo " ✗ $service: 未运行" send_alert "服务" "$service 未运行" service_status=2 fi done # 检查端口监听 echo "" echo "端口监听检查:" local ports=("22" "80" "443" "3306" "5432" "6379") for port in "${ports[@]}"; do if ss -tln | grep -q ":$port "; then echo " ✓ 端口 $port: 监听中" else echo " ✗ 端口 $port: 未监听" service_status=2 fi done return $service_status } # 检查日志文件 check_logs() { echo "日志检查:" local log_status=0 # 检查错误日志 local error_patterns=("error" "failed" "critical" "emergency" "panic") local log_files=("/var/log/syslog" "/var/log/messages" "/var/log/nginx/error.log") for log_file in 
"${log_files[@]}"; do if [ -f "$log_file" ]; then echo "检查 $log_file:" for pattern in "${error_patterns[@]}"; do local error_count=$(grep -i "$pattern" "$log_file" | tail -5 | wc -l) if [ $error_count -gt 0 ]; then echo " ⚠ 发现 $pattern 错误: $error_count 条" grep -i "$pattern" "$log_file" | tail -3 | while read error_line; do echo " - $error_line" done log_status=1 fi done fi done return $log_status } # 发送告警 send_alert() { local component="$1" local value="$2" local timestamp=$(date '+%Y-%m-%d %H:%M:%S') local subject="服务器告警: $component 异常 - $(hostname)" local message="时间: $timestamp\n主机: $(hostname)\n组件: $component\n值: $value\n状态: CRITICAL" if [ -n "$ALERT_EMAIL" ]; then echo -e "$message" | mail -s "$subject" "$ALERT_EMAIL" fi # 记录到日志 echo "[$timestamp] ALERT: $component - $value" >> "$REPORT_DIR/alert.log" } # 生成报告 generate_report() { local timestamp=$(date '+%Y%m%d_%H%M%S') local report_file="$REPORT_DIR/health_report_$timestamp.txt" { echo "服务器健康检查报告" echo "====================" echo "主机名: $(hostname)" echo "检查时间: $(date)" echo "" echo "1. 系统概览" echo "-----------" echo "系统: $(uname -a)" echo "运行时间: $(uptime -p)" echo "当前用户: $(whoami)" echo "" echo "2. CPU检查" echo "----------" check_cpu local cpu_result=$? echo "" echo "3. 内存检查" echo "-----------" check_memory local mem_result=$? echo "" echo "4. 磁盘检查" echo "-----------" check_disk local disk_result=$? echo "" echo "5. 服务检查" echo "-----------" check_services local service_result=$? echo "" echo "6. 日志检查" echo "-----------" check_logs local log_result=$? echo "" echo "7. 总结" echo "------" echo "CPU状态: $( [ $cpu_result -eq 0 ] && echo "正常" || ([ $cpu_result -eq 1 ] && echo "警告" || echo "严重") )" echo "内存状态: $( [ $mem_result -eq 0 ] && echo "正常" || ([ $mem_result -eq 1 ] && echo "警告" || echo "严重") )" echo "磁盘状态: $( [ $disk_result -eq 0 ] && echo "正常" || ([ $disk_result -eq 1 ] && echo "警告" || echo "严重") )" echo "服务状态: $( [ $service_result -eq 0 ] && echo "正常" || echo "异常")" echo "日志状态: $( [ $log_result -eq 0 ] && echo "正常" || echo "警告")" if [ $cpu_result -eq 2 ] || [ $mem_result -eq 2 ] || [ $disk_result -eq 2 ] || [ $service_result -eq 2 ]; then echo "总体状态: ❌ 异常" elif [ $cpu_result -eq 1 ] || [ $mem_result -eq 1 ] || [ $disk_result -eq 1 ] || [ $log_result -eq 1 ]; then echo "总体状态: ⚠ 警告" else echo "总体状态: ✓ 正常" fi } > "$report_file" echo "报告已生成: $report_file" } # 主函数 main() { load_config # 创建报告目录 mkdir -p "$REPORT_DIR" # 单次检查模式 if [ "$1" = "--once" ]; then generate_report exit 0 fi # 守护进程模式 echo "启动健康检查守护进程..." echo "检查间隔: $CHECK_INTERVAL 秒" echo "报告目录: $REPORT_DIR" echo "按 Ctrl+C 停止" while true; do generate_report sleep $CHECK_INTERVAL done } # 处理信号 trap 'echo "停止健康检查"; exit 0' INT TERM main "$@"

5.2 Automated Deployment Script

bash

#!/bin/bash # automated_deployment.sh DEPLOY_ENV="${1:-staging}" CONFIG_DIR="./configs" BACKUP_DIR="./backups" LOG_DIR="./logs" TIMESTAMP=$(date +%Y%m%d_%H%M%S) LOCK_FILE="/tmp/deploy_${DEPLOY_ENV}.lock" # 颜色输出 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' # 加载环境配置 load_environment() { local env_file="${CONFIG_DIR}/${DEPLOY_ENV}.env" if [ ! -f "$env_file" ]; then echo -e "${RED}错误: 环境配置文件不存在: $env_file${NC}" exit 1 fi source "$env_file" # 验证必要变量 local required_vars=("DEPLOY_HOSTS" "DEPLOY_USER" "DEPLOY_PATH" "APP_NAME") for var in "${required_vars[@]}"; do if [ -z "${!var}" ]; then echo -e "${RED}错误: 必须配置变量: $var${NC}" exit 1 fi done } # 初始化 initialize() { mkdir -p "$BACKUP_DIR" "$LOG_DIR" # 创建锁文件,防止重复部署 exec 200>"$LOCK_FILE" if ! flock -n 200; then echo -e "${RED}错误: 另一个部署进程正在运行${NC}" exit 1 fi echo $$ > "$LOCK_FILE" } # 记录日志 log() { local level="$1" local message="$2" local timestamp=$(date '+%Y-%m-%d %H:%M:%S') local log_file="${LOG_DIR}/deploy_${DEPLOY_ENV}_${TIMESTAMP}.log" case $level in INFO) echo -e "${GREEN}[INFO]${NC} $message" ;; WARN) echo -e "${YELLOW}[WARN]${NC} $message" ;; ERROR) echo -e "${RED}[ERROR]${NC} $message" ;; *) echo -e "${BLUE}[$level]${NC} $message" ;; esac echo "[$timestamp] [$level] $message" >> "$log_file" } # 前置检查 pre_deployment_check() { log INFO "执行前置检查..." # 检查git状态 if [ -d ".git" ]; then local git_status=$(git status --porcelain) if [ -n "$git_status" ]; then log WARN "工作区有未提交的更改" fi local current_branch=$(git branch --show-current) if [ "$DEPLOY_ENV" = "production" ] && [ "$current_branch" != "main" ] && [ "$current_branch" != "master" ]; then log ERROR "生产部署必须在main/master分支" return 1 fi fi # 检查依赖 if [ -f "package.json" ]; then if ! command -v npm &> /dev/null; then log ERROR "需要npm但未安装" return 1 fi fi if [ -f "requirements.txt" ]; then if ! command -v pip &> /dev/null; then log ERROR "需要pip但未安装" return 1 fi fi # 检查服务器连接 local first_host=$(echo "$DEPLOY_HOSTS" | awk '{print $1}') if ! ssh "${DEPLOY_USER}@${first_host}" "exit" 2>/dev/null; then log ERROR "无法连接到服务器: $first_host" return 1 fi log INFO "前置检查通过" return 0 } # 构建应用 build_application() { log INFO "开始构建应用..." local build_script="./scripts/build.sh" if [ -f "$build_script" ]; then log INFO "执行构建脚本: $build_script" if ! bash "$build_script" "$DEPLOY_ENV"; then log ERROR "构建失败" return 1 fi else log INFO "未找到构建脚本,跳过构建步骤" fi # 创建部署包 log INFO "创建部署包..." local package_name="${APP_NAME}_${DEPLOY_ENV}_${TIMESTAMP}.tar.gz" tar -czf "${BACKUP_DIR}/${package_name}" \ --exclude=".git" \ --exclude="node_modules" \ --exclude="__pycache__" \ --exclude="*.log" \ --exclude="*.tmp" \ . if [ $? -eq 0 ]; then log INFO "部署包创建成功: ${package_name}" echo "$package_name" > "${BACKUP_DIR}/latest_package.txt" return 0 else log ERROR "部署包创建失败" return 1 fi } # 部署到单个服务器 deploy_to_host() { local host="$1" local package_name="$2" log INFO "部署到服务器: $host" # 创建远程目录 ssh "${DEPLOY_USER}@${host}" " mkdir -p '${DEPLOY_PATH}/releases' mkdir -p '${DEPLOY_PATH}/shared/logs' mkdir -p '${DEPLOY_PATH}/shared/tmp' " # 上传部署包 log INFO "上传部署包..." scp "${BACKUP_DIR}/${package_name}" "${DEPLOY_USER}@${host}:${DEPLOY_PATH}/releases/" # 解压部署包 log INFO "解压部署包..." ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}/releases' && \ tar -xzf '${package_name}' && \ mv '${package_name%.tar.gz}' '${TIMESTAMP}' && \ rm -f '${package_name}' " # 执行部署脚本 log INFO "执行部署脚本..." 
ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}/releases/${TIMESTAMP}' && \ # 创建符号链接 ln -sfn '${DEPLOY_PATH}/shared/.env' .env && \ ln -sfn '${DEPLOY_PATH}/shared/logs' logs && \ ln -sfn '${DEPLOY_PATH}/shared/tmp' tmp && \ # 安装依赖 if [ -f 'package.json' ]; then npm install --production fi if [ -f 'requirements.txt' ]; then pip install -r requirements.txt fi # 执行数据库迁移 if [ -f 'manage.py' ]; then python manage.py migrate --no-input fi # 收集静态文件 if [ -f 'manage.py' ]; then python manage.py collectstatic --no-input fi " # 切换当前版本 log INFO "切换当前版本..." ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}' && \ ln -sfn 'releases/${TIMESTAMP}' current " # 重启服务 log INFO "重启应用服务..." ssh "${DEPLOY_USER}@${host}" " # 重启应用 if systemctl list-units --full -all | grep -Fq '${APP_NAME}'; then sudo systemctl restart '${APP_NAME}' fi # 重启web服务器 if systemctl is-active --quiet nginx; then sudo systemctl reload nginx fi " # 清理旧版本 log INFO "清理旧版本..." ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}/releases' && \ ls -t | tail -n +6 | xargs -I {} rm -rf {} " # 健康检查 log INFO "执行健康检查..." if ! check_application_health "$host"; then log ERROR "健康检查失败: $host" # 回滚 log WARN "执行回滚..." rollback_deployment "$host" return 1 fi log INFO "部署成功: $host" return 0 } # 检查应用健康状态 check_application_health() { local host="$1" local max_attempts=10 local attempt=1 log INFO "检查应用健康状态..." while [ $attempt -le $max_attempts ]; do if ssh "${DEPLOY_USER}@${host}" " curl -f -s -o /dev/null -w '%{http_code}' \ --max-time 5 \ http://localhost:${APP_PORT:-80}/health " 2>/dev/null | grep -q "200"; then log INFO "应用健康检查通过" return 0 fi log INFO "健康检查尝试 $attempt/$max_attempts 失败,等待重试..." sleep 5 ((attempt++)) done return 1 } # 回滚部署 rollback_deployment() { local host="$1" log WARN "开始回滚部署: $host" # 获取上一个版本 local previous_release=$(ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}/releases' && \ ls -t | head -2 | tail -1 ") if [ -n "$previous_release" ]; then log INFO "回滚到版本: $previous_release" ssh "${DEPLOY_USER}@${host}" " cd '${DEPLOY_PATH}' && \ ln -sfn 'releases/${previous_release}' current && \ # 重启服务 if systemctl list-units --full -all | grep -Fq '${APP_NAME}'; then sudo systemctl restart '${APP_NAME}' fi " log INFO "回滚完成" else log ERROR "没有可用的回滚版本" fi } # 主部署流程 main_deployment() { log INFO "开始部署流程..." log INFO "环境: $DEPLOY_ENV" log INFO "应用: $APP_NAME" log INFO "目标主机: $DEPLOY_HOSTS" # 执行前置检查 if ! pre_deployment_check; then log ERROR "前置检查失败,停止部署" return 1 fi # 构建应用 if ! build_application; then log ERROR "构建失败,停止部署" return 1 fi local package_name=$(cat "${BACKUP_DIR}/latest_package.txt") local failed_hosts="" # 部署到所有主机 for host in $DEPLOY_HOSTS; do if ! deploy_to_host "$host" "$package_name"; then log ERROR "部署失败: $host" failed_hosts="$failed_hosts $host" fi done # 汇总结果 if [ -n "$failed_hosts" ]; then log ERROR "以下主机部署失败: $failed_hosts" # 发送通知 send_notification "failure" "部署失败: $failed_hosts" # 尝试修复 attempt_recovery "$failed_hosts" return 1 else log INFO "所有主机部署成功!" 
# 发送成功通知 send_notification "success" "部署成功完成" # 清理本地备份 cleanup_local_backups return 0 fi } # 发送通知 send_notification() { local status="$1" local message="$2" # 邮件通知 if [ -n "$NOTIFY_EMAIL" ]; then local subject="部署通知: $APP_NAME - $DEPLOY_ENV - $status" local body="应用: $APP_NAME\n环境: $DEPLOY_ENV\n状态: $status\n消息: $message\n时间: $(date)" echo -e "$body" | mail -s "$subject" "$NOTIFY_EMAIL" fi # Slack通知 if [ -n "$SLACK_WEBHOOK" ]; then local slack_message="{\"text\":\"*部署通知*\n应用: $APP_NAME\n环境: $DEPLOY_ENV\n状态: $status\n消息: $message\"}" curl -X POST -H 'Content-type: application/json' \ --data "$slack_message" \ "$SLACK_WEBHOOK" 2>/dev/null || true fi } # 清理本地备份 cleanup_local_backups() { log INFO "清理本地旧备份..." # 保留最近10个备份 cd "$BACKUP_DIR" && \ ls -t *.tar.gz | tail -n +11 | xargs -I {} rm -f {} } # 主函数 main() { # 初始化 initialize # 加载环境配置 load_environment # 执行部署 if main_deployment; then echo -e "${GREEN}部署成功完成!${NC}" exit 0 else echo -e "${RED}部署失败!${NC}" exit 1 fi } # 清理函数 cleanup() { rm -f "$LOCK_FILE" } trap cleanup EXIT # 启动部署 main "$@"
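
Per the script's argument handling (DEPLOY_ENV="${1:-staging}"), the target environment is the first argument, and a matching env file under ./configs must define DEPLOY_HOSTS, DEPLOY_USER, DEPLOY_PATH and APP_NAME:

bash

./automated_deployment.sh              # deploys to the default "staging" environment
./automated_deployment.sh production   # requires ./configs/production.env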

Chapter 6: Advanced MobaXterm Feature Integration

6.1 Plugin Development and Extensions

python

#!/usr/bin/env python3 """ mobaxterm_plugin.py MobaXterm插件示例 - 服务器批量管理器 """ import json import os import sys import subprocess import tkinter as tk from tkinter import ttk, messagebox import paramiko from pathlib import Path class ServerManagerPlugin: def __init__(self): self.config_file = Path.home() / ".mobaxterm" / "server_manager.json" self.servers = self.load_config() def load_config(self): """加载服务器配置""" if self.config_file.exists(): with open(self.config_file, 'r') as f: return json.load(f) return {} def save_config(self): """保存服务器配置""" self.config_file.parent.mkdir(parents=True, exist_ok=True) with open(self.config_file, 'w') as f: json.dump(self.servers, f, indent=2) def add_server(self, name, host, port, username, key_path, tags): """添加服务器""" self.servers[name] = { 'host': host, 'port': port, 'username': username, 'key_path': key_path, 'tags': tags.split(','), 'last_connected': None } self.save_config() def execute_command(self, server_name, command): """在服务器上执行命令""" if server_name not in self.servers: return "服务器不存在" server = self.servers[server_name] try: # 使用paramiko连接 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) key = paramiko.RSAKey.from_private_key_file(server['key_path']) ssh.connect( hostname=server['host'], port=server['port'], username=server['username'], pkey=key ) stdin, stdout, stderr = ssh.exec_command(command) output = stdout.read().decode() error = stderr.read().decode() ssh.close() return output if output else error except Exception as e: return f"连接失败: {str(e)}" def bulk_operation(self, server_names, command): """批量操作""" results = {} for name in server_names: results[name] = self.execute_command(name, command) return results class PluginGUI: def __init__(self): self.plugin = ServerManagerPlugin() self.root = tk.Tk() self.root.title("MobaXterm服务器管理器") self.root.geometry("800x600") self.setup_ui() def setup_ui(self): """设置用户界面""" # 创建主框架 main_frame = ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # 服务器列表 ttk.Label(main_frame, text="服务器列表:").grid(row=0, column=0, sticky=tk.W) self.server_listbox = tk.Listbox(main_frame, width=50, height=20, selectmode=tk.MULTIPLE) self.server_listbox.grid(row=1, column=0, rowspan=4, padx=5, pady=5) self.refresh_server_list() # 控制按钮 button_frame = ttk.Frame(main_frame) button_frame.grid(row=1, column=1, padx=10) ttk.Button(button_frame, text="添加服务器", command=self.add_server_dialog).pack(pady=5) ttk.Button(button_frame, text="编辑服务器", command=self.edit_server).pack(pady=5) ttk.Button(button_frame, text="删除服务器", command=self.delete_server).pack(pady=5) ttk.Button(button_frame, text="刷新列表", command=self.refresh_server_list).pack(pady=5) # 命令输入 ttk.Label(main_frame, text="执行命令:").grid(row=5, column=0, sticky=tk.W, pady=(10,0)) self.command_entry = ttk.Entry(main_frame, width=70) self.command_entry.grid(row=6, column=0, padx=5, pady=5) self.command_entry.insert(0, "uptime") ttk.Button(main_frame, text="执行", command=self.execute_command).grid(row=6, column=1, padx=5) ttk.Button(main_frame, text="批量执行", command=self.bulk_execute).grid(row=6, column=2, padx=5) # 输出区域 ttk.Label(main_frame, text="输出:").grid(row=7, column=0, sticky=tk.W, pady=(10,0)) self.output_text = tk.Text(main_frame, width=90, height=15) self.output_text.grid(row=8, column=0, columnspan=3, padx=5, pady=5) # 滚动条 scrollbar = ttk.Scrollbar(main_frame, orient=tk.VERTICAL, command=self.output_text.yview) scrollbar.grid(row=8, column=3, sticky=(tk.N, tk.S)) 
self.output_text['yscrollcommand'] = scrollbar.set def refresh_server_list(self): """刷新服务器列表""" self.server_listbox.delete(0, tk.END) for server_name in self.plugin.servers.keys(): self.server_listbox.insert(tk.END, server_name) def add_server_dialog(self): """添加服务器对话框""" dialog = tk.Toplevel(self.root) dialog.title("添加服务器") dialog.geometry("400x300") # 表单字段 fields = [ ("名称:", "entry"), ("主机:", "entry"), ("端口:", "entry"), ("用户名:", "entry"), ("密钥路径:", "entry"), ("标签:", "entry") ] entries = {} for i, (label, field_type) in enumerate(fields): ttk.Label(dialog, text=label).grid(row=i, column=0, padx=5, pady=5, sticky=tk.W) if field_type == "entry": entry = ttk.Entry(dialog, width=30) entry.grid(row=i, column=1, padx=5, pady=5) entries[label.strip(":")] = entry # 设置默认值 entries["端口"].insert(0, "22") entries["用户名"].insert(0, "root") def save_server(): try: self.plugin.add_server( name=entries["名称"].get(), host=entries["主机"].get(), port=int(entries["端口"].get()), username=entries["用户名"].get(), key_path=entries["密钥路径"].get(), tags=entries["标签"].get() ) self.refresh_server_list() dialog.destroy() messagebox.showinfo("成功", "服务器添加成功") except Exception as e: messagebox.showerror("错误", f"添加失败: {str(e)}") ttk.Button(dialog, text="保存", command=save_server).grid(row=len(fields), column=0, columnspan=2, pady=10) def execute_command(self): """执行命令""" selection = self.server_listbox.curselection() if not selection: messagebox.showwarning("警告", "请选择服务器") return server_name = self.server_listbox.get(selection[0]) command = self.command_entry.get() if not command: messagebox.showwarning("警告", "请输入命令") return self.output_text.delete(1.0, tk.END) self.output_text.insert(tk.END, f"执行命令: {command}\n") self.output_text.insert(tk.END, f"服务器: {server_name}\n") self.output_text.insert(tk.END, "="*50 + "\n\n") result = self.plugin.execute_command(server_name, command) self.output_text.insert(tk.END, result) def bulk_execute(self): """批量执行命令""" selections = self.server_listbox.curselection() if not selections: messagebox.showwarning("警告", "请选择服务器") return server_names = [self.server_listbox.get(i) for i in selections] command = self.command_entry.get() if not command: messagebox.showwarning("警告", "请输入命令") return self.output_text.delete(1.0, tk.END) self.output_text.insert(tk.END, f"批量执行命令: {command}\n") self.output_text.insert(tk.END, f"服务器数量: {len(server_names)}\n") self.output_text.insert(tk.END, "="*50 + "\n\n") results = self.plugin.bulk_operation(server_names, command) for server_name, result in results.items(): self.output_text.insert(tk.END, f"\n[{server_name}]\n") self.output_text.insert(tk.END, "-"*30 + "\n") self.output_text.insert(tk.END, result + "\n") def edit_server(self): """编辑服务器""" pass def delete_server(self): """删除服务器""" selection = self.server_listbox.curselection() if not selection: return server_name = self.server_listbox.get(selection[0]) if messagebox.askyesno("确认", f"确定要删除服务器 '{server_name}' 吗?"): if server_name in self.plugin.servers: del self.plugin.servers[server_name] self.plugin.save_config() self.refresh_server_list() def run(self): """运行GUI""" self.root.mainloop() def main(): """主函数""" # 检查是否在MobaXterm中运行 if 'MOBAXTERM' not in os.environ: print("此插件需要在MobaXterm中运行") sys.exit(1) # 启动GUI app = PluginGUI() app.run() if __name__ == "__main__": main()

6.2 Automation Script Integration

Create a MobaXterm macro script to automate routine maintenance tasks:

vbscript

' MobaXterm宏脚本示例 ' 文件名: daily_maintenance.mxt Option Explicit ' 主函数 Sub Main Dim session, result ' 打开日志文件 OpenLogFile "daily_maintenance.log" ' 连接到生产服务器 LogMessage "连接到生产服务器..." Set session = CreateSession("生产服务器", "192.168.1.100", "admin", "C:\ssh\prod_key.ppk") If Not session Is Nothing Then ' 执行日常维护任务 ExecuteMaintenanceTasks session ' 断开连接 session.Disconnect LogMessage "断开连接" Else LogMessage "连接失败" End If ' 发送报告 SendReport LogMessage "日常维护完成" End Sub ' 创建SSH会话 Function CreateSession(name, host, user, keypath) Dim ssh Set ssh = CreateObject("MobaXterm.SSH") ssh.Host = host ssh.Username = user ssh.PrivateKey = keypath ssh.Port = 22 ssh.X11Forwarding = True If ssh.Connect Then LogMessage "成功连接到 " & name Set CreateSession = ssh Else LogMessage "连接失败: " & ssh.LastError Set CreateSession = Nothing End If End Function ' 执行维护任务 Sub ExecuteMaintenanceTasks(session) Dim commands, cmd, result ' 定义要执行的命令 commands = Array( _ "echo '=== 系统信息 ==='", _ "uname -a", _ "uptime", _ "echo '=== 磁盘使用 ==='", _ "df -h", _ "echo '=== 内存使用 ==='", _ "free -m", _ "echo '=== 服务状态 ==='", _ "systemctl list-units --type=service --state=failed", _ "echo '=== 检查日志 ==='", _ "tail -20 /var/log/syslog", _ "echo '=== 清理临时文件 ==='", _ "find /tmp -type f -mtime +7 -delete", _ "echo '=== 更新包列表 ==='", _ "apt-get update", _ "echo '=== 检查安全更新 ==='", _ "apt-get upgrade -s" _ ) ' 执行每个命令 For Each cmd In commands LogMessage "执行命令: " & cmd result = session.Execute(cmd) If session.ExitCode = 0 Then LogMessage "输出: " & Left(result, 200) & "..." Else LogMessage "命令失败: " & session.LastError End If ' 短暂暂停 Sleep 1000 Next End Sub ' 记录日志 Sub LogMessage(message) Dim timestamp timestamp = Now ' 写入日志文件 Print #1, "[" & timestamp & "] " & message ' 显示在MobaXterm输出 MobaXterm.ShowMessage message End Sub ' 打开日志文件 Sub OpenLogFile(filename) Dim logPath logPath = GetMobaXtermPath & "\logs\" & filename Open logPath For Append As #1 LogMessage "开始日常维护" End Sub ' 发送报告 Sub SendReport ' 这里可以添加发送邮件或生成报告的逻辑 LogMessage "报告发送功能" End Sub ' 获取MobaXterm路径 Function GetMobaXtermPath GetMobaXtermPath = CreateObject("WScript.Shell").ExpandEnvironmentStrings("%MOBAXTERM_HOME%") End Function

Chapter 7: Best Practices and Performance Optimization

7.1 MobaXterm Performance Tuning

1. Session configuration optimization:

ini

# MobaXterm.ini - tuning example
[Settings]
KeepAliveInterval=60
MaxKeepAlives=3
Compression=yes
UseDNS=no
ForwardAgent=yes
ForwardX11=yes
X11DisplayOffset=10
X11UseLocalhost=yes
TCPKeepAlive=yes
ServerAliveCountMax=3
ServerAliveInterval=15

2. Memory optimization script:

bash

#!/bin/bash # mobaxterm_optimizer.sh CONFIG_DIR="$HOME/.mobaxterm" BACKUP_DIR="$HOME/mobaxterm_backup_$(date +%Y%m%d)" # 备份当前配置 backup_config() { echo "备份MobaXterm配置..." mkdir -p "$BACKUP_DIR" cp -r "$CONFIG_DIR" "$BACKUP_DIR/" cp "/proc/registry/HKEY_CURRENT_USER/Software/Mobatek/MobaXterm"/* "$BACKUP_DIR/registry/" 2>/dev/null || true echo "备份完成: $BACKUP_DIR" } # 清理缓存 clean_cache() { echo "清理缓存..." # 清理会话日志 find "$CONFIG_DIR/sessions" -name "*.log" -mtime +30 -delete # 清理临时文件 find "$CONFIG_DIR/tmp" -type f -mtime +1 -delete # 清理下载缓存 find "$CONFIG_DIR/downloads" -type f -mtime +7 -delete # 清理插件缓存 find "$CONFIG_DIR/plugins" -name "*.cache" -delete echo "缓存清理完成" } # 优化配置 optimize_config() { echo "优化配置..." # 优化SSH配置 cat > "$CONFIG_DIR/ssh_config" << 'EOF' Host * Compression yes ServerAliveInterval 60 ServerAliveCountMax 3 TCPKeepAlive yes ForwardAgent yes ForwardX11 yes X11UseLocalhost yes LogLevel ERROR StrictHostKeyChecking accept-new EOF # 优化X11配置 cat > "$CONFIG_DIR/X11.conf" << 'EOF' X11Forwarding yes X11DisplayOffset 10 X11UseLocalhost yes XAuthLocation /usr/bin/xauth EOF echo "配置优化完成" } # 性能测试 performance_test() { echo "性能测试..." # 测试SSH连接速度 echo "SSH连接测试:" time ssh -o "ConnectTimeout=5" localhost "exit" 2>/dev/null # 测试X11转发 echo -e "\nX11转发测试:" if xclock -version 2>/dev/null; then echo "X11转发正常" else echo "X11转发不可用" fi # 测试SFTP速度 echo -e "\nSFTP测试:" dd if=/dev/zero bs=1M count=10 2>/dev/null | \ time sftp -q user@localhost:/tmp/testfile 2>&1 | \ grep -E "real|user|sys" } main() { echo "MobaXterm性能优化工具" echo "======================" backup_config clean_cache optimize_config performance_test echo -e "\n优化完成!" echo "建议重启MobaXterm使更改生效" } main "$@"

7.2 Security Best Practices

bash

#!/bin/bash # security_hardening.sh LOG_FILE="/var/log/security_hardening.log" REPORT_FILE="/tmp/security_report_$(date +%Y%m%d).html" # 安全加固函数 harden_ssh() { echo "加固SSH配置..." local sshd_config="/etc/ssh/sshd_config" local backup_file="${sshd_config}.backup.$(date +%Y%m%d)" # 备份原始配置 cp "$sshd_config" "$backup_file" # 应用安全配置 cat > /tmp/sshd_hardening.conf << 'EOF' # 安全SSH配置 Port 2222 Protocol 2 PermitRootLogin prohibit-password MaxAuthTries 3 MaxSessions 10 ClientAliveInterval 300 ClientAliveCountMax 2 LoginGraceTime 60 AllowUsers admin deploy monitor DenyUsers root test guest PasswordAuthentication no PubkeyAuthentication yes PermitEmptyPasswords no ChallengeResponseAuthentication no KerberosAuthentication no GSSAPIAuthentication no X11Forwarding yes X11DisplayOffset 10 PrintMotd no PrintLastLog yes TCPKeepAlive yes UsePAM yes AllowTcpForwarding yes UseDNS no Compression delayed EOF # 合并配置 while read -r line; do if [[ ! "$line" =~ ^# ]] && [[ -n "$line" ]]; then local key=$(echo "$line" | awk '{print $1}') sed -i "/^$key/d" "$sshd_config" fi done < /tmp/sshd_hardening.conf cat /tmp/sshd_hardening.conf >> "$sshd_config" # 重启SSH服务 systemctl restart sshd echo "SSH加固完成" } # 防火墙配置 setup_firewall() { echo "配置防火墙..." # 使用ufw(如果可用) if command -v ufw &> /dev/null; then ufw --force reset ufw default deny incoming ufw default allow outgoing ufw allow 2222/tcp # SSH ufw allow 80/tcp # HTTP ufw allow 443/tcp # HTTPS ufw --force enable ufw status verbose else # 使用iptables iptables -F iptables -X iptables -Z # 设置默认策略 iptables -P INPUT DROP iptables -P FORWARD DROP iptables -P OUTPUT ACCEPT # 允许本地回环 iptables -A INPUT -i lo -j ACCEPT iptables -A OUTPUT -o lo -j ACCEPT # 允许已建立的连接 iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT # 开放必要端口 iptables -A INPUT -p tcp --dport 2222 -j ACCEPT # SSH iptables -A INPUT -p tcp --dport 80 -j ACCEPT # HTTP iptables -A INPUT -p tcp --dport 443 -j ACCEPT # HTTPS # 保存规则 iptables-save > /etc/iptables/rules.v4 fi echo "防火墙配置完成" } # 系统安全设置 harden_system() { echo "加固系统设置..." # 禁用不必要的服务 local services_to_disable=( "rpcbind" "nfs-common" "telnet" "vsftpd" "smbd" ) for service in "${services_to_disable[@]}"; do if systemctl is-active --quiet "$service"; then systemctl stop "$service" systemctl disable "$service" echo "已禁用服务: $service" fi done # 配置密码策略 cat > /etc/security/pwquality.conf << 'EOF' # 密码质量配置 minlen = 12 dcredit = -1 ucredit = -1 ocredit = -1 lcredit = -1 minclass = 3 maxrepeat = 2 EOF # 配置登录限制 cat > /etc/security/limits.conf << 'EOF' # 资源限制 * soft nproc 100 * hard nproc 150 * soft nofile 100000 * hard nofile 100000 * soft core 0 * hard core 0 EOF # 禁用核心转储 echo "* hard core 0" >> /etc/security/limits.conf echo "fs.suid_dumpable = 0" >> /etc/sysctl.conf # 配置sysctl安全参数 cat >> /etc/sysctl.conf << 'EOF' # 网络安全 net.ipv4.tcp_syncookies = 1 net.ipv4.ip_forward = 0 net.ipv4.conf.all.send_redirects = 0 net.ipv4.conf.default.send_redirects = 0 net.ipv4.conf.all.accept_source_route = 0 net.ipv4.conf.default.accept_source_route = 0 net.ipv4.conf.all.accept_redirects = 0 net.ipv4.conf.default.accept_redirects = 0 net.ipv4.conf.all.secure_redirects = 0 net.ipv4.conf.default.secure_redirects = 0 net.ipv4.conf.all.log_martians = 1 net.ipv4.conf.default.log_martians = 1 net.ipv4.icmp_echo_ignore_broadcasts = 1 net.ipv4.icmp_ignore_bogus_error_responses = 1 # 内核安全 kernel.randomize_va_space = 2 kernel.exec-shield = 1 kernel.kptr_restrict = 1 EOF # 应用sysctl设置 sysctl -p echo "系统加固完成" } # 文件权限加固 harden_permissions() { echo "加固文件权限..." 
# 关键目录权限 chmod 750 /home/* chmod 700 /root chmod 644 /etc/passwd chmod 600 /etc/shadow chmod 644 /etc/group # 设置SUID/SGID审计 find / -type f \( -perm -4000 -o -perm -2000 \) -exec ls -la {} \; 2>/dev/null > /tmp/suid_sgid_audit.txt # 配置umask echo "umask 027" >> /etc/profile echo "umask 027" >> /etc/bash.bashrc echo "文件权限加固完成" } # 审计配置 setup_audit() { echo "配置审计..." # 安装auditd(如果未安装) if ! command -v auditctl &> /dev/null; then apt-get install -y auditd audispd-plugins fi # 配置审计规则 cat > /etc/audit/rules.d/security.rules << 'EOF' # 监控系统调用 -a always,exit -F arch=b64 -S adjtimex -S settimeofday -k time-change -a always,exit -F arch=b32 -S adjtimex -S settimeofday -S stime -k time-change -a always,exit -F arch=b64 -S clock_settime -k time-change -a always,exit -F arch=b32 -S clock_settime -k time-change # 监控用户和组变更 -w /etc/passwd -p wa -k identity -w /etc/group -p wa -k identity -w /etc/shadow -p wa -k identity -w /etc/gshadow -p wa -k identity # 监控网络配置 -w /etc/network/ -p wa -k network # 监控文件系统挂载 -a always,exit -F arch=b64 -S mount -S umount2 -k mounts # 监控文件删除 -a always,exit -F arch=b64 -S unlink -S unlinkat -S rename -S renameat -k delete # 监控特权命令 -a always,exit -F arch=b64 -S chmod -S fchmod -S fchmodat -k perm_mod -a always,exit -F arch=b64 -S chown -S fchown -S fchownat -S lchown -k perm_mod EOF # 重启审计服务 systemctl restart auditd echo "审计配置完成" } # 生成安全报告 generate_security_report() { echo "生成安全报告..." cat > "$REPORT_FILE" << EOF <!DOCTYPE html> <html> <head> <title>安全加固报告 - $(hostname)</title> <style> body { font-family: Arial, sans-serif; margin: 40px; } .header { background: #2c3e50; color: white; padding: 20px; } .section { margin: 20px 0; padding: 15px; border-left: 4px solid #3498db; } .critical { border-left-color: #e74c3c; } .warning { border-left-color: #f39c12; } .ok { border-left-color: #2ecc71; } pre { background: #ecf0f1; padding: 10px; overflow: auto; } </style> </head> <body> <div class="header"> <h1>安全加固报告</h1> <p>主机: $(hostname) | 时间: $(date)</p> </div> <div class="section ok"> <h2>SSH安全状态</h2> <pre>$(sshd -T 2>/dev/null | grep -E "(permitrootlogin|passwordauthentication|port)" || echo "检查失败")</pre> </div> <div class="section ok"> <h2>防火墙状态</h2> <pre>$(iptables -L -n 2>/dev/null || ufw status verbose 2>/dev/null || echo "防火墙未配置")</pre> </div> <div class="section"> <h2>系统信息</h2> <pre>$(uname -a)</pre> <pre>$(uptime)</pre> </div> <div class="section warning"> <h2>安全警告</h2> <ul> <li>请定期更新系统补丁</li> <li>监控审计日志</li> <li>定期更换密码</li> </ul> </div> </body> </html> EOF echo "安全报告已生成: $REPORT_FILE" } # 主函数 main() { echo "开始安全加固..." echo "日志文件: $LOG_FILE" # 执行加固步骤 harden_ssh >> "$LOG_FILE" 2>&1 setup_firewall >> "$LOG_FILE" 2>&1 harden_system >> "$LOG_FILE" 2>&1 harden_permissions >> "$LOG_FILE" 2>&1 setup_audit >> "$LOG_FILE" 2>&1 # 生成报告 generate_security_report echo "安全加固完成" echo "请查看报告: $REPORT_FILE" echo "请重启系统使所有更改生效" } # 检查root权限 if [ "$(id -u)" -ne 0 ]; then echo "错误: 需要root权限" exit 1 fi main

Chapter 8: Troubleshooting and Debugging

8.1 Common Problems and Solutions

bash

#!/bin/bash # mobaxterm_troubleshooter.sh LOG_FILE="/tmp/mobaxterm_troubleshoot_$(date +%Y%m%d_%H%M%S).log" # 颜色定义 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' NC='\033[0m' # 日志函数 log() { local level="$1" local message="$2" local timestamp=$(date '+%Y-%m-%d %H:%M:%S') case $level in INFO) echo -e "${GREEN}[INFO]${NC} $message" ;; WARN) echo -e "${YELLOW}[WARN]${NC} $message" ;; ERROR) echo -e "${RED}[ERROR]${NC} $message" ;; esac echo "[$timestamp] [$level] $message" >> "$LOG_FILE" } # 检查网络连接 check_network() { log INFO "检查网络连接..." # 测试本地网络 if ping -c 2 -W 1 127.0.0.1 > /dev/null 2>&1; then log INFO "本地网络正常" else log ERROR "本地网络故障" return 1 fi # 测试网关 local gateway=$(ip route | grep default | awk '{print $3}') if [ -n "$gateway" ]; then if ping -c 2 -W 1 "$gateway" > /dev/null 2>&1; then log INFO "网关 $gateway 可达" else log WARN "网关 $gateway 不可达" fi fi # 测试DNS if nslookup google.com > /dev/null 2>&1; then log INFO "DNS解析正常" else log WARN "DNS解析可能有问题" fi return 0 } # 检查SSH服务 check_ssh() { log INFO "检查SSH服务..." # 检查本地SSH服务 if systemctl is-active ssh > /dev/null 2>&1; then log INFO "SSH服务正在运行" else log WARN "SSH服务未运行" fi # 检查SSH配置 if [ -f /etc/ssh/sshd_config ]; then log INFO "SSH配置文件存在" # 检查关键配置 local port=$(grep -E "^Port" /etc/ssh/sshd_config | awk '{print $2}') local permit_root=$(grep -E "^PermitRootLogin" /etc/ssh/sshd_config | awk '{print $2}') if [ -n "$port" ]; then log INFO "SSH端口: $port" fi if [ -n "$permit_root" ]; then log INFO "Root登录: $permit_root" fi fi # 测试SSH连接 log INFO "测试SSH连接到本地..." if ssh -o "ConnectTimeout=5" localhost "echo SSH测试成功" > /dev/null 2>&1; then log INFO "SSH本地连接测试成功" else log ERROR "SSH本地连接测试失败" fi } # 检查X11转发 check_x11() { log INFO "检查X11转发..." # 检查X11是否安装 if command -v xclock > /dev/null 2>&1; then log INFO "X11客户端已安装" else log WARN "X11客户端未安装" fi # 检查DISPLAY变量 if [ -n "$DISPLAY" ]; then log INFO "DISPLAY变量: $DISPLAY" else log WARN "DISPLAY变量未设置" fi # 测试X11应用 log INFO "测试X11应用..." if timeout 5 xclock > /dev/null 2>&1; then log INFO "X11应用测试成功" else log WARN "X11应用测试失败" fi } # 检查MobaXterm配置 check_mobaxterm() { log INFO "检查MobaXterm配置..." # 检查配置文件 local config_paths=( "$HOME/.mobaxterm" "/proc/registry/HKEY_CURRENT_USER/Software/Mobatek/MobaXterm" ) for path in "${config_paths[@]}"; do if [ -d "$path" ] || [ -e "$path" ]; then log INFO "配置文件路径存在: $path" else log WARN "配置文件路径不存在: $path" fi done # 检查会话配置 local sessions_dir="$HOME/.mobaxterm/sessions" if [ -d "$sessions_dir" ]; then local session_count=$(find "$sessions_dir" -name "*.mx*" | wc -l) log INFO "找到 $session_count 个会话配置" fi } # 检查防火墙 check_firewall() { log INFO "检查防火墙..." # 检查iptables if command -v iptables > /dev/null 2>&1; then log INFO "iptables已安装" # 检查SSH端口是否开放 if iptables -L -n 2>/dev/null | grep -q ":22 "; then log INFO "SSH端口(22)在防火墙中已配置" else log WARN "SSH端口(22)可能在防火墙中被阻止" fi fi # 检查ufw if command -v ufw > /dev/null 2>&1; then log INFO "ufw已安装" ufw status verbose >> "$LOG_FILE" 2>&1 fi } # 收集系统信息 collect_system_info() { log INFO "收集系统信息..." { echo "=== 系统信息 ===" uname -a echo "" echo "=== 网络信息 ===" ip addr show echo "" echo "=== 路由信息 ===" ip route show echo "" echo "=== SSH配置 ===" grep -v "^#" /etc/ssh/sshd_config | grep -v "^$" | head -20 echo "" echo "=== 环境变量 ===" env | grep -E "(SSH|X11|DISPLAY|MOBA)" echo "" echo "=== 进程信息 ===" ps aux | grep -E "(ssh|x11|Xorg)" | head -10 } >> "$LOG_FILE" } # 提供解决方案 suggest_solutions() { log INFO "提供解决方案建议..." 
echo "" echo "=== 故障排除建议 ===" echo "" # 检查日志中的错误 grep -i "error\|fail\|warn" "$LOG_FILE" | while read -r line; do echo "发现: $line" case $line in *"本地网络故障"*) echo "建议: 检查网络适配器设置" ;; *"SSH本地连接测试失败"*) echo "建议: 确保SSH服务已启动: sudo systemctl start ssh" ;; *"DISPLAY变量未设置"*) echo "建议: 设置DISPLAY变量: export DISPLAY=localhost:0.0" ;; *"X11应用测试失败"*) echo "建议: 检查X11服务器是否运行,或者使用Xming等替代方案" ;; *"防火墙中被阻止"*) echo "建议: 开放SSH端口: sudo ufw allow 22/tcp" ;; esac echo "" done } # 主函数 main() { echo "MobaXterm故障排除工具" echo "======================" echo "日志文件: $LOG_FILE" echo "" # 执行检查 check_network echo "" check_ssh echo "" check_x11 echo "" check_mobaxterm echo "" check_firewall echo "" collect_system_info # 提供解决方案 suggest_solutions echo "故障排除完成" echo "请查看详细日志: $LOG_FILE" } main

8.2 Performance Monitoring Script

bash

#!/bin/bash # performance_monitor.sh INTERVAL=${1:-5} # 监控间隔,默认5秒 DURATION=${2:-60} # 监控时长,默认60秒 LOG_FILE="/tmp/performance_monitor_$(date +%Y%m%d_%H%M%S).csv" PLOT_SCRIPT="/tmp/performance_plot.gnuplot" # 初始化CSV文件 init_csv() { echo "timestamp,cpu_usage,memory_usage,disk_io_read,disk_io_write,network_rx,network_tx,load_1,load_5,load_15" > "$LOG_FILE" } # 收集性能数据 collect_metrics() { local timestamp=$(date +%Y-%m-%d\ %H:%M:%S) # CPU使用率 local cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1) # 内存使用率 local memory_usage=$(free | awk 'NR==2{printf "%.2f", $3*100/$2}') # 磁盘IO(使用iostat) local disk_io=$(iostat -d -k 1 2 | tail -n +7 | head -1) local disk_read=$(echo $disk_io | awk '{print $3}') local disk_write=$(echo $disk_io | awk '{print $4}') # 网络流量 local network_stats=$(cat /proc/net/dev | grep eth0 | awk '{print $2","$10}') local network_rx=$(echo $network_stats | cut -d',' -f1) local network_tx=$(echo $network_stats | cut -d',' -f2) # 系统负载 local load_avg=$(awk '{print $1","$2","$3}' /proc/loadavg) local load_1=$(echo $load_avg | cut -d',' -f1) local load_5=$(echo $load_avg | cut -d',' -f2) local load_15=$(echo $load_avg | cut -d',' -f3) # 写入CSV echo "$timestamp,$cpu_usage,$memory_usage,$disk_read,$disk_write,$network_rx,$network_tx,$load_1,$load_5,$load_15" >> "$LOG_FILE" # 实时显示 printf "时间: %s | CPU: %6.2f%% | 内存: %6.2f%% | 负载: %s,%s,%s\n" \ "$timestamp" "$cpu_usage" "$memory_usage" "$load_1" "$load_5" "$load_15" } # 生成性能报告 generate_report() { echo "生成性能报告..." local report_file="${LOG_FILE%.csv}_report.txt" { echo "性能监控报告" echo "=============" echo "监控时间: $(date)" echo "监控时长: ${DURATION}秒" echo "监控间隔: ${INTERVAL}秒" echo "" echo "统计摘要:" echo "---------" # 读取CSV数据(跳过标题行) tail -n +2 "$LOG_FILE" | awk -F',' ' BEGIN { count=0; cpu_sum=0; cpu_max=0; mem_sum=0; mem_max=0; load1_sum=0; load1_max=0; } { count++; cpu_sum+=$2; if($2>cpu_max) cpu_max=$2; mem_sum+=$3; if($3>mem_max) mem_max=$3; load1_sum+=$8; if($8>load1_max) load1_max=$8; } END { if(count>0) { printf "CPU使用率: 平均=%.2f%%, 最高=%.2f%%\n", cpu_sum/count, cpu_max; printf "内存使用率: 平均=%.2f%%, 最高=%.2f%%\n", mem_sum/count, mem_max; printf "系统负载(1分钟): 平均=%.2f, 最高=%.2f\n", load1_sum/count, load1_max; } }' echo "" echo "峰值时间点:" echo "-----------" # 找出CPU使用率最高的时间点 echo "CPU使用率最高的时间点:" tail -n +2 "$LOG_FILE" | sort -t',' -k2 -nr | head -3 | while read line; do echo " $line" | awk -F',' '{printf " %s: %.2f%%\n", $1, $2}' done echo "" # 找出内存使用率最高的时间点 echo "内存使用率最高的时间点:" tail -n +2 "$LOG_FILE" | sort -t',' -k3 -nr | head -3 | while read line; do echo " $line" | awk -F',' '{printf " %s: %.2f%%\n", $1, $3}' done echo "" echo "建议:" echo "-----" # 基于数据的建议 tail -n +2 "$LOG_FILE" | awk -F',' ' BEGIN { high_cpu=0; high_mem=0; high_load=0; threshold_cpu=80; threshold_mem=80; threshold_load=5; } { if($2 > threshold_cpu) high_cpu++; if($3 > threshold_mem) high_mem++; if($8 > threshold_load) high_load++; } END { if(high_cpu > 0) printf "- CPU使用率多次超过%d%%,考虑优化应用程序或增加CPU资源\n", threshold_cpu; if(high_mem > 0) printf "- 内存使用率多次超过%d%%,考虑优化内存使用或增加内存\n", threshold_mem; if(high_load > 0) printf "- 系统负载多次超过%d,考虑负载均衡或优化系统性能\n", threshold_load; if(high_cpu==0 && high_mem==0 && high_load==0) printf "- 系统性能良好,无明显瓶颈\n"; }' } > "$report_file" echo "报告已生成: $report_file" } # 创建图表 create_plots() { echo "创建性能图表..." 
cat > "$PLOT_SCRIPT" << 'EOF' set terminal png size 1200,800 set output 'performance_plot.png' set multiplot layout 2,2 title "性能监控图表" # CPU使用率 set title "CPU使用率" set xdata time set timefmt "%Y-%m-%d %H:%M:%S" set format x "%H:%M" set ylabel "使用率 (%)" set grid plot 'performance_data.csv' using 1:2 with lines title 'CPU' # 内存使用率 set title "内存使用率" set ylabel "使用率 (%)" plot 'performance_data.csv' using 1:3 with lines title '内存' # 系统负载 set title "系统负载" set ylabel "负载" plot 'performance_data.csv' using 1:8 with lines title '1分钟负载', \ '' using 1:9 with lines title '5分钟负载', \ '' using 1:10 with lines title '15分钟负载' # 磁盘IO set title "磁盘IO" set ylabel "KB/s" plot 'performance_data.csv' using 1:4 with lines title '读取', \ '' using 1:5 with lines title '写入' unset multiplot EOF # 复制数据文件供gnuplot使用 cp "$LOG_FILE" performance_data.csv if command -v gnuplot > /dev/null 2>&1; then gnuplot "$PLOT_SCRIPT" echo "图表已生成: performance_plot.png" else echo "警告: gnuplot未安装,无法生成图表" fi } # 主监控循环 monitor_loop() { echo "开始性能监控..." echo "监控间隔: ${INTERVAL}秒" echo "监控时长: ${DURATION}秒" echo "日志文件: $LOG_FILE" echo "" init_csv local start_time=$(date +%s) local end_time=$((start_time + DURATION)) local current_time while true; do current_time=$(date +%s) if [ $current_time -ge $end_time ]; then break fi collect_metrics sleep $INTERVAL done echo "" echo "监控完成" generate_report create_plots } # 清理函数 cleanup() { rm -f performance_data.csv "$PLOT_SCRIPT" 2>/dev/null || true } trap cleanup EXIT # 检查依赖 check_dependencies() { local missing=() for cmd in iostat free top awk; do if ! command -v $cmd > /dev/null 2>&1; then missing+=("$cmd") fi done if [ ${#missing[@]} -gt 0 ]; then echo "缺少必要工具: ${missing[*]}" echo "请安装: sudo apt-get install sysstat procps" exit 1 fi } main() { check_dependencies monitor_loop } main "$@"

Summary

MobaXterm is a powerful remote-computing toolbox for Windows. With sensible configuration and automation scripts, it can substantially improve the efficiency of Linux operations work. This article has covered everything from basic connections to advanced automated operations, including:

  1. Efficient connection management: SSH configuration tuning and bulk server management

  2. File transfer automation: advanced rsync and real-time sync monitoring

  3. Network diagnostics: comprehensive network checks and monitoring tools

  4. Automated operations: health checks and automated deployment

  5. Plugin development: extending MobaXterm's functionality

  6. Performance optimization: system tuning and security hardening

  7. Troubleshooting: problem diagnosis and performance monitoring

By putting these techniques and scripts into practice, operations engineers can:

  • Reduce repetitive work and improve efficiency

  • Keep systems stable and secure

  • Locate and resolve problems quickly

  • Standardize and automate day-to-day operations
