使用 Python 编写一个脚本来实时监控 SSH 登录日志,并实现基于时间戳的异常检测机制。此外,还展示如何配置 Supervisor 以确保脚本稳定运行。
技术栈
- Python 3
- Regular Expressions (re)
- Requests
- Supervisor
- 部署与配置
- subprocess
- json
必要的库
[root@Dev-RockyLinux9-Area1-Shanghai ~]# pip install requests
创建脚本文件: 创建一个名为 monitor_ssh_logins.py 的 Python 脚本
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# 适用于监测 RockyLinux 中 SSH 登录事件
#
# @author: Richard <richard@ponfey.com>
# @date: 2024-08-14
#
# 导入必要的库
import os
import re
import requests
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 日志记录脚本启动
logger.info("Starting monitor_ssh_logins script.")
def send_to_feishu(message):
"""
发送消息到飞书 webhook
"""
# 配置 webhook URL
url = "xxx"
headers = {"Content-Type": "application/json"}
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
# 发送 POST 请求
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
logger.info("消息发送成功")
else:
logger.error(f"消息发送失败,状态码:{response.status_code}")
def get_server_name():
"""
获取服务器名称
"""
return os.uname().nodename
def get_last_processed_timestamp(file_path):
"""
从文件中读取最后一次处理的记录的时间戳
"""
try:
with open(file_path, 'r') as file:
return file.read().strip()
except FileNotFoundError:
return None
def set_last_processed_timestamp(file_path, timestamp):
"""
将时间戳写入文件
"""
with open(file_path, 'w') as file:
file.write(timestamp)
def parse_secure_log(file_path, last_timestamp):
"""
解析 /var/log/secure 文件中的 SSH 登录信息
"""
server_name = get_server_name()
# 匹配 SSH 登录成功的模式
pattern = r'(\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s+(\S+)\ssshd\[\d+\]:\s+Accepted\s+(publickey|password)\s+for\s+(\S+)\s+from\s+(\S+)\s+port\s+\d+\s+ssh\d+:'
with open(file_path, 'r') as file:
lines = file.readlines()
# 只处理最新的记录
for line in reversed(lines):
match = re.match(pattern, line)
if match:
timestamp, host, method, user, ip = match.groups()
if last_timestamp is None or timestamp > last_timestamp:
message = f"【SSH.Service 事件】时间:{timestamp} (源)IP地址 :{ip} 认证方式:{method} 用户: {user} 登录 {server_name} 成功"
return timestamp, message
break
return None, None
def main():
"""
主函数
"""
# 设置日志文件和时间戳文件路径
log_file_path = "/var/log/secure"
timestamp_file_path = "/var/tmp/last_ssh_login_timestamp.txt"
# 检查日志文件是否存在
if not os.path.exists(log_file_path):
logger.error(f"文件 {log_file_path} 不存在,请检查路径是否正确。")
return
while True:
# 读取上次处理的时间戳
last_timestamp = get_last_processed_timestamp(timestamp_file_path)
# 解析 SSH 日志
new_timestamp, message = parse_secure_log(log_file_path, last_timestamp)
if new_timestamp is not None:
# 更新时间戳文件
set_last_processed_timestamp(timestamp_file_path, new_timestamp)
if message:
# 发送消息到飞书
send_to_feishu(message)
# 每隔一段时间检查一次
time.sleep(1) # 每 1 秒钟检查一次
if __name__ == "__main__":
main()
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# 适用于监测 Debian 中 SSH 登录事件
#
# @author: Richard <richard@ponfey.com>
# @date: 2024-08-14
#
# 导入必要的库
import subprocess
import requests
import json
def send_notification(log_lines):
"""发送飞书通知"""
webhook_url = "XXX"
# 合并多行日志为一个字符串
log_content = "\n".join(log_lines)
message = {
"msg_type": "text",
"content": {
"text": log_content
}
}
headers = {'Content-Type': 'application/json; charset=UTF-8'}
response = requests.post(webhook_url, data=json.dumps(message), headers=headers)
if response.status_code == 200:
print("消息发送成功")
else:
print(f"消息发送失败: {response.status_code}")
def filter_ssh_login_lines(lines):
"""过滤出与 SSH 登录相关的行"""
ssh_login_lines = [line for line in lines if "Accepted publickey for" in line]
return ssh_login_lines
if __name__ == "__main__":
# 设置一个标志来标记是否已经设置了监视器
setup_watches = False
while True:
# 只在首次运行时设置监视器
if not setup_watches:
subprocess.call(["inotifywait", "-r", "-q", "-e", "modify", "/var/log/auth.log"])
setup_watches = True
# 检查日志文件是否有新的更改
output = subprocess.check_output(["inotifywait", "-q", "-e", "modify", "/var/log/auth.log"])
# 读取最新的日志行
with open("/var/log/auth.log", "r") as log_file:
lines = log_file.readlines()
# 获取最后1行
last_three_lines = lines[-1:] if len(lines) >= 1 else lines
# 过滤出 SSH 登录相关的行
ssh_login_lines = filter_ssh_login_lines(last_three_lines)
# 如果有 SSH 登录相关的行,则发送通知
if ssh_login_lines:
send_notification(ssh_login_lines)
脚本通过 inotifywait 实现了对 /var/log/auth.log 文件的实时监控,并且在检测到 SSH 登录事件时发送通知。通过添加异常检测逻辑,可以进一步提高安全性。这样的脚本非常适合部署在生产环境中,以确保系统的安全性和稳定性。
代码解析
send_notification 函数接收一系列日志行作为参数;使用 requests.post 方法向飞书 webhook 发送 POST 请求,果请求成功,打印成功消息;否则,打印错误消息。
过滤 SSH 登录相关行
def filter_ssh_login_lines(lines):
"""过滤出与 SSH 登录相关的行"""
ssh_login_lines = [line for line in lines if "Accepted publickey for" in line]
return ssh_login_lines
filter_ssh_login_lines 函数接收一系列日志行,并返回其中包含 "Accepted publickey for" 的行,这个函数用于筛选出 SSH 登录成功的日志行。
if __name__ == "__main__":
# 设置一个标志来标记是否已经设置了监视器
setup_watches = False
while True:
# 只在首次运行时设置监视器
if not setup_watches:
subprocess.call(["inotifywait", "-r", "-q", "-e", "modify", "/var/log/auth.log"])
setup_watches = True
# 检查日志文件是否有新的更改
output = subprocess.check_output(["inotifywait", "-q", "-e", "modify", "/var/log/auth.log"])
# 读取最新的日志行
with open("/var/log/auth.log", "r") as log_file:
lines = log_file.readlines()
# 获取最后1行
last_three_lines = lines[-1:] if len(lines) >= 1 else lines
# 过滤出 SSH 登录相关的行
ssh_login_lines = filter_ssh_login_lines(last_three_lines)
# 如果有 SSH 登录相关的行,则发送通知
if ssh_login_lines:
send_notification(ssh_login_lines)
- setup_watches 标记是否已经设置了监视器
- 使用 subprocess.call 来调用 inotifywait 命令,设置对 /var/log/auth.log 文件的修改监听
- 当文件被修改时,使用 subprocess.check_output 获取输出
- 读取文件的最新一行,并使用 filter_ssh_login_lines 过滤出 SSH 登录相关的行
- 如果有 SSH 登录相关的行,则调用 send_notification 发送通知
配置 Supervisor
创建一个名为 monitor_ssh_logins.ini 的配置文件来管理脚本的运行
[program:monitor_ssh_logins]
command=/usr/bin/python3 /data/python/monitor_ssh_logins.py
directory=/data/python/
autostart=true
autorestart=true
stdout_logfile=/var/log/monitor_ssh_logins.py.log
stderr_logfile=/var/log/monitor_ssh_logins.py.err
重启 Supervisor 并启动脚本
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl reread
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl update
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl start monitor_ssh_logins
检查状态
[root@Dev-RockyLinux9-Area1-Shanghai ~]# supervisorctl status monitor_ssh_logins
异常检测
为了增加异常检测功能,还可以进一步扩展脚本,例如通过计算登录频率来识别异常登录行为。
下面是一个简单的异常检测逻辑示例:
def detect_abnormal_logins(log_file_path, threshold=3):
"""
检测异常登录行为
"""
login_counts = {}
with open(log_file_path, 'r') as file:
for line in file:
match = re.match(r'\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2}\s+(\S+)\ssshd\[\d+\]:\s+Accepted\s+(publickey|password)\s+for\s+(\S+)\s+from\s+(\S+)\s+port\s+\d+\s+ssh\d+:', line)
if match:
ip = match.group(4)
if ip in login_counts:
login_counts[ip] += 1
else:
login_counts[ip] = 1
# 检查是否有 IP 地址超过阈值
for ip, count in login_counts.items():
if count > threshold:
message = f"【异常登录】IP 地址 {ip} 在短时间内尝试登录次数过多:{count} 次"
send_to_feishu(message)
def main():
"""
主函数
"""
# ... 其他代码 ...
while True:
# 读取上次处理的时间戳
last_timestamp = get_last_processed_timestamp(timestamp_file_path)
# 解析 SSH 日志
new_timestamp, message = parse_secure_log(log_file_path, last_timestamp)
if new_timestamp is not None:
# 更新时间戳文件
set_last_processed_timestamp(timestamp_file_path, new_timestamp)
if message:
# 发送消息到飞书
send_to_feishu(message)
# 检测异常登录行为
detect_abnormal_logins(log_file_path)
# 每隔一段时间检查一次
time.sleep(1) # 每 1 秒钟检查一次
# ... 其他代码 ...
结论
通过以上步骤,可以有效地监控 SSH 登录活动,并通过异常检测机制识别潜在的安全威胁。这种方法有助于提高系统的安全性并及时发现异常行为。
附录
命令解释:
pip install requests:安装 requests 库用于发送 HTTP 请求
supervisorctl reread:让 Supervisor 重新读取配置文件
supervisorctl update:更新 Supervisor 的配置
supervisorctl start monitor_ssh_logins:启动 monitor_ssh_logins 服务
supervisorctl status monitor_ssh_logins:查看 monitor_ssh_logins 服务的状态