使用 Python 实现 SSH 登录日志的实时监控与异常检测

使用 Python 编写一个脚本来实时监控 SSH 登录日志,并实现基于时间戳的异常检测机制。此外,还展示如何配置 Supervisor 以确保脚本稳定运行。

技术栈

  • Python 3
  • Regular Expressions (re)
  • Requests
  • Supervisor
  • 部署与配置
  • subprocess
  • json

必要的库

[root@Dev-RockyLinux9-Area1-Shanghai ~]# pip install requests

创建脚本文件: 创建一个名为 monitor_ssh_logins.py 的 Python 脚本

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# 适用于监测 RockyLinux 中 SSH 登录事件
#
# @author: Richard <richard@ponfey.com>
# @date: 2024-08-14
#
# 导入必要的库
import os
import re
import requests
import time
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 日志记录脚本启动
logger.info("Starting monitor_ssh_logins script.")

def send_to_feishu(message):
    """
    发送消息到飞书 webhook
    """
    # 配置 webhook URL
    url = "xxx"
    headers = {"Content-Type": "application/json"}
    payload = {
        "msg_type": "text",
        "content": {
            "text": message
        }
    }
    # 发送 POST 请求
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 200:
        logger.info("消息发送成功")
    else:
        logger.error(f"消息发送失败,状态码:{response.status_code}")

def get_server_name():
    """
    获取服务器名称
    """
    return os.uname().nodename

def get_last_processed_timestamp(file_path):
    """
    从文件中读取最后一次处理的记录的时间戳
    """
    try:
        with open(file_path, 'r') as file:
            return file.read().strip()
    except FileNotFoundError:
        return None

def set_last_processed_timestamp(file_path, timestamp):
    """
    将时间戳写入文件
    """
    with open(file_path, 'w') as file:
        file.write(timestamp)

def parse_secure_log(file_path, last_timestamp):
    """
    解析 /var/log/secure 文件中的 SSH 登录信息
    """
    server_name = get_server_name()
    # 匹配 SSH 登录成功的模式
    pattern = r'(\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s+(\S+)\ssshd\[\d+\]:\s+Accepted\s+(publickey|password)\s+for\s+(\S+)\s+from\s+(\S+)\s+port\s+\d+\s+ssh\d+:'
    with open(file_path, 'r') as file:
        lines = file.readlines()
        # 只处理最新的记录
        for line in reversed(lines):
            match = re.match(pattern, line)
            if match:
                timestamp, host, method, user, ip = match.groups()
                if last_timestamp is None or timestamp > last_timestamp:
                    message = f"【SSH.Service 事件】时间:{timestamp} (源)IP地址 :{ip} 认证方式:{method} 用户: {user} 登录 {server_name} 成功"
                    return timestamp, message
                break
    return None, None

def main():
    """
    主函数
    """
    # 设置日志文件和时间戳文件路径
    log_file_path = "/var/log/secure"
    timestamp_file_path = "/var/tmp/last_ssh_login_timestamp.txt"

    # 检查日志文件是否存在
    if not os.path.exists(log_file_path):
        logger.error(f"文件 {log_file_path} 不存在,请检查路径是否正确。")
        return

    while True:
        # 读取上次处理的时间戳
        last_timestamp = get_last_processed_timestamp(timestamp_file_path)
        # 解析 SSH 日志
        new_timestamp, message = parse_secure_log(log_file_path, last_timestamp)
        if new_timestamp is not None:
            # 更新时间戳文件
            set_last_processed_timestamp(timestamp_file_path, new_timestamp)

            if message:
                # 发送消息到飞书
                send_to_feishu(message)

        # 每隔一段时间检查一次
        time.sleep(1)  # 每 1 秒钟检查一次

if __name__ == "__main__":
    main()
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# 适用于监测 Debian 中 SSH 登录事件
#
# @author: Richard <richard@ponfey.com>
# @date: 2024-08-14
#
# 导入必要的库
import subprocess
import requests
import json

def send_notification(log_lines):
    """发送飞书通知"""
    webhook_url = "XXX"

    # 合并多行日志为一个字符串
    log_content = "\n".join(log_lines)

    message = {
        "msg_type": "text",
        "content": {
            "text": log_content
        }
    }

    headers = {'Content-Type': 'application/json; charset=UTF-8'}
    response = requests.post(webhook_url, data=json.dumps(message), headers=headers)

    if response.status_code == 200:
        print("消息发送成功")
    else:
        print(f"消息发送失败: {response.status_code}")

def filter_ssh_login_lines(lines):
    """过滤出与 SSH 登录相关的行"""
    ssh_login_lines = [line for line in lines if "Accepted publickey for" in line]
    return ssh_login_lines

if __name__ == "__main__":
    # 设置一个标志来标记是否已经设置了监视器
    setup_watches = False

    while True:
        # 只在首次运行时设置监视器
        if not setup_watches:
            subprocess.call(["inotifywait", "-r", "-q", "-e", "modify", "/var/log/auth.log"])
            setup_watches = True

        # 检查日志文件是否有新的更改
        output = subprocess.check_output(["inotifywait", "-q", "-e", "modify", "/var/log/auth.log"])

        # 读取最新的日志行
        with open("/var/log/auth.log", "r") as log_file:
            lines = log_file.readlines()
            # 获取最后1行
            last_three_lines = lines[-1:] if len(lines) >= 1 else lines

            # 过滤出 SSH 登录相关的行
            ssh_login_lines = filter_ssh_login_lines(last_three_lines)

            # 如果有 SSH 登录相关的行,则发送通知
            if ssh_login_lines:
                send_notification(ssh_login_lines)

脚本通过 inotifywait 实现了对 /var/log/auth.log 文件的实时监控,并且在检测到 SSH 登录事件时发送通知。通过添加异常检测逻辑,可以进一步提高安全性。这样的脚本非常适合部署在生产环境中,以确保系统的安全性和稳定性。

代码解析

send_notification 函数接收一系列日志行作为参数;使用 requests.post 方法向飞书 webhook 发送 POST 请求,果请求成功,打印成功消息;否则,打印错误消息。

过滤 SSH 登录相关行

def filter_ssh_login_lines(lines):
    """过滤出与 SSH 登录相关的行"""
    ssh_login_lines = [line for line in lines if "Accepted publickey for" in line]
    return ssh_login_lines

filter_ssh_login_lines 函数接收一系列日志行,并返回其中包含 "Accepted publickey for" 的行,这个函数用于筛选出 SSH 登录成功的日志行。

if __name__ == "__main__":
    # 设置一个标志来标记是否已经设置了监视器
    setup_watches = False

    while True:
        # 只在首次运行时设置监视器
        if not setup_watches:
            subprocess.call(["inotifywait", "-r", "-q", "-e", "modify", "/var/log/auth.log"])
            setup_watches = True

        # 检查日志文件是否有新的更改
        output = subprocess.check_output(["inotifywait", "-q", "-e", "modify", "/var/log/auth.log"])

        # 读取最新的日志行
        with open("/var/log/auth.log", "r") as log_file:
            lines = log_file.readlines()
            # 获取最后1行
            last_three_lines = lines[-1:] if len(lines) >= 1 else lines

            # 过滤出 SSH 登录相关的行
            ssh_login_lines = filter_ssh_login_lines(last_three_lines)

            # 如果有 SSH 登录相关的行,则发送通知
            if ssh_login_lines:
                send_notification(ssh_login_lines)
  • setup_watches 标记是否已经设置了监视器
  • 使用 subprocess.call 来调用 inotifywait 命令,设置对 /var/log/auth.log 文件的修改监听
  • 当文件被修改时,使用 subprocess.check_output 获取输出
  • 读取文件的最新一行,并使用 filter_ssh_login_lines 过滤出 SSH 登录相关的行
  • 如果有 SSH 登录相关的行,则调用 send_notification 发送通知

配置 Supervisor

创建一个名为 monitor_ssh_logins.ini 的配置文件来管理脚本的运行

[program:monitor_ssh_logins]
command=/usr/bin/python3 /data/python/monitor_ssh_logins.py
directory=/data/python/
autostart=true
autorestart=true
stdout_logfile=/var/log/monitor_ssh_logins.py.log
stderr_logfile=/var/log/monitor_ssh_logins.py.err

重启 Supervisor 并启动脚本

[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl reread
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl update
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl start monitor_ssh_logins

检查状态

[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl status monitor_ssh_logins

异常检测

为了增加异常检测功能,还可以进一步扩展脚本,例如通过计算登录频率来识别异常登录行为。
下面是一个简单的异常检测逻辑示例:

def detect_abnormal_logins(log_file_path, threshold=3):
    """
    检测异常登录行为
    """
    login_counts = {}
    with open(log_file_path, 'r') as file:
        for line in file:
            match = re.match(r'\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2}\s+(\S+)\ssshd\[\d+\]:\s+Accepted\s+(publickey|password)\s+for\s+(\S+)\s+from\s+(\S+)\s+port\s+\d+\s+ssh\d+:', line)
            if match:
                ip = match.group(4)
                if ip in login_counts:
                    login_counts[ip] += 1
                else:
                    login_counts[ip] = 1

    # 检查是否有 IP 地址超过阈值
    for ip, count in login_counts.items():
        if count > threshold:
            message = f"【异常登录】IP 地址 {ip} 在短时间内尝试登录次数过多:{count} 次"
            send_to_feishu(message)

def main():
    """
    主函数
    """
    # ... 其他代码 ...

    while True:
        # 读取上次处理的时间戳
        last_timestamp = get_last_processed_timestamp(timestamp_file_path)
        # 解析 SSH 日志
        new_timestamp, message = parse_secure_log(log_file_path, last_timestamp)
        if new_timestamp is not None:
            # 更新时间戳文件
            set_last_processed_timestamp(timestamp_file_path, new_timestamp)

            if message:
                # 发送消息到飞书
                send_to_feishu(message)

            # 检测异常登录行为
            detect_abnormal_logins(log_file_path)

        # 每隔一段时间检查一次
        time.sleep(1)  # 每 1 秒钟检查一次

    # ... 其他代码 ...

结论

通过以上步骤,可以有效地监控 SSH 登录活动,并通过异常检测机制识别潜在的安全威胁。这种方法有助于提高系统的安全性并及时发现异常行为。

附录

命令解释:
pip install requests:安装 requests 库用于发送 HTTP 请求
supervisorctl reread:让 Supervisor 重新读取配置文件
supervisorctl update:更新 Supervisor 的配置
supervisorctl start monitor_ssh_logins:启动 monitor_ssh_logins 服务
supervisorctl status monitor_ssh_logins:查看 monitor_ssh_logins 服务的状态

参考资料

Supervisor 官方文档
Requests 库文档
飞书 webhook 文档

上一篇
下一篇