Python 运维脚本
1、备份文件
import os
import shutil# 定义配置文件目录和备份目录的路径
config_dir = "/root/python/to/config/files/"
backup_dir = "/root/python/to/backup/"# 遍历配置文件目录中的所有文件
for filename in os.listdir(config_dir):# 如果文件名以 ".conf" 结尾,则执行备份操作if filename.endswith('.conf'):# 构建完整的文件路径file_path = os.path.join(config_dir, filename)# 构建备份文件路径backup_path = os.path.join(backup_dir, filename)# 将文件复制到备份目录shutil.copy(file_path, backup_path)# 打印备份完成的消息print(f"Backup of {filename} completed")
2、安装pip
- 根据不同的版本,安装不同的pip
wget https://bootstrap.pypa.io/pip/3.6/get-pip.py
python3 get-pip.py
3、将备份文件传送到远程主机上进行备份
import os
import shutil
import paramiko# 定义配置
local_config_dir = "/root/python/to/config/files/"
remote_backup_dir = "/root/python/to/backup/"
remote_host = "192.168.1.101"
remote_username = "root" # 根据实际情况修改用户名
remote_password = "your_password" # 根据实际情况修改密码# 检查本地配置目录是否存在
if not os.path.exists(local_config_dir):print(f"Local directory {local_config_dir} does not exist.")exit(1)# 创建一个 SSH 客户端对象
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())try:# 连接远程服务器print(f"Connecting to {remote_host}...")ssh.connect(remote_host, username=remote_username, password=remote_password)# 创建一个 SFTP 客户端对象sftp = ssh.open_sftp()# 检查远程目录是否存在,如果不存在则创建print(f"Checking remote directory {remote_backup_dir}...")try:sftp.stat(remote_backup_dir)print(f"Directory {remote_backup_dir} already exists.")except IOError:print(f"Directory {remote_backup_dir} does not exist. Creating...")sftp.mkdir(remote_backup_dir)# 遍历本地目录中的所有文件for filename in os.listdir(local_config_dir):file_path = os.path.join(local_config_dir, filename)backup_path = os.path.join(remote_backup_dir, filename)# 检查是否为文件(忽略目录)if os.path.isfile(file_path):print(f"Transferring {file_path} to {backup_path}...")sftp.put(file_path, backup_path)print(f"Transfer of {filename} completed.")finally:# 关闭 SFTP 和 SSH 连接print("Closing SSH connection...")ssh.close()
4、对历史日志进行删除
## 日志保留策略 import os
import time# 日志文件所在的目录
log_dir = '/path/to/logs/'# 文件的最大保留天数
days_old = 30# 获取当前时间戳
current_time = time.time()# 遍历日志目录中的所有文件
for filename in os.listdir(log_dir):# 构造文件的完整路径file_path = os.path.join(log_dir, filename)# 计算文件的年龄(以秒为单位)file_age = current_time - os.path.getmtime(file_path)# 如果文件年龄超过最大保留天数(以秒为单位),则删除该文件if file_age > days_old * 86400:os.remove(file_path)print(f"Deleted old log file: {filename}")
5、查看模块是否安装
import importlib
import sys # 用于处理命令行参数def check_modules(modules):"""检查指定的模块是否已安装"""for module_name in modules:try:importlib.import_module(module_name)print(f"{module_name} is installed.")except ImportError:print(f"{module_name} is not installed.")if __name__ == "__main__":# 通过命令行参数获取模块名称列表if len(sys.argv) < 2:print("Usage: python script.py module1 module2 ...")sys.exit(1)# 从命令行参数中提取模块名称modules = sys.argv[1:]# 检查模块是否已安装check_modules(modules)
- 脚本使用:
python3 test1.py time os
6、 查看系统进程
##获取当前所有的进程
import psutil# 遍历所有进程,获取进程的 PID 和名称
for proc in psutil.process_iter(['pid', 'name']):# 打印每个进程的 PID 和名称print(f"PID: {proc.info['pid']}, 进程名: {proc.info['name']}")
7、系统资源监控
1、只监控挂载点
import psutildef get_disk_usage():partitions = psutil.disk_partitions()for partition in partitions:usage = psutil.disk_usage(partition.mountpoint)total_gb = usage.total / (2**30) # 转换为 GiBused_gb = usage.used / (2**30) # 转换为 GiBfree_gb = usage.free / (2**30) # 转换为 GiBprint(f"挂载点: {partition.mountpoint}")print(f"总空间: {total_gb:.2f} GiB")print(f"已用空间: {used_gb:.2f} GiB")print(f"剩余空间: {free_gb:.2f} GiB")print("-" * 30)# 调用函数
get_disk_usage()
2、全部挂载点都监控
import psutildef get_disk_usage(mountpoints):for mountpoint in mountpoints:try:usage = psutil.disk_usage(mountpoint)total_gb = usage.total / (2**30) # 转换为 GiBused_gb = usage.used / (2**30) # 转换为 GiBfree_gb = usage.free / (2**30) # 转换为 GiBprint(f"挂载点: {mountpoint}")print(f"总空间: {total_gb:.2f} GiB")print(f"已用空间: {used_gb:.2f} GiB")print(f"剩余空间: {free_gb:.2f} GiB")print("-" * 30)except Exception as e:print(f"分析挂载点 {mountpoint} 出错: {e}")# 手动指定要分析的挂载点
mountpoints = ['/', # 根目录'/boot', # 启动分区'/dev', # 设备文件挂载点'/tmp', # 临时文件系统'/run', # 用于运行时数据'/var/lib/docker/overlay2', # Docker 的 overlayfs 挂载点'/sys/fs/cgroup', # cgroup 文件系统'/proc', # proc 文件系统'/sys', # 系统文件'/mnt' # 用户指定的挂载点示例
]# 调用函数
get_disk_usage(mountpoints)
3、CPU 内存 磁盘监控
import datetime
import psutil
import shutildef get_system_report():report = []# 报告生成时间report.append(f"报告生成时间: {datetime.datetime.now().strftime('%m-%d')}")# CPU使用率report.append(f"CPU使用率: {psutil.cpu_percent()}%")# 内存使用率report.append(f"内存使用率: {psutil.virtual_memory().percent}%")# 磁盘使用情况total, used, free = shutil.disk_usage("/")report.append(f"磁盘总空间: {total // (2**30)}GiB")report.append(f"磁盘已用空间: {used // (2**30)}GiB")report.append(f"磁盘剩余空间: {free // (2**30)}GiB")# 返回拼接的报告return "\n".join(report)# 打印系统报告
print(get_system_report())
8、用户管理
#!/usr/bin/env python3
# -*- coding: utf-8 -*-import os
import sys
import pwd
import grp
import subprocess
from typing import List, Optional
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.panel import Panel
from rich import print as rprintconsole = Console()class UserManager:def __init__(self):self.console = Console()self.check_root_privileges()self.sudoers_file = "/etc/sudoers"self.sudoers_dir = "/etc/sudoers.d"def check_root_privileges(self):"""检查是否具有root权限"""if os.geteuid() != 0:self.console.print("[red]错误:此程序需要root权限才能运行![/red]")self.console.print("请使用 sudo 运行此程序")sys.exit(1)def run_command(self, command: List[str]) -> tuple:"""执行shell命令"""try:result = subprocess.run(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True,check=True)return True, result.stdoutexcept subprocess.CalledProcessError as e:return False, e.stderrdef list_users(self):"""列出所有用户"""table = Table(title="系统用户列表")table.add_column("用户名", style="cyan")table.add_column("UID", style="green")table.add_column("GID", style="yellow")table.add_column("主目录", style="blue")table.add_column("Shell", style="magenta")for user in pwd.getpwall():table.add_row(user.pw_name,str(user.pw_uid),str(user.pw_gid),user.pw_dir,user.pw_shell)self.console.print(table)def create_user(self):"""创建新用户"""try:username = Prompt.ask("请输入新用户名")# 检查用户是否已存在try:pwd.getpwnam(username)self.console.print(f"[red]错误:用户 {username} 已存在![/red]")returnexcept KeyError:pass# 获取用户信息home_dir = Prompt.ask("请输入主目录", default=f"/home/{username}")shell = Prompt.ask("请输入shell", default="/bin/bash")create_home = Confirm.ask("是否创建主目录?", default=True)set_password = Confirm.ask("是否设置密码?", default=True)# 构建useradd命令cmd = ["useradd"]if create_home:cmd.append("-m")cmd.extend(["-d", home_dir, "-s", shell, username])success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功创建用户 {username}[/green]")if set_password:self.set_password(username)else:self.console.print(f"[red]创建用户失败:{output}[/red]")except KeyboardInterrupt:self.console.print("\n[yellow]已取消创建用户[/yellow]")def delete_user(self):"""删除用户"""username = Prompt.ask("请输入要删除的用户名")# 检查用户是否存在try:pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")returnremove_home = Confirm.ask("是否删除用户主目录?", default=True)cmd = ["userdel"]if remove_home:cmd.append("-r")cmd.append(username)success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功删除用户 {username}[/green]")else:self.console.print(f"[red]删除用户失败:{output}[/red]")def set_password(self, username: Optional[str] = None):"""设置用户密码"""try:if username is None:username = Prompt.ask("请输入用户名")# 检查用户是否存在try:pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")return# 使用chpasswd命令设置密码password = Prompt.ask("请输入新密码", password=True)confirm_password = Prompt.ask("请再次输入密码", password=True)if password != confirm_password:self.console.print("[red]错误:两次输入的密码不一致![/red]")return# 使用echo和chpasswd命令设置密码cmd = f"echo '{username}:{password}' | chpasswd"success, output = self.run_command(["bash", "-c", cmd])if success:self.console.print(f"[green]成功设置用户 {username} 的密码[/green]")else:self.console.print(f"[red]设置密码失败:{output}[/red]")except KeyboardInterrupt:self.console.print("\n[yellow]已取消设置密码[/yellow]")def modify_user(self):"""修改用户信息"""try:username = Prompt.ask("请输入要修改的用户名")# 检查用户是否存在try:user_info = pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")returnself.console.print(Panel(f"当前用户信息:\n"f"用户名:{user_info.pw_name}\n"f"UID:{user_info.pw_uid}\n"f"GID:{user_info.pw_gid}\n"f"主目录:{user_info.pw_dir}\n"f"Shell:{user_info.pw_shell}"))new_home = Prompt.ask("请输入新的主目录", default=user_info.pw_dir)new_shell = Prompt.ask("请输入新的shell", default=user_info.pw_shell)cmd = ["usermod", "-d", new_home, "-s", new_shell, username]success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功修改用户 {username} 的信息[/green]")else:self.console.print(f"[red]修改用户信息失败:{output}[/red]")except KeyboardInterrupt:self.console.print("\n[yellow]已取消修改用户信息[/yellow]")def list_groups(self):"""列出所有用户组"""table = Table(title="系统用户组列表")table.add_column("组名", style="cyan")table.add_column("GID", style="green")table.add_column("成员", style="yellow")for group in grp.getgrall():members = ", ".join(group.gr_mem) if group.gr_mem else "无"table.add_row(group.gr_name,str(group.gr_gid),members)self.console.print(table)def create_group(self):"""创建新用户组"""groupname = Prompt.ask("请输入新组名")# 检查组是否已存在try:grp.getgrnam(groupname)self.console.print(f"[red]错误:组 {groupname} 已存在![/red]")returnexcept KeyError:passcmd = ["groupadd", groupname]success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功创建组 {groupname}[/green]")else:self.console.print(f"[red]创建组失败:{output}[/red]")def delete_group(self):"""删除用户组"""groupname = Prompt.ask("请输入要删除的组名")# 检查组是否存在try:grp.getgrnam(groupname)except KeyError:self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")returncmd = ["groupdel", groupname]success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功删除组 {groupname}[/green]")else:self.console.print(f"[red]删除组失败:{output}[/red]")def add_user_to_group(self):"""将用户添加到组"""username = Prompt.ask("请输入用户名")groupname = Prompt.ask("请输入组名")# 检查用户和组是否存在try:pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")returntry:grp.getgrnam(groupname)except KeyError:self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")returncmd = ["usermod", "-a", "-G", groupname, username]success, output = self.run_command(cmd)if success:self.console.print(f"[green]成功将用户 {username} 添加到组 {groupname}[/green]")else:self.console.print(f"[red]添加用户到组失败:{output}[/red]")def check_sudo_access(self, username: str) -> bool:"""检查用户是否有sudo权限"""try:# 检查sudoers文件cmd = f"grep -E '^{username}|^%.*{username}' {self.sudoers_file}"success, output = self.run_command(["bash", "-c", cmd])if success and output.strip():return True# 检查sudoers.d目录下的文件cmd = f"grep -r -E '^{username}|^%.*{username}' {self.sudoers_dir}"success, output = self.run_command(["bash", "-c", cmd])if success and output.strip():return Truereturn Falseexcept:return Falsedef list_sudo_users(self):"""列出所有具有sudo权限的用户"""table = Table(title="Sudo权限用户列表")table.add_column("用户名", style="cyan")table.add_column("权限来源", style="green")table.add_column("权限详情", style="yellow")# 检查每个用户for user in pwd.getpwall():username = user.pw_nameif self.check_sudo_access(username):# 查找权限来源cmd = f"grep -r -E '^{username}|^%.*{username}' {self.sudoers_file} {self.sudoers_dir}"success, output = self.run_command(["bash", "-c", cmd])if success:for line in output.strip().split('\n'):if line:source, *content = line.split(':', 1)content = content[0] if content else "N/A"source = source.replace(self.sudoers_file, "sudoers")source = source.replace(self.sudoers_dir + '/', "sudoers.d/")table.add_row(username, source, content.strip())self.console.print(table)def grant_sudo_access(self):"""授予用户sudo权限"""username = Prompt.ask("请输入要授予sudo权限的用户名")try:pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")returnif self.check_sudo_access(username):self.console.print(f"[yellow]用户 {username} 已经具有sudo权限[/yellow]")return# 创建新的sudoers文件filename = f"{self.sudoers_dir}/{username}"content = f"{username} ALL=(ALL) ALL"try:# 首先写入临时文件temp_file = f"{filename}.tmp"with open(temp_file, 'w') as f:f.write(content + '\n')# 检查语法success, output = self.run_command(["visudo", "-c", "-f", temp_file])if not success:self.console.print(f"[red]错误:sudoers语法检查失败:{output}[/red]")os.unlink(temp_file)return# 移动到最终位置os.rename(temp_file, filename)os.chmod(filename, 0o440)self.console.print(f"[green]成功授予用户 {username} sudo权限[/green]")except Exception as e:self.console.print(f"[red]授予sudo权限失败:{str(e)}[/red]")def revoke_sudo_access(self):"""撤销用户的sudo权限"""username = Prompt.ask("请输入要撤销sudo权限的用户名")try:pwd.getpwnam(username)except KeyError:self.console.print(f"[red]错误:用户 {username} 不存在![/red]")returnif not self.check_sudo_access(username):self.console.print(f"[yellow]用户 {username} 没有sudo权限[/yellow]")return# 查找并删除sudoers配置filename = f"{self.sudoers_dir}/{username}"if os.path.exists(filename):try:os.unlink(filename)self.console.print(f"[green]成功撤销用户 {username} 的sudo权限[/green]")except Exception as e:self.console.print(f"[red]撤销sudo权限失败:{str(e)}[/red]")else:self.console.print(f"[yellow]警告:需要手动编辑 {self.sudoers_file} 来撤销权限[/yellow]")def main_menu():"""显示主菜单"""manager = UserManager()while True:try:console.clear()console.print(Panel.fit("[bold cyan]Linux用户管理[/bold cyan]\n""1. 列出所有用户\n""2. 创建新用户\n""3. 删除用户\n""4. 修改用户信息\n""5. 设置用户密码\n""6. 列出所有用户组\n""7. 创建新用户组\n""8. 删除用户组\n""9. 将用户添加到组\n""10. 列出所有具有sudo权限的用户\n""11. 授予用户sudo权限\n""12. 撤销用户的sudo权限\n""0. 退出程序",title="主菜单"))choice = Prompt.ask("请选择操作", choices=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"])if choice == "0":console.print("[yellow]感谢使用,再见![/yellow]")breaktry:if choice == "1":manager.list_users()elif choice == "2":manager.create_user()elif choice == "3":manager.delete_user()elif choice == "4":manager.modify_user()elif choice == "5":manager.set_password()elif choice == "6":manager.list_groups()elif choice == "7":manager.create_group()elif choice == "8":manager.delete_group()elif choice == "9":manager.add_user_to_group()elif choice == "10":manager.list_sudo_users()elif choice == "11":manager.grant_sudo_access()elif choice == "12":manager.revoke_sudo_access()Prompt.ask("\n按回车键继续...")except KeyboardInterrupt:console.print("\n[yellow]操作已取消[/yellow]")try:if Prompt.ask("\n是否返回主菜单?", choices=["y", "n"], default="y") == "n":console.print("[yellow]感谢使用,再见![/yellow]")returnexcept KeyboardInterrupt:console.print("\n[yellow]感谢使用,再见![/yellow]")returnexcept KeyboardInterrupt:console.print("\n[yellow]感谢使用,再见![/yellow]")breakif __name__ == "__main__":try:main_menu()except Exception as e:console.print(f"\n[red]发生错误:{str(e)}[/red]")sys.exit(1)
vim requirements.txt
rich==12.6.0
click==8.0.4##安装模块: pip3 install -r requirements.txt
9、 网络检测
1、网络输入输出总量监测
import psutildef get_network_usage():"""获取并打印网络 I/O 模块的使用情况,包括发送和接收的字节数(单位:MB)。"""try:# 获取网络 I/O 统计信息net_io = psutil.net_io_counters()# 计算发送和接收的字节数(转换为 MB)bytes_received = net_io.bytes_recv / (2**20) # 转换为 MBbytes_sent = net_io.bytes_sent / (2**20) # 转换为 MB# 输出结果print(f"总接收字节数: {bytes_received:.2f} MB")print(f"总发送字节数: {bytes_sent:.2f} MB")except Exception as e:# 捕获异常并输出错误信息print(f"获取网络 I/O 使用信息失败: {e}")# 调用函数
get_network_usage()
10、远程控制主机,并执行命令
import paramiko
import sys# 检查命令行参数是否正确
if len(sys.argv) < 5:print("Usage: python ssh.py <hostname> <username> <password> <command1> [command2] [command3] ...")sys.exit(1)# 从命令行参数中提取主机信息和命令
hostname = sys.argv[1] # 主机 IP 地址
username = sys.argv[2] # 登录用户名
password = sys.argv[3] # 登录密码
commands = sys.argv[4:] # 命令列表,从第四个参数开始# 创建 SSH 客户端对象
ssh = paramiko.SSHClient()# 设置自动添加主机密钥策略
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 连接到远程主机
try:ssh.connect(hostname=hostname, # 主机 IP 地址username=username, # 登录用户名password=password # 登录密码)# 依次执行命令并打印输出for idx, command in enumerate(commands):print(f"\n------ 执行命令 '{command}' 的结果 ------")try:stdin, stdout, stderr = ssh.exec_command(command) # 执行命令并获取输出# 打印标准输出print(stdout.read().decode('utf-8'))# 打印标准错误(如果存在)error_output = stderr.read().decode('utf-8')if error_output:print(f"--- 错误输出 ---\n{error_output}")# 显式关闭 stdin, stdout, stderrstdin.close()stdout.close()stderr.close()except Exception as e:print(f"执行命令 '{command}' 时发生错误: {e}")finally:# 确保关闭 SSH 连接ssh.close()
# 执行方法:
python3 ssh.py 192.168.1.100 root 1 'df -h' 'ip a s' 'ls -la'
11、服务监控,(状态,重启,关闭)
1、Nginx服务监控
import subprocessdef run_cmd(cmd):try:output = subprocess.run(["systemctl", "is-active", "nginx"], capture_output=True, text=True, check=True)return output.stdout.strip() == "active"except subprocess.CalledProcessError as e:return Falsedef restart_nginx():try:subprocess.run(["systemctl", "restart", "nginx"], check=True)print("Nginx restarted successfully")return Trueexcept subprocess.CalledProcessError as e:print("Failed to restart Nginx")print(e.stderr)return Falseif __name__ == "__main__":if run_cmd("systemctl active nginx"):print("Nginx is active")else:print("Nginx is not active")if restart_nginx():print("Nginx restarted successfully")else:print("Failed to restart Nginx")
12、CPU 监控
# 导入paramiko模块,用于SSH连接
import paramiko
# 导入time模块,用于处理时间
import time
# 导入smtplib模块,用于发送邮件
import smtplib
# 导入MIMEText类,用于创建邮件正文
from email.mime.text import MIMEText
# 导入MIMEMultipart库,用于创建邮件
from email.mime.multipart import MIMEMultipart
# 导入signal库,用于处理信号
import signal# 定义一些常量
SSH_HOST = "192.168.1.100" # 监控的远程主机IP
SSH_PORT = 22 # SSH端口
SSH_USERNAME = "root" # SSH用户名
SSH_PASSWORD = "password123" # SSH密码
CPU_THRESHOLD = 80 # CPU使用率阈值(%)# 邮件配置
SMTP_SERVER = "smtp.example.com" # SMTP服务器地址
SMTP_PORT = 587 # SMTP端口
SMTP_USERNAME = "sender@example.com" # SMTP用户名
SMTP_PASSWORD = "your_email_password" # SMTP密码
RECIPIENT_EMAIL = "admin@example.com" # 接收邮箱# 信号处理函数,用于优雅退出
def graceful_exit(signal, frame):print("\n接收到退出信号,程序优雅退出。")client.close()exit(0)# 发送邮件通知
def send_email(cpu_usage, restarted_process):message = MIMEText(f"高CPU占用进程已被重启:{restarted_process}\n当前CPU使用率:{cpu_usage}%","plain","utf-8")message["From"] = SMTP_USERNAMEmessage["To"] = RECIPIENT_EMAILmessage["Subject"] = "高CPU使用率警报"try:with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:server.starttls() # 启用TLS加密server.login(SMTP_USERNAME, SMTP_PASSWORD)server.sendmail(SMTP_USERNAME, RECIPIENT_EMAIL, message.as_string())print("已发送邮件通知。")except Exception as e:print(f"发送邮件失败:{e}")# 主功能函数,监控CPU使用率并处理
def monitor_cpu_usage():try:# 创建SSH客户端client = paramiko.SSHClient()client.set_missing_host_key_policy(paramiko.AutoAddPolicy())client.connect(SSH_HOST, port=SSH_PORT, username=SSH_USERNAME, password=SSH_PASSWORD)while True:# 执行命令获取CPU使用率stdin, stdout, stderr = client.exec_command("mpstat 1 1 | grep all | awk '{print $12}'")cpu_usage = float(stdout.read().decode().strip())# 判断CPU使用率是否超过阈值if cpu_usage > CPU_THRESHOLD:print(f"高CPU警报:CPU使用率为 {cpu_usage}%(超过阈值 {CPU_THRESHOLD}%)")# 执行重启操作(这里假设有一个重启命令)stdin, stdout, stderr = client.exec_command("pkill -9 high_cpu_process && sleep 1 && /start_high_cpu_process.sh")if stdout.channel.exit_status == 0:print("进程已重启。")# 发送邮件通知send_email(cpu_usage, "high_cpu_process")else:print("重启失败。")print(stderr.read().decode().strip())else:print(f"当前CPU使用率:{cpu_usage}%")# 每5秒检查一次time.sleep(5)except paramiko.SSHException as e:print(f"SSH连接失败:{e}")except Exception as e:print(f"发生错误:{e}")finally:client.close()# 设置优雅退出的信号处理
signal.signal(signal.SIGINT, graceful_exit)
signal.signal(signal.SIGTERM, graceful_exit)# 主程序入口
if __name__ == "__main__":print("监控程序已启动,按 Ctrl+C 优雅退出。")monitor_cpu_usage()
相关文章:

Python 运维脚本
1、备份文件 import os import shutil# 定义配置文件目录和备份目录的路径 config_dir "/root/python/to/config/files/" backup_dir "/root/python/to/backup/"# 遍历配置文件目录中的所有文件 for filename in os.listdir(config_dir):# 如果文件名以…...
MySQL数据库常见面试题之三大范式
写在前面 此文章大部分不会引用最原始的概念,采用说人话的方式。 面试题:三大范式是什么?目的是什么?必须遵循吗? 假设有一张表(学号,姓名,课程,老师) 是…...

大模型项目:普通蓝牙音响接入DeepSeek,解锁语音交互新玩法
本文附带视频讲解 【代码宇宙019】技术方案:蓝牙音响接入DeepSeek,解锁语音交互新玩法_哔哩哔哩_bilibili 目录 效果演示 核心逻辑 技术实现 大模型对话(技术: LangChain4j 接入 DeepSeek) 语音识别(…...
C/C++复习--C语言隐式类型转换
目录 什么是隐式类型转换?整型提升 规则与示例符号位扩展的底层逻辑 算术转换 类型层次与转换规则混合类型运算的陷阱 隐式转换的实际应用与问题 代码示例分析常见错误与避免方法 总结与最佳实践 1. 什么是隐式类型转换? 隐式类型转换是C语言在编译阶段…...
Pandas 时间处理利器:to_datetime() 与 Timestamp() 深度解析
Pandas 时间处理利器:to_datetime() 与 Timestamp() 深度解析 在数据分析和处理中,时间序列数据扮演着至关重要的角色。Pandas 库凭借其强大的时间序列处理能力,成为 Python 数据分析领域的佼佼者。其中,to_datetime() 函数和 Ti…...

单链表设计与实现
01. 单链表简介 在数据结构中,单链表的实现可以分为 带头结点 和 不带头结点 两种方式,这里我们讨论第二种方式。 头结点:链表第一个节点不存实际数据,仅作为辅助节点指向首元节点(第一个数据节点)。头指…...
JDS-算法开发工程师-第9批
单选题 print(fn.__default__) 哪一个不是自适应学习率的优化算法 (选项:Adagrad,RMSprop,Adam,Momentum,动量法在梯度下降的基础上,加入了“惯性”概念,通过累积历史的梯度更新来加速收敛&…...
Git标签删除脚本解析与实践:轻松管理本地与远程标签
Git 标签删除脚本解析与实践:轻松管理本地与远程标签 在 Git 版本控制系统中,标签常用于标记重要的版本节点,方便追溯和管理项目的不同阶段。随着项目的推进,一些旧标签可能不再需要,此时就需要对它们进行清理。本文将通过一个完整的脚本,详细介绍如何删除本地和远程的 …...
Python中,async和with结合使用,有什么好处?
在Python的异步编程中,async和with的结合使用(即async with)为开发者提供了一种优雅且高效的资源管理模式。这种组合不仅简化了异步代码的编写,还显著提升了程序的健壮性和可维护性。以下是其核心优势及典型应用场景的分析&#x…...

springboot生成二维码到海报模板上
springboot生成二维码到海报模板上 QRCodeController package com.ruoyi.web.controller.app;import com.google.zxing.WriterException; import com.ruoyi.app.domain.Opportunity; import com.ruoyi.app.tool.QRCodeGenerator; import com.ruoyi.common.core.page.TableDat…...

SEO长尾关键词布局优化法则
内容概要 在SEO优化体系中,长尾关键词的精准布局是突破流量瓶颈的关键路径。相较于竞争激烈的核心词,长尾词凭借其高转化率和低竞争特性,成为内容矩阵流量裂变的核心驱动力。本节将系统梳理长尾关键词布局的核心逻辑框架,涵盖从需…...

python:trimesh 用于 STL 文件解析和 3D 操作
python:trimesh 是一个用于处理三维模型的库,支持多种格式的导入导出,比如STL、OBJ等,还包含网格操作、几何计算等功能。 Python Trimesh 库使用指南 安装依赖库 pip install trimesh Downloading trimesh-4.6.8-py3-none-any.w…...

应急响应基础模拟靶机-security2
PS:杰克创建的流量包(result.pcap)在root目录下,请根据已有信息进行分析 1、首个攻击者扫描端口使用的工具是? 2、后个攻击者使用的漏洞扫描工具是? 3、攻击者上传webshell的绝对路径及User-agent是什么? 4、攻击者反弹shell的…...
ROS 2 FishBot PID控制电机代码
#include <Arduino.h> #include <Wire.h> #include <MPU6050_light.h> #include <Esp32McpwmMotor.h> #include <Esp32PcntEncoder.h>Esp32McpwmMotor motor; // 创建一个名为motor的对象,用于控制电机 Esp32PcntEncoder enco…...
Bash 字符串语法糖详解
Bash 作为 Linux 和 Unix 系统中最常用的 Shell 之一,其字符串处理能力是脚本开发者的核心技能之一。为了让字符串操作更高效、更直观,Bash 提供了丰富的语法糖(syntactic sugar)。这些语法糖通过简洁的语法形式,隐藏了…...

OpenCV定位地板上的书
任务目标是将下面的图片中的书本找出来: 使用到的技术包括:转灰度图、提取颜色分量、二值化、形态学、轮廓提取等。 我们尝试先把图片转为灰度图,然后二值化,看看效果: 可以看到,二值化后,书的…...
C++ string初始化、string赋值操作、string拼接操作
以下介绍了string的六种定义方式,还有很多,这个只是简单举例 #include<iostream>using namespace std;int main() {//1 无参构造string s1;cout << s1 << endl;//2 初始化构造string s2 ({h, h, l, l, o});cout << s2 <<…...
linux动态占用cpu脚本、根据阈值增加占用或取消占用cpu的脚本、自动检测占用脚本状态、3脚本联合套用。
文章目录 说明流程占用脚本1.0版本使用测试占用脚本2.0版本使用测试测试脚本使用测试检测脚本使用测试脚本说明书启动说明停止说明内存占用cpu内存成品任务测试说明 cpu占用实现的功能整体流程 1、先获取当前实际使用率2、设置一个最低阈值30%,一个最高阈值80%、一个需要增加的…...

NHANES稀有指标推荐:MedHi
文章题目:Association of dietary live microbe intake with frailty in US adults: evidence from NHANES DOI:10.1016/j.jnha.2024.100171 中文标题:美国成人膳食活微生物摄入量与虚弱的相关性:来自 NHANES 的证据 发表杂志&…...
无人机空中物流优化:用 Python 打造高效配送模型
友友们好! 我是Echo_Wish,我的的新专栏《Python进阶》以及《Python!实战!》正式启动啦!这是专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会…...

关于我在实现用户头像更换时遇到的图片上传和保存的问题
目录 前言 前端更换头像 后端处理 文件系统存储图片 数据库存储图片 处理图片文件 生成图片名 保存图片 将图片路径存储到数据库 完整代码 总结 前言 最近在实现一个用户头像更换的功能,但是因为之前并没有处理过图片的上传和保存,所以就开始…...

10.二叉搜索树中第k小的元素(medium)
1.题目链接: 230. 二叉搜索树中第 K 小的元素 - 力扣(LeetCode)230. 二叉搜索树中第 K 小的元素 - 给定一个二叉搜索树的根节点 root ,和一个整数 k ,请你设计一个算法查找其中第 k 小的元素(从 1 开始计数…...

AlimaLinux设置静态IP
通过nmcli命令来操作 步骤 1:确认当前活动的网络接口名称 首先,需要确认当前系统中可用的网络接口名称。可以使用以下命令查看: nmcli device步骤 2:修改配置以匹配正确的接口名称 sudo nmcli connection modify ens160 ipv4.…...

滑动窗口——将x减到0的最小操作数
题目: 这个题如果我们直接去思考方法是很困难的,因为我们不知道下一步是在数组的左还是右操作才能使其最小。正难则反,思考一下,无论是怎么样的,最终这个数组都会分成三个部分左中右,而左右的组合就是我们…...

基于SpringBoot的抽奖系统测试报告
一、编写目的 本报告为抽奖系统测试报告,本项目可用于团体抽奖活动,包括了用户注册,用户登录,修改奖项以及抽奖等功能。 二、项目背景 抽奖系统采用前后端分离的方法来实现,同时使用了数据库来存储相关的数据&…...

服务器mysql连接我碰到的错误
搞了2个下午,总算成功了 我在服务器上使用docker部署了java项目与mysql,但mysql连接一直出现问题 1.首先,我使用的是localhost连接,心想反正都在服务器上吧。 jdbc:mysql://localhost:3306/fly-bird?useSSLfalse&serverTime…...

【Part 2安卓原生360°VR播放器开发实战】第四节|安卓VR播放器性能优化与设备适配
《VR 360全景视频开发》专栏 将带你深入探索从全景视频制作到Unity眼镜端应用开发的全流程技术。专栏内容涵盖安卓原生VR播放器开发、Unity VR视频渲染与手势交互、360全景视频制作与优化,以及高分辨率视频性能优化等实战技巧。 📝 希望通过这个专栏&am…...
TIME - MoE 模型代码 4——Time-MoE-main/run_eval.py
源码:https://github.com/Time-MoE/Time-MoE 这段代码是一个用于评估 Time-MoE 模型性能的脚本,它支持分布式环境下的模型评估,通过计算 MSE 和 MAE 等指标来衡量模型在时间序列预测任务上的表现。代码的核心功能包括:模型加载、…...
数字孪生概念
数字孪生(Digital Twin) 是指通过数字技术对物理实体(如设备、系统、流程或环境)进行高保真建模和实时动态映射,实现虚实交互、仿真预测和优化决策的技术体系。它是工业4.0、智慧城市和数字化转型的核心技术之一。 1. …...

从知识图谱到精准决策:基于MCP的招投标货物比对溯源系统实践
前言 从最初对人工智能的懵懂认知,到逐渐踏入Prompt工程的世界,我们一路探索,从私有化部署的实际场景,到对DeepSeek技术的全面解读,再逐步深入到NL2SQL、知识图谱构建、RAG知识库设计,以及ChatBI这些高阶应…...