梦回净土抵万侵!!!
在实力面前,不相信任何运维工具,只相信自己手搓的脚本。
在放工具之前来点小插曲。
之前看公众号有师傅反制了macos,遂自查了一下,不查不知道,一查发现两个进程有网络连接。
PowerChime进程
该进程为MAC的充电提示音进程,一个充电提示音进程有外联,而且还是本地ipv6。通过充电频率来统计数据,不排除还传其他东西。
fe80开头地址为本地地址,但是不排除本地代理流量再传递的可能
进程名称: PowerChime
用户名: mac
网络连接:
本地地址: ipv6:62300
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:52115
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:63501
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:51850
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:60860
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:55092
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:54878
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:54829
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:60721
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
本地地址: ipv6:64410
远程地址: fe80:8::aede:48ff:fe33:4455:49178
状态: ESTABLISHED
corespeechd进程
该进程为语音识别进程,我没打开语音识别辅助,他已经悄悄在后台给我启动了。
进程名称: corespeechd
用户名: mac
网络连接:
本地地址: ipv6:49742
远程地址: fe80:8::aede:48ff:fe33:4455:49162
状态: ESTABLISHED
禁用ipv6命令:
networksetup -setv6off Wi-Fi
networksetup -setv6off Ethernet
PowerChime:
defaults write com.apple.PowerChime ChimeOnNoHardware -bool true
sudo killall PowerChime
corespeechd(很顽固,kill半天kill不掉,最后直接删掉,让他变哑巴聋子):
重启你的 Mac。
按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。
csrutil disable
sudo mv /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd.bak
重启你的 Mac。
按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。
csrutil enable
关闭恢复模式终端,然后从苹果菜单中选择“重启”。
一 、内网被动流量探针钉钉提醒监控
该脚本通过监听开启的自定义端口流量来达到内网扫描告警目的。
端口占用会报错,尽量搞台干净系统运行。
常见端口:
21
22
80
105
135
139
443
445
1433
1521
2181
2379
3000
3306
3389
4443
4848
5432
5672
6379
6443
7001
7002
7003
7077
7848
8009
8080
8081
8161
8181
8200
8443
8848
8983
9000
9001
9042
9080
9090
9200
9300
9848
9990
9999
10250
11211
15672
19999
27017
50000
50070
61616
import socket
import threading
import requests
import os
# 从 token.txt 文件中读取钉钉机器人 Webhook 的 token
def load_dingtalk_token(filename):
if not os.path.isfile(filename):
print(f"文件 {filename} 不存在")
return None
with open(filename, 'r') as file:
token = file.read().strip()
return token
# 使用钉钉机器人发送提醒
def send_dingtalk_alert(token, message):
webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
headers = {
'Content-Type': 'application/json'
}
data = {
'msgtype': 'text',
'text': {
'content': message
}
}
response = requests.post(webhook_url, json=data, headers=headers)
if response.status_code != 200:
print(f"发送钉钉消息失败: {response.text}")
# 处理客户端连接
def handle_client(client_socket, local_ip, local_port, token):
remote_ip, remote_port = client_socket.getpeername()
hostname = socket.gethostname()
# 接收请求数据
request_data = b""
while True:
data = client_socket.recv(1024)
if not data:
break
request_data += data
request_text = request_data.decode('utf-8', errors='replace')
message = f"本机内网IP: {local_ip}\n" \
f"主机名: {hostname}\n" \
f"被连接端口: {local_port}\n" \
f"远程连接IP: {remote_ip}\n" \
f"远程连接端口: {remote_port}\n" \
f"请求数据:\n{request_text}"
print(message)
send_dingtalk_alert(token, message)
client_socket.close()
# 启动服务器
def start_server(port, token):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', port))
server.listen(5)
local_ip = socket.gethostbyname(socket.gethostname())
print(f"服务器启动,监听端口: {port}")
while True:
client_socket, addr = server.accept()
client_handler = threading.Thread(
target=handle_client,
args=(client_socket, local_ip, port, token)
)
client_handler.start()
# 从文件中加载端口列表
def load_ports_from_file(filename):
if not os.path.isfile(filename):
print(f"文件 {filename} 不存在")
return []
with open(filename, 'r') as file:
ports = [int(line.strip()) for line in file if line.strip().isdigit()]
return ports
if __name__ == "__main__":
# 加载钉钉 token
token = load_dingtalk_token('token.txt')
if not token:
print("无法加载钉钉 token")
else:
# 加载端口列表
ports = load_ports_from_file('ports.txt')
if not ports:
print("没有可监听的端口")
else:
for port in ports:
server_thread = threading.Thread(target=start_server, args=(port, token))
server_thread.start()
二 、服务器指定目录指定后缀文件增加钉钉提醒监控
看一眼便懂,不介绍了。
import os
import time
import socket
import requests
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
def load_from_file(filename):
if not os.path.isfile(filename):
print(f"文件 {filename} 不存在")
return None
with open(filename, 'r') as file:
return [line.strip() for line in file.readlines()]
def send_dingtalk_alert(token, message):
webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
headers = {'Content-Type': 'application/json'}
data = {'msgtype': 'text', 'text': {'content': message}}
response = requests.post(webhook_url, json=data, headers=headers)
if response.status_code != 200:
print(f"发送钉钉消息失败: {response.text}")
def get_local_info():
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
return local_ip, hostname
def get_process_info(path):
for pid in psutil.pids():
try:
p = psutil.Process(pid)
for file in p.open_files():
if file.path == path:
return p.name(), pid
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return None, None
class DirectoryEventHandler(FileSystemEventHandler):
def __init__(self, file_extension, token, local_ip, hostname):
self.file_extension = file_extension
self.token = token
self.local_ip = local_ip
self.hostname = hostname
def on_created(self, event):
if not event.is_directory and event.src_path.endswith(self.file_extension):
process_name, pid = get_process_info(event.src_path)
directory = os.path.dirname(event.src_path)
message = (
f"检测到新文件: {event.src_path}\n"
f"所在目录: {directory}\n"
f"本机内网IP: {self.local_ip}\n"
f"主机名: {self.hostname}\n"
f"进程名称: {process_name if process_name else '未知'}\n"
f"进程ID: {pid if pid else '未知'}"
)
print(message)
send_dingtalk_alert(self.token, message)
if __name__ == "__main__":
token = load_from_file('token.txt')
if not token:
print("无法加载钉钉 token")
exit(1)
directories_to_watch = load_from_file('directories.txt')
if not directories_to_watch:
print("无法加载监控目录")
exit(1)
local_ip, hostname = get_local_info()
file_extension = '.jsp' # 请替换为你要监控的文件后缀
event_handler = DirectoryEventHandler(file_extension, token[0], local_ip, hostname)
observer = Observer()
for directory in directories_to_watch:
observer.schedule(event_handler, directory, recursive=True)
print(f"开始监控目录: {directory}, 监控文件后缀: {file_extension}")
observer.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
print("停止监控")
observer.join()
三、Linux服务器简单应急python脚本
import os
import subprocess
import json
def run_command(command):
"""运行系统命令并返回输出"""
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return result.stdout.strip()
except Exception as e:
return str(e)
def collect_system_info():
"""收集系统基本信息"""
info = {
'hostname': run_command('hostname'),
'uptime': run_command('uptime'),
'users': run_command('who'),
'last_logins': run_command('last -n 10'),
'disk_usage': run_command('df -h'),
'memory_usage': run_command('free -h'),
'cpu_info': run_command('lscpu'),
'network_info': run_command('ifconfig -a'),
}
return info
def check_open_ports():
"""检查开放端口"""
open_ports = run_command('netstat -tuln')
return open_ports
def list_running_processes():
"""列出当前运行的进程"""
processes = run_command('ps aux')
return processes
def search_suspicious_files():
"""搜索可疑文件(例如最近修改的文件)"""
suspicious_files = run_command('find / -type f -mtime -1 2>/dev/null')
return suspicious_files
def main():
# 收集系统信息
system_info = collect_system_info()
with open('system_info.json', 'w') as f:
json.dump(system_info, f, indent=4)
# 检查开放端口
open_ports = check_open_ports()
with open('open_ports.txt', 'w') as f:
f.write(open_ports)
# 列出运行的进程
running_processes = list_running_processes()
with open('running_processes.txt', 'w') as f:
f.write(running_processes)
# 搜索可疑文件
suspicious_files = search_suspicious_files()
with open('suspicious_files.txt', 'w') as f:
f.write(suspicious_files)
print("应急响应数据已收集完毕,保存在当前目录下的相关文件中。")
if __name__ == "__main__":
main()
四、Windows简单应急python脚本
import os
import subprocess
import json
def run_command(command):
"""运行系统命令并返回输出"""
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return result.stdout.strip()
except Exception as e:
return str(e)
def collect_system_info():
"""收集系统基本信息"""
info = {
'hostname': run_command('hostname'),
'systeminfo': run_command('systeminfo'),
'uptime': run_command('net statistics workstation'),
'users': run_command('query user'),
'last_logins': run_command('quser'),
'disk_usage': run_command('wmic logicaldisk get size,freespace,caption'),
'memory_usage': run_command('systeminfo | findstr /C:"Total Physical Memory" /C:"Available Physical Memory"'),
'cpu_info': run_command('wmic cpu get name,NumberOfCores,NumberOfLogicalProcessors'),
'network_info': run_command('ipconfig /all'),
}
return info
def check_open_ports():
"""检查开放端口"""
open_ports = run_command('netstat -an')
return open_ports
def list_running_processes():
"""列出当前运行的进程"""
processes = run_command('tasklist')
return processes
def search_suspicious_files():
"""搜索可疑文件(例如最近修改的文件)"""
suspicious_files = run_command('dir /s /b /a-d /o-d')
return suspicious_files
def main():
# 收集系统信息
system_info = collect_system_info()
with open('system_info.json', 'w') as f:
json.dump(system_info, f, indent=4)
# 检查开放端口
open_ports = check_open_ports()
with open('open_ports.txt', 'w') as f:
f.write(open_ports)
# 列出运行的进程
running_processes = list_running_processes()
with open('running_processes.txt', 'w') as f:
f.write(running_processes)
# 搜索可疑文件
suspicious_files = search_suspicious_files()
with open('suspicious_files.txt', 'w') as f:
f.write(suspicious_files)
print("应急响应数据已收集完毕,保存在当前目录下的相关文件中。")
if __name__ == "__main__":
main()
五、Linux日志分析脚本
自己改下日志路径
import os
import re
import gzip
import json
import inquirer
import subprocess
from datetime import datetime, timedelta
import argparse
# 日志文件路径
LOG_FILES = {
"system": ["/var/log/syslog", "/var/log/auth.log", "/var/log/kern.log"],
"container": ["/var/lib/docker/containers/*/*.log"],
"java": ["/path/to/java/application/logs/*.log"], # 修改为实际路径
"network": ["/var/log/ufw.log", "/var/log/iptables.log"],
"middleware": ["/var/log/nginx/access.log", "/var/log/nginx/error.log",
"/var/log/apache2/access.log", "/var/log/apache2/error.log",
"/var/log/mysql/error.log"],
}
DEFAULT_KEYWORDS = ["error", "fail", "denied", "segfault", "panic", "exception", "timeout"]
def read_log_file(file_path):
with (gzip.open(file_path, 'rt') if file_path.endswith('.gz') else open(file_path, 'r')) as f:
return f.readlines()
def glob_files(pattern):
return subprocess.getoutput(f"ls {pattern}").split('\n')
def parse_log_line(line, date_format="%b %d %H:%M:%S"):
try:
timestamp_str = line.split(' ')[0]
timestamp = datetime.strptime(timestamp_str, date_format)
return timestamp, line
except Exception:
return None, line
def filter_logs_by_time(logs, time_delta):
now = datetime.now()
return [line for log in logs if (timestamp := parse_log_line(log)[0]) and now - timestamp <= time_delta]
def search_logs(logs, keywords):
results = {}
for keyword in keywords:
pattern = re.compile(keyword, re.IGNORECASE)
matching_lines = [line for line in logs if pattern.search(line)]
if matching_lines:
results[keyword] = matching_lines
return results
def save_results(results, output_file):
with open(output_file, 'w') as f:
json.dump(results, f, indent=4)
def interactive_input():
questions = [
inquirer.Checkbox('categories', message="请选择要分析的日志类别", choices=list(LOG_FILES.keys())),
inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),
inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),
]
answers = inquirer.prompt(questions)
return answers['categories'], timedelta(hours=int(answers['time_delta'])), answers['keywords'].split(',')
def main():
parser = argparse.ArgumentParser(description="日志收集和分析脚本")
parser.add_argument('--interactive', action='store_true', help="启用交互模式")
args = parser.parse_args()
if args.interactive:
selected_categories, time_delta, keywords = interactive_input()
else:
selected_categories, time_delta, keywords = list(LOG_FILES.keys()), timedelta(hours=24), DEFAULT_KEYWORDS
all_results = {}
for category in selected_categories:
for pattern in LOG_FILES[category]:
for log_file in glob_files(pattern):
if os.path.exists(log_file):
logs = read_log_file(log_file)
filtered_logs = filter_logs_by_time(logs, time_delta)
results = search_logs(filtered_logs, keywords)
if results:
all_results.setdefault(category, {})[log_file] = results
output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
save_results(all_results, output_file)
print(f"日志分析结果已保存到 {output_file}")
if __name__ == "__main__":
main()
六、windows日志分析脚本
自己改路径
import win32evtlog
import win32evtlogutil
import win32security
import inquirer
import argparse
import json
from datetime import datetime, timedelta
# 默认的日志源
LOG_SOURCES = {
"System": "System",
"Application": "Application",
"Security": "Security",
}
DEFAULT_KEYWORDS = ["error", "fail", "denied", "exception", "timeout"]
def read_event_log(source, time_delta):
server = 'localhost'
log_type = source
hand = win32evtlog.OpenEventLog(server, log_type)
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
total = win32evtlog.GetNumberOfEventLogRecords(hand)
events = []
now = datetime.now()
time_threshold = now - time_delta
while True:
records = win32evtlog.ReadEventLog(hand, flags, 0)
if not records:
break
for record in records:
event_time = datetime.fromtimestamp(record.TimeGenerated)
if event_time < time_threshold:
return events
events.append((event_time, record))
win32evtlog.CloseEventLog(hand)
return events
def search_logs(events, keywords):
results = {}
for keyword in keywords:
keyword_lower = keyword.lower()
matching_events = [event for event in events if keyword_lower in event[1].StringInserts.lower()]
if matching_events:
results[keyword] = matching_events
return results
def save_results(results, output_file):
with open(output_file, 'w') as f:
json.dump(results, f, indent=4, default=str)
def interactive_input():
questions = [
inquirer.Checkbox('sources', message="请选择要分析的日志源", choices=list(LOG_SOURCES.keys())),
inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),
inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),
]
answers = inquirer.prompt(questions)
selected_sources = [LOG_SOURCES[source] for source in answers['sources']]
time_delta = timedelta(hours=int(answers['time_delta']))
keywords = answers['keywords'].split(',')
return selected_sources, time_delta, keywords
def main():
parser = argparse.ArgumentParser(description="Windows 日志收集和分析脚本")
parser.add_argument('--interactive', action='store_true', help="启用交互模式")
args = parser.parse_args()
if args.interactive:
selected_sources, time_delta, keywords = interactive_input()
else:
selected_sources = list(LOG_SOURCES.values())
time_delta = timedelta(hours=24)
keywords = DEFAULT_KEYWORDS
all_results = {}
for source in selected_sources:
events = read_event_log(source, time_delta)
results = search_logs(events, keywords)
if results:
all_results[source] = results
output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
save_results(all_results, output_file)
print(f"日志分析结果已保存到 {output_file}")
if __name__ == "__main__":
main()
七、内存马筛查脚本
import psutil
import subprocess
import os
import json
# 检查可疑进程
def check_suspicious_processes():
suspicious_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if any(keyword in proc.info['cmdline'] for keyword in ['java', 'tomcat', 'nacos']):
suspicious_processes.append(proc.info)
return suspicious_processes
# 分析网络连接
def analyze_network_connections():
network_connections = []
for conn in psutil.net_connections(kind='inet'):
if conn.laddr.port in range(8000, 9000) or conn.status == 'LISTEN':
try:
proc = psutil.Process(conn.pid)
if any(keyword in proc.cmdline() for keyword in ['java', 'tomcat', 'nacos']):
network_connections.append({
'pid': conn.pid,
'name': proc.name(),
'cmdline': proc.cmdline(),
'local_address': conn.laddr,
'remote_address': conn.raddr,
'status': conn.status
})
except psutil.NoSuchProcess:
continue
return network_connections
# 检查文件系统中的可疑文件
def check_suspicious_files():
suspicious_files = []
search_paths = ['/var/www/html', '/usr/share/nginx/html']
keywords = ['Behinder', 'AntSword', 'Godzilla']
for path in search_paths:
for root, _, files in os.walk(path):
for file in files:
if file.endswith(('.jsp', '.php', '.aspx')):
file_path = os.path.join(root, file)
with open(file_path, 'r', errors='ignore') as f:
content = f.read()
if any(keyword in content for keyword in keywords):
suspicious_files.append(file_path)
return suspicious_files
# 获取Java进程ID
def get_java_pids():
java_pids = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if 'java' in proc.info['cmdline']:
java_pids.append(proc.info['pid'])
return java_pids
# 检查Java进程中的内存马
def check_memory_malware(pids):
heap_dumps = []
for pid in pids:
dump_file = f'/tmp/heapdump_{pid}.hprof'
cmd = ['jmap', '-dump:live,format=b,file=' + dump_file, str(pid)]
try:
subprocess.run(cmd, check=True)
heap_dumps.append(dump_file)
except subprocess.CalledProcessError:
print(f"内存转储失败: {pid}")
return heap_dumps
# 分析内存转储文件
def analyze_heap_dump(dump_file):
analysis_result = {}
# 假设我们使用 Eclipse MAT 的命令行工具进行分析
mat_cmd = [
'java', '-jar', 'org.eclipse.mat.cli-1.11.0.jar', # 替换为实际的 Eclipse MAT CLI 工具路径
'-consolelog',
'-heapdump', dump_file,
'-query', 'find_leaks', # 这里使用 MAT 内置的 find_leaks 查询
'-format', 'JSON',
'-output', dump_file + '.json'
]
try:
subprocess.run(mat_cmd, check=True)
with open(dump_file + '.json') as f:
analysis_result = json.load(f)
except subprocess.CalledProcessError:
print(f"内存分析失败: {dump_file}")
except FileNotFoundError:
print(f"找不到分析结果文件: {dump_file}.json")
return analysis_result
# 主函数
def main():
results = {
'suspicious_processes': check_suspicious_processes(),
'network_connections': analyze_network_connections
八、cobalt等远控的dns请求筛查脚本
根据DNS请求频率
from scapy.all import *
import time
# 存储 DNS 请求的字典
dns_requests = {}
# 捕获并解析 DNS 数据包的回调函数
def dns_monitor_callback(packet):
if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0: # 只关注DNS请求
dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')
src_ip = packet[IP].src
current_time = time.time()
# 记录每个源IP的DNS请求时间
if src_ip not in dns_requests:
dns_requests[src_ip] = []
dns_requests[src_ip].append(current_time)
# 检查最近的一段时间内的请求频率
dns_requests[src_ip] = [t for t in dns_requests[src_ip] if current_time - t < 60] # 只保留最近60秒的请求
if len(dns_requests[src_ip]) > 20: # 如果60秒内的请求数超过20次,触发警报
print(f"[ALERT] High DNS request frequency from {src_ip}: {len(dns_requests[src_ip])} requests in the last minute")
# 使用scapy捕获DNS流量
def start_dns_monitor():
print("Starting DNS traffic monitor...")
sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
if __name__ == "__main__":
start_dns_monitor()
dns响应数据包类型
from scapy.all import *
# 捕获并解析 DNS 数据包的回调函数
def dns_monitor_callback(packet):
if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1: # 只关注DNS响应
dns_response = packet.getlayer(DNS).an
if dns_response:
response_size = len(dns_response)
if response_size > 512: # 检查响应数据包大小,超过512字节的可能是异常的
src_ip = packet[IP].src
print(f"[ALERT] Large DNS response detected from {src_ip}: {response_size} bytes")
# 使用scapy捕获DNS流量
def start_dns_monitor():
print("Starting DNS traffic monitor...")
sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
if __name__ == "__main__":
start_dns_monitor()
检测特征字符串
from scapy.all import *
# 定义特征字符串或正则表达式模式
cobalt_strike_patterns = [
re.compile(r'^[a-zA-Z0-9]{16,}\.'),
re.compile(r'\..*\..*\..*') # 多级子域名
]
# 捕获并解析 DNS 数据包的回调函数
def dns_monitor_callback(packet):
if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0: # 只关注DNS请求
dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')
src_ip = packet[IP].src
for pattern in cobalt_strike_patterns:
if pattern.match(dns_query):
print(f"[ALERT] Potential Cobalt Strike pattern detected: {dns_query} from {src_ip}")
# 使用scapy捕获DNS流量
def start_dns_monitor():
print("Starting DNS traffic monitor...")
sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
if __name__ == "__main__":
start_dns_monitor()
暂时就这些,手搓的好处就是有python环境随时内网搭建一个就算不出网也能监控。还不用担心安全设备本身随时可能被发现的RCE漏洞。
进群后台回复“进群”。