Как да създадем интелигентен системен мониторинг с автоматично възстановяване
Автоматизацията е сърцето на DevOps културата. Докато съществуват мощни инструменти като Kubernetes, Docker и различни CI/CD платформи, скриптовете остават основополагащ елемент за ефективно управление на инфраструктурата. В тази статия ще разгледаме как да създадем професионален скрипт за системен мониторинг с интелигентно самовъзстановяване.
Защо скриптовете са незаменими в DevOps
В съвременната IT среда, където системите стават все по-сложни, автоматизацията чрез скриптове предлага няколко критични предимства:
Бързина на реакция: Докато човек може да забележи проблем и да реагира в рамките на минути, добре написан скрипт може да го направи в секунди.
Консистентност: Скриптовете елиминират човешката грешка и гарантират, че всяка процедура се изпълнява точно по същия начин всеки път.
Мащабируемост: Един скрипт може да управлява стотици сървъри едновременно, докато ръчното управление би било практически невъзможно.
Документация: Добре написан скрипт служи като живa документация за процесите в организацията.
Практически пример: Интелигентен системен мониторинг скрипт
Ще създадем Python скрипт, който не просто наблюдава системните ресурси, но и предприема автоматични действия за възстановяване при проблеми. Този скрипт комбинира мониторинг на системни ресурси, управление на услуги и интелигентно вземане на решения.
#!/usr/bin/env python3
"""
Интелигентен Системен Мониторинг и Автоматично Възстановяване
Автор: DevOps Bulgaria Team
Версия: 2.0
"""
import psutil
import subprocess
import logging
import smtplib
import json
import time
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Optional
import argparse
import sys
class SystemMonitor:
def __init__(self, config_file: str = "monitor_config.json"):
"""Инициализация на системния монитор"""
self.config = self.load_config(config_file)
self.setup_logging()
self.alerts_sent = {}
def load_config(self, config_file: str) -> Dict:
"""Зареждане на конфигурационния файл"""
default_config = {
"thresholds": {
"cpu_percent": 85.0,
"memory_percent": 90.0,
"disk_percent": 85.0,
"load_average": 8.0
},
"services": [
"nginx", "apache2", "mysql", "postgresql",
"docker", "redis-server"
],
"email": {
"enabled": True,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "[email protected]",
"password": "your-app-password",
"to_addresses": ["[email protected]"]
},
"monitoring": {
"interval": 60,
"recovery_attempts": 3,
"cooldown_period": 300
},
"actions": {
"high_cpu": ["restart_heavy_services", "clear_cache"],
"high_memory": ["restart_memory_intensive_services", "clear_buffers"],
"service_down": ["restart_service", "check_dependencies"]
}
}
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
# Комбинираме default конфигурацията с потребителската
default_config.update(user_config)
return default_config
except FileNotFoundError:
logging.warning(f"Конфигурационен файл {config_file} не е намерен. Използват се стандартните настройки.")
# Създаваме примерен конфигурационен файл
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=4, ensure_ascii=False)
return default_config
def setup_logging(self):
"""Настройка на логирането"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('system_monitor.log', encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
def get_system_metrics(self) -> Dict:
"""Събиране на системни метрики"""
try:
# CPU информация
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
load_avg = psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
# Памет
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Дискове
disk_usage = {}
for partition in psutil.disk_partitions():
try:
usage = psutil.disk_usage(partition.mountpoint)
disk_usage[partition.mountpoint] = {
'total': usage.total,
'used': usage.used,
'free': usage.free,
'percent': (usage.used / usage.total) * 100
}
except PermissionError:
continue
# Мрежа
network = psutil.net_io_counters()
return {
'timestamp': datetime.now().isoformat(),
'cpu': {
'percent': cpu_percent,
'count': cpu_count,
'load_avg': load_avg
},
'memory': {
'total': memory.total,
'used': memory.used,
'percent': memory.percent,
'available': memory.available
},
'swap': {
'total': swap.total,
'used': swap.used,
'percent': swap.percent
},
'disk': disk_usage,
'network': {
'bytes_sent': network.bytes_sent,
'bytes_recv': network.bytes_recv,
'packets_sent': network.packets_sent,
'packets_recv': network.packets_recv
}
}
except Exception as e:
logging.error(f"Грешка при събиране на системни метрики: {e}")
return {}
def check_service_status(self, service_name: str) -> bool:
"""Проверка на статуса на услуга"""
try:
result = subprocess.run(
['systemctl', 'is-active', service_name],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except subprocess.TimeoutExpired:
logging.warning(f"Timeout при проверка на услугата {service_name}")
return False
except Exception as e:
logging.error(f"Грешка при проверка на услугата {service_name}: {e}")
return False
def restart_service(self, service_name: str) -> bool:
"""Рестартиране на услуга"""
try:
logging.info(f"Опит за рестартиране на услугата {service_name}")
result = subprocess.run(
['sudo', 'systemctl', 'restart', service_name],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
logging.info(f"Услугата {service_name} е успешно рестартирана")
return True
else:
logging.error(f"Неуспешно рестартиране на {service_name}: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logging.error(f"Timeout при рестартиране на услугата {service_name}")
return False
except Exception as e:
logging.error(f"Грешка при рестартиране на услугата {service_name}: {e}")
return False
def clear_system_cache(self):
"""Изчистване на системния кеш"""
try:
logging.info("Изчистване на системния кеш")
subprocess.run(['sudo', 'sync'], timeout=10)
subprocess.run(['sudo', 'sysctl', '-w', 'vm.drop_caches=3'], timeout=10)
logging.info("Системният кеш е изчистен успешно")
except Exception as e:
logging.error(f"Грешка при изчистване на кеша: {e}")
def send_alert(self, subject: str, message: str, alert_type: str = "warning"):
"""Изпращане на имейл известие"""
if not self.config['email']['enabled']:
return
# Проверка за cooldown период
current_time = time.time()
if alert_type in self.alerts_sent:
if current_time - self.alerts_sent[alert_type] < self.config['monitoring']['cooldown_period']:
return
try:
msg = MIMEMultipart()
msg['From'] = self.config['email']['username']
msg['To'] = ', '.join(self.config['email']['to_addresses'])
msg['Subject'] = f"[System Alert] {subject}"
body = f"""
Системно известие от {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{message}
---
Автоматично генерирано от System Monitor
"""
msg.attach(MIMEText(body, 'plain', 'utf-8'))
server = smtplib.SMTP(self.config['email']['smtp_server'],
self.config['email']['smtp_port'])
server.starttls()
server.login(self.config['email']['username'],
self.config['email']['password'])
text = msg.as_string()
server.sendmail(self.config['email']['username'],
self.config['email']['to_addresses'], text)
server.quit()
logging.info(f"Изпратено известие: {subject}")
self.alerts_sent[alert_type] = current_time
except Exception as e:
logging.error(f"Грешка при изпращане на известие: {e}")
def analyze_and_act(self, metrics: Dict):
"""Анализ на метриките и предприемане на действия"""
issues = []
actions_taken = []
# Проверка на CPU
if metrics['cpu']['percent'] > self.config['thresholds']['cpu_percent']:
issues.append(f"Високо CPU натоварване: {metrics['cpu']['percent']:.2f}%")
self.clear_system_cache()
actions_taken.append("Изчистен системен кеш")
# Проверка на паметта
if metrics['memory']['percent'] > self.config['thresholds']['memory_percent']:
issues.append(f"Висока употреба на памет: {metrics['memory']['percent']:.2f}%")
self.clear_system_cache()
actions_taken.append("Изчистен системен кеш за освобождаване на памет")
# Проверка на дисковете
for mountpoint, disk_info in metrics['disk'].items():
if disk_info['percent'] > self.config['thresholds']['disk_percent']:
issues.append(f"Висока употреба на диск {mountpoint}: {disk_info['percent']:.2f}%")
# Проверка на услугите
for service in self.config['services']:
if not self.check_service_status(service):
issues.append(f"Услугата {service} не работи")
if self.restart_service(service):
actions_taken.append(f"Рестартирана услуга: {service}")
else:
actions_taken.append(f"Неуспешен опит за рестартиране на: {service}")
# Изпращане на известия при необходимост
if issues:
alert_message = "Открити проблеми:\n" + "\n".join(issues)
if actions_taken:
alert_message += "\n\nПредприети действия:\n" + "\n".join(actions_taken)
self.send_alert("Системни проблеми", alert_message)
return issues, actions_taken
def generate_report(self, metrics: Dict, issues: List, actions: List) -> str:
"""Генериране на детайлен доклад"""
report = f"""
=== СИСТЕМЕН ДОКЛАД ===
Време: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
CPU:
Натоварване: {metrics['cpu']['percent']:.2f}%
Ядра: {metrics['cpu']['count']}
Load Average: {metrics['cpu']['load_avg']:.2f}
ПАМЕТ:
Общо: {metrics['memory']['total'] / (1024**3):.2f} GB
Използвано: {metrics['memory']['percent']:.2f}%
Свободно: {metrics['memory']['available'] / (1024**3):.2f} GB
ДИСКОВЕ:"""
for mountpoint, disk_info in metrics['disk'].items():
report += f"""
{mountpoint}: {disk_info['percent']:.2f}% използвано"""
if issues:
report += "\n\nОТКРИТИ ПРОБЛЕМИ:\n" + "\n".join(f"- {issue}" for issue in issues)
if actions:
report += "\n\nПРЕДПРИЕТИ ДЕЙСТВИЯ:\n" + "\n".join(f"- {action}" for action in actions)
return report
def run_monitoring_cycle(self):
"""Изпълнение на един цикъл мониторинг"""
logging.info("Започване на цикъл мониторинг...")
# Събиране на метрики
metrics = self.get_system_metrics()
if not metrics:
logging.error("Не можаха да се съберат системни метрики")
return
# Анализ и действия
issues, actions = self.analyze_and_act(metrics)
# Генериране на доклад
report = self.generate_report(metrics, issues, actions)
logging.info(report)
return len(issues) == 0 # Връща True ако няма проблеми
def run_continuous(self):
"""Непрекъснат мониторинг"""
logging.info("Започване на непрекъснат мониторинг...")
try:
while True:
self.run_monitoring_cycle()
time.sleep(self.config['monitoring']['interval'])
except KeyboardInterrupt:
logging.info("Спиране на мониторинга по заявка на потребителя")
except Exception as e:
logging.error(f"Неочаквана грешка в мониторинга: {e}")
self.send_alert("Критична грешка в мониторинга", str(e), "critical")
def main():
"""Основна функция"""
parser = argparse.ArgumentParser(description='Интелигентен системен монитор')
parser.add_argument('--config', default='monitor_config.json',
help='Път към конфигурационния файл')
parser.add_argument('--once', action='store_true',
help='Изпълнение на един цикъл вместо непрекъснат мониторинг')
parser.add_argument('--verbose', '-v', action='store_true',
help='Подробно логиране')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
monitor = SystemMonitor(args.config)
if args.once:
success = monitor.run_monitoring_cycle()
sys.exit(0 if success else 1)
else:
monitor.run_continuous()
if __name__ == "__main__":
main()
Конфигурационен файл (monitor_config.json)
{
"thresholds": {
"cpu_percent": 85.0,
"memory_percent": 90.0,
"disk_percent": 85.0,
"load_average": 8.0
},
"services": [
"nginx",
"mysql",
"docker",
"redis-server"
],
"email": {
"enabled": true,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "[email protected]",
"password": "your-app-password",
"to_addresses": ["[email protected]"]
},
"monitoring": {
"interval": 60,
"recovery_attempts": 3,
"cooldown_period": 300
}
}
Systemd Service за автоматично стартиране
[Unit]
Description=Интелигентен Системен Монитор
After=network.target
[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /opt/system-monitor/system_monitor.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Ключови характеристики на скрипта
Интелигентно вземане на решения: Скриптът не само наблюдава, но и предприема конкретни действия за решаване на проблемите.
Гъвкава конфигурация: Всички настройки се управляват чрез JSON файл, което позволява лесна персонализация без промяна на кода.
Comprehensive мониторинг: Следи CPU, памет, дискове, мрежа и състоянието на критични услуги.
Автоматично възстановяване: При открити проблеми автоматично се опитва да ги реши чрез рестартиране на услуги или изчистване на кеш.
Известяване: Изпраща детайлни имейл известия при критични ситуации с cooldown механизъм за избягване на спам.
Логиране: Подробно логиране на всички дейности за последващ анализ.
Внедряване и използване
За да внедрите скрипта в производствена среда:
- Инсталирайте зависимостите:
pip3 install psutil
- Настройте конфигурацията според вашите нужди
- Тествайте скрипта първо в тестова среда:
python3 system_monitor.py --once --verbose
- Създайте systemd service за автоматично стартиране
- Мониторирайте логовете за оптимизация на настройките
Заключение
Автоматизацията чрез скриптове не е просто технически инструмент – тя е философия на проактивно управление на инфраструктурата. Представеният скрипт демонстрира как с относително малко код можем да създадем мощна система за мониторинг и автоматично възстановяване, която работи 24/7 и осигурява стабилност на нашите услуги.
В динамичната DevOps среда, където системите стават все по-сложни, такива скриптове са критично важни за поддържане на високо ниво на услуга и минимизиране на човешката намеса при рутинни операции. Инвестицията във време за създаване на качествени автоматизационни скриптове се връща многократно под формата на по-стабилни системи и по-малко нощни повиквания.
Помнете: добрият DevOps инженер не само автоматизира процесите, но създава интелигентни системи, които могат сами да вземат решения и да се адаптират към променящите се условия.