2024-7-3 16:19:19 Author: mp.weixin.qq.com(查看原文) 阅读量:0 收藏








进程名称: PowerChime
用户名: mac
网络连接:
  本地地址: ipv6:62300  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:52115  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:63501  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:51850  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:60860  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:55092  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:54878  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:54829  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:60721  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED
  本地地址: ipv6:64410  远程地址: fe80:8::aede:48ff:fe33:4455:49178  状态: ESTABLISHED



进程名称: corespeechd
用户名: mac
网络连接:
  本地地址: ipv6:49742  远程地址: fe80:8::aede:48ff:fe33:4455:49162  状态: ESTABLISHED


networksetup -setv6off Wi-Fi
networksetup -setv6off Ethernet


defaults write com.apple.PowerChime ChimeOnNoHardware -bool true
sudo killall PowerChime


重启你的 Mac。按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。
csrutil disable
sudo mv /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd.bak
重启你的 Mac。按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。
csrutil enable
关闭恢复模式终端,然后从苹果菜单中选择“重启”。

一 、内网被动流量探针钉钉提醒监控




import socketimport threadingimport requestsimport os
def load_dingtalk_token(filename):
    """Read the DingTalk robot webhook token from a text file.

    Args:
        filename: Path of the file that holds the token.

    Returns:
        The file content with surrounding whitespace removed, or ``None``
        when ``filename`` is not an existing regular file.
    """
    if not os.path.isfile(filename):
        print(f"文件 {filename} 不存在")
        return None
    # 'utf-8-sig' drops a leading BOM (common when the file was saved by a
    # Windows editor) that would otherwise end up inside the token.
    with open(filename, 'r', encoding='utf-8-sig') as file:
        return file.read().strip()
def send_dingtalk_alert(token, message):
    """Send a plain-text alert through a DingTalk custom-robot webhook.

    Delivery problems are reported on stdout and never raised, so a failed
    notification cannot take down the thread that detected the event.

    Args:
        token: ``access_token`` of the robot webhook.
        message: Text content of the alert.
    """
    webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
    headers = {'Content-Type': 'application/json'}
    data = {'msgtype': 'text', 'text': {'content': message}}
    try:
        # Without a timeout an unreachable endpoint blocks the caller forever.
        response = requests.post(webhook_url, json=data, headers=headers, timeout=10)
    except requests.RequestException as exc:
        print(f"发送钉钉消息失败: {exc}")
        return
    if response.status_code != 200:
        print(f"发送钉钉消息失败: {response.text}")
        return
    # NOTE(review): the robot API is understood to answer HTTP 200 with a
    # non-zero "errcode" for rejected messages (keyword/signature/rate limit);
    # confirm against the DingTalk robot documentation.
    try:
        body = response.json()
    except ValueError:
        return
    if isinstance(body, dict) and body.get('errcode'):
        print(f"发送钉钉消息失败: {response.text}")
def handle_client(client_socket, local_ip, local_port, token,
                  recv_timeout=5.0, max_bytes=65536):
    """Record one inbound connection and forward its details to DingTalk.

    Args:
        client_socket: Accepted connection; always closed before returning.
        local_ip: IP address of this host, for the alert text.
        local_port: Port the connection arrived on.
        token: DingTalk webhook token passed to ``send_dingtalk_alert``.
        recv_timeout: Seconds to wait for data before giving up on reading.
        max_bytes: Upper bound on the amount of request data collected.
    """
    hostname = socket.gethostname()
    try:
        remote_ip, remote_port = client_socket.getpeername()[:2]

        # A scanner that connects and then stays silent must not pin this
        # thread forever, so reading is bounded in both time and size.
        client_socket.settimeout(recv_timeout)
        chunks = []
        received = 0
        while received < max_bytes:
            try:
                data = client_socket.recv(1024)
            except OSError:  # includes socket.timeout and connection resets
                break
            if not data:
                break
            chunks.append(data)
            received += len(data)

        request_text = b"".join(chunks).decode('utf-8', errors='replace')
        message = (
            f"本机内网IP: {local_ip}\n"
            f"主机名: {hostname}\n"
            f"被连接端口: {local_port}\n"
            f"远程连接IP: {remote_ip}\n"
            f"远程连接端口: {remote_port}\n"
            f"请求数据:\n{request_text}"
        )
        print(message)
        send_dingtalk_alert(token, message)
    except OSError as exc:
        # e.g. the peer reset the connection before getpeername() ran
        print(f"处理连接失败: {exc}")
    finally:
        client_socket.close()
def start_server(port, token):
    """Listen on ``port`` and hand every accepted connection to a worker thread.

    Runs until the process exits. If the port cannot be bound, the error is
    printed and the function returns.

    Args:
        port: TCP port to listen on (all IPv4 interfaces).
        token: DingTalk webhook token forwarded to ``handle_client``.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if os.name != 'nt':
        # Lets the probe restart right away while old connections are still in
        # TIME_WAIT. Skipped on Windows, where the option has different
        # semantics and could bind over a port that is already in use.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        server.bind(('', port))
        server.listen(5)
    except OSError as exc:
        print(f"端口 {port} 监听失败: {exc}")
        server.close()
        return

    try:
        local_ip = socket.gethostbyname(socket.gethostname())
    except OSError:
        # The hostname does not resolve; keep serving with a placeholder.
        local_ip = '127.0.0.1'

    print(f"服务器启动,监听端口: {port}")
    with server:
        while True:
            client_socket, _ = server.accept()
            client_handler = threading.Thread(
                target=handle_client,
                args=(client_socket, local_ip, port, token),
            )
            client_handler.start()
def load_ports_from_file(filename):
    """Load the list of TCP ports to listen on, one port per line.

    Lines that are not plain decimal numbers, ports outside 1-65535 and
    repeated ports are skipped.

    Args:
        filename: Path of the port list file.

    Returns:
        List of valid ports in file order; empty when the file is missing.
    """
    if not os.path.isfile(filename):
        print(f"文件 {filename} 不存在")
        return []
    ports = []
    with open(filename, 'r', encoding='utf-8-sig') as file:
        for line in file:
            text = line.strip()
            # str.isdigit() alone also accepts characters such as '²' that
            # int() rejects, hence the additional isascii() check.
            if not (text.isascii() and text.isdigit()):
                continue
            port = int(text)
            if 1 <= port <= 65535 and port not in ports:
                ports.append(port)
    return ports
if __name__ == "__main__":
    # Script entry point: one listener thread per configured port.
    # Load the DingTalk token
    token = load_dingtalk_token('token.txt')
    if not token:
        print("无法加载钉钉 token")
    else:
        # Load the port list
        ports = load_ports_from_file('ports.txt')
        if not ports:
            print("没有可监听的端口")
        else:
            for port in ports:
                server_thread = threading.Thread(target=start_server, args=(port, token))
                server_thread.start()

二 、服务器指定目录指定后缀文件增加钉钉提醒监控


import osimport timeimport socketimport requestsimport psutilfrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandler
def load_from_file(filename):
    """Read a text file into a list of its non-empty, stripped lines.

    Args:
        filename: Path of the file to read.

    Returns:
        List of non-blank lines, or ``None`` when the file does not exist.
    """
    if not os.path.isfile(filename):
        print(f"文件 {filename} 不存在")
        return None
    with open(filename, 'r', encoding='utf-8-sig') as file:
        stripped = (line.strip() for line in file)
        # Blank lines would otherwise become an empty token or an empty
        # directory name further down the script.
        return [text for text in stripped if text]
def send_dingtalk_alert(token, message):
    """Send a plain-text alert through a DingTalk custom-robot webhook.

    Delivery problems are reported on stdout and never raised, so a failed
    notification cannot stop the file watcher.

    Args:
        token: ``access_token`` of the robot webhook.
        message: Text content of the alert.
    """
    webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
    headers = {'Content-Type': 'application/json'}
    data = {'msgtype': 'text', 'text': {'content': message}}
    try:
        # Without a timeout an unreachable endpoint blocks the caller forever.
        response = requests.post(webhook_url, json=data, headers=headers, timeout=10)
    except requests.RequestException as exc:
        print(f"发送钉钉消息失败: {exc}")
        return
    if response.status_code != 200:
        print(f"发送钉钉消息失败: {response.text}")
        return
    # NOTE(review): the robot API is understood to answer HTTP 200 with a
    # non-zero "errcode" for rejected messages; confirm against the DingTalk
    # robot documentation.
    try:
        body = response.json()
    except ValueError:
        return
    if isinstance(body, dict) and body.get('errcode'):
        print(f"发送钉钉消息失败: {response.text}")
def get_local_info():
    """Return ``(local_ip, hostname)`` for this machine.

    The IP is whatever the hostname resolves to; when the hostname cannot be
    resolved, ``'127.0.0.1'`` is returned instead of raising.
    """
    hostname = socket.gethostname()
    try:
        local_ip = socket.gethostbyname(hostname)
    except OSError:  # socket.gaierror is a subclass
        local_ip = '127.0.0.1'
    return local_ip, hostname
def get_process_info(path):
    """Find a process that currently has ``path`` open.

    Args:
        path: Absolute path of the file to look for.

    Returns:
        ``(process_name, pid)`` of the first match, or ``(None, None)`` when
        no inspectable process has the file open.
    """
    # NOTE(review): a writer that has already closed the file by the time this
    # runs cannot be found, so "unknown" results are expected for short writes.
    for proc in psutil.process_iter():
        try:
            if any(opened.path == path for opened in proc.open_files()):
                return proc.name(), proc.pid
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Process exited meanwhile or belongs to another user; skip it.
            continue
    return None, None
class DirectoryEventHandler(FileSystemEventHandler):
    """Watchdog handler that alerts when a file with a given suffix is created."""

    def __init__(self, file_extension, token, local_ip, hostname):
        """Store the alert context.

        Args:
            file_extension: Suffix (for example ``'.jsp'``) that triggers an alert.
            token: DingTalk webhook token.
            local_ip: IP address of this host, for the alert text.
            hostname: Name of this host, for the alert text.
        """
        super().__init__()
        self.file_extension = file_extension
        self.token = token
        self.local_ip = local_ip
        self.hostname = hostname

    def on_created(self, event):
        """Send an alert for a newly created file whose name matches the suffix."""
        if event.is_directory or not event.src_path.endswith(self.file_extension):
            return
        process_name, pid = get_process_info(event.src_path)
        directory = os.path.dirname(event.src_path)
        message = (
            f"检测到新文件: {event.src_path}\n"
            f"所在目录: {directory}\n"
            f"本机内网IP: {self.local_ip}\n"
            f"主机名: {self.hostname}\n"
            f"进程名称: {process_name if process_name else '未知'}\n"
            # `is not None` so that a legitimate pid of 0 is still reported
            f"进程ID: {pid if pid is not None else '未知'}"
        )
        print(message)
        send_dingtalk_alert(self.token, message)
if __name__ == "__main__":
    # Script entry point: watch every configured directory until Ctrl+C.
    token = load_from_file('token.txt')
    if not token:
        print("无法加载钉钉 token")
        # SystemExit instead of exit(): exit() is a helper injected by the
        # `site` module and is not guaranteed to exist.
        raise SystemExit(1)

    directories_to_watch = load_from_file('directories.txt')
    if not directories_to_watch:
        print("无法加载监控目录")
        raise SystemExit(1)

    local_ip, hostname = get_local_info()
    file_extension = '.jsp'  # replace with the file suffix you want to monitor

    event_handler = DirectoryEventHandler(file_extension, token[0], local_ip, hostname)
    observer = Observer()

    watched = 0
    for directory in directories_to_watch:
        if not os.path.isdir(directory):
            # Scheduling a missing directory would abort the whole script.
            print(f"目录不存在,已跳过: {directory}")
            continue
        observer.schedule(event_handler, directory, recursive=True)
        watched += 1
        print(f"开始监控目录: {directory}, 监控文件后缀: {file_extension}")
    if not watched:
        print("无法加载监控目录")
        raise SystemExit(1)

    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
        print("停止监控")
    observer.join()


import osimport subprocessimport json
def run_command(command, timeout=None):
    """Run a shell command and return its output.

    Args:
        command: Command line, executed through the system shell.
        timeout: Optional limit in seconds; ``None`` waits indefinitely.

    Returns:
        Stripped stdout. When the command failed without writing to stdout,
        its stripped stderr is returned instead so the report shows why. If
        the command could not be run at all, the error text is returned.
    """
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors='replace',  # undecodable output bytes must not abort collection
            timeout=timeout,
        )
    except (OSError, subprocess.SubprocessError) as e:
        return str(e)
    output = result.stdout.strip()
    if not output and result.returncode != 0:
        return result.stderr.strip()
    return output
def collect_system_info():
    """Collect basic system information.

    Returns:
        Dict mapping a label to the output of the Linux command that
        produced it.
    """
    info = {
        'hostname': run_command('hostname'),
        'uptime': run_command('uptime'),
        'users': run_command('who'),
        'last_logins': run_command('last -n 10'),
        'disk_usage': run_command('df -h'),
        'memory_usage': run_command('free -h'),
        'cpu_info': run_command('lscpu'),
        # ifconfig (net-tools) is often not installed; fall back to iproute2.
        'network_info': run_command('ifconfig -a 2>/dev/null || ip addr'),
    }
    return info
def check_open_ports():
    """Return the listing of listening TCP/UDP ports."""
    # netstat (net-tools) is often not installed; `ss` accepts the same flags.
    open_ports = run_command('netstat -tuln 2>/dev/null || ss -tuln')
    return open_ports
def list_running_processes():
    """Return the output of ``ps aux`` (the currently running processes)."""
    processes = run_command('ps aux')
    return processes
def search_suspicious_files():
    """Search for suspicious files: regular files modified within the last day.

    Returns:
        Newline-separated paths as printed by ``find``.
    """
    # /proc and /sys are pruned: their entries are generated by the kernel and
    # would otherwise bury the real results in noise.
    suspicious_files = run_command(
        r'find / \( -path /proc -o -path /sys \) -prune -o '
        r'-type f -mtime -1 -print 2>/dev/null'
    )
    return suspicious_files
def main():
    """Collect triage data and write each result to a file in the working directory."""
    # Collect system information
    system_info = collect_system_info()
    with open('system_info.json', 'w', encoding='utf-8') as f:
        json.dump(system_info, f, indent=4, ensure_ascii=False)

    # Check open ports
    open_ports = check_open_ports()
    with open('open_ports.txt', 'w', encoding='utf-8') as f:
        f.write(open_ports)

    # List running processes
    running_processes = list_running_processes()
    with open('running_processes.txt', 'w', encoding='utf-8') as f:
        f.write(running_processes)

    # Search for suspicious files
    suspicious_files = search_suspicious_files()
    with open('suspicious_files.txt', 'w', encoding='utf-8') as f:
        f.write(suspicious_files)
# Run the collection only when executed as a script.
if __name__ == "__main__":
    main()


import osimport subprocessimport json
def run_command(command, timeout=None):
    """Run a shell command and return its output.

    Args:
        command: Command line, executed through the system shell.
        timeout: Optional limit in seconds; ``None`` waits indefinitely.

    Returns:
        Stripped stdout. When the command failed without writing to stdout,
        its stripped stderr is returned instead so the report shows why. If
        the command could not be run at all, the error text is returned.
    """
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors='replace',  # console output may not match the locale encoding
            timeout=timeout,
        )
    except (OSError, subprocess.SubprocessError) as e:
        return str(e)
    output = result.stdout.strip()
    if not output and result.returncode != 0:
        return result.stderr.strip()
    return output
def collect_system_info():
    """Collect basic system information.

    Returns:
        Dict mapping a label to the output of the Windows command that
        produced it.
    """
    info = {
        'hostname': run_command('hostname'),
        'systeminfo': run_command('systeminfo'),
        'uptime': run_command('net statistics workstation'),
        'users': run_command('query user'),
        'last_logins': run_command('quser'),
        'disk_usage': run_command('wmic logicaldisk get size,freespace,caption'),
        'memory_usage': run_command('systeminfo | findstr /C:"Total Physical Memory" /C:"Available Physical Memory"'),
        'cpu_info': run_command('wmic cpu get name,NumberOfCores,NumberOfLogicalProcessors'),
        'network_info': run_command('ipconfig /all'),
    }
    return info
def check_open_ports():
    """Return the output of ``netstat -an`` (connections and listening ports)."""
    open_ports = run_command('netstat -an')
    return open_ports
def list_running_processes():
    """Return the output of ``tasklist`` (the currently running processes)."""
    processes = run_command('tasklist')
    return processes
def search_suspicious_files():
    """Search for suspicious files by listing files newest-first.

    Returns:
        Output of ``dir /s /b /a-d /o-d``.
    """
    # NOTE(review): `dir` is given no path, so this presumably covers only the
    # current working directory tree, and does not filter by age — confirm
    # that this is the intended scope.
    suspicious_files = run_command('dir /s /b /a-d /o-d')
    return suspicious_files
def main():
    """Collect triage data and write each result to a file in the working directory."""
    # Collect system information
    system_info = collect_system_info()
    with open('system_info.json', 'w', encoding='utf-8') as f:
        json.dump(system_info, f, indent=4, ensure_ascii=False)

    # Check open ports
    open_ports = check_open_ports()
    with open('open_ports.txt', 'w', encoding='utf-8') as f:
        f.write(open_ports)

    # List running processes
    running_processes = list_running_processes()
    with open('running_processes.txt', 'w', encoding='utf-8') as f:
        f.write(running_processes)

    # Search for suspicious files
    suspicious_files = search_suspicious_files()
    with open('suspicious_files.txt', 'w', encoding='utf-8') as f:
        f.write(suspicious_files)
# Run the collection only when executed as a script.
if __name__ == "__main__":
    main()



import osimport reimport gzipimport jsonimport inquirerimport subprocessfrom datetime import datetime, timedeltaimport argparse
# Log file paths (shell-style patterns), grouped by category.
LOG_FILES = {
    "system": ["/var/log/syslog", "/var/log/auth.log", "/var/log/kern.log"],
    "container": ["/var/lib/docker/containers/*/*.log"],
    "java": ["/path/to/java/application/logs/*.log"],  # replace with the actual path
    "network": ["/var/log/ufw.log", "/var/log/iptables.log"],
    "middleware": [
        "/var/log/nginx/access.log",
        "/var/log/nginx/error.log",
        "/var/log/apache2/access.log",
        "/var/log/apache2/error.log",
        "/var/log/mysql/error.log",
    ],
}

# Keywords searched for when the user does not supply any.
DEFAULT_KEYWORDS = ["error", "fail", "denied", "segfault", "panic", "exception", "timeout"]
def read_log_file(file_path):
    """Return all lines of a log file, transparently decompressing ``.gz`` files.

    Args:
        file_path: Path of a plain-text or gzip-compressed log.

    Returns:
        List of lines including their line terminators.
    """
    opener = gzip.open if file_path.endswith('.gz') else open
    # Logs regularly contain bytes that are not valid UTF-8 (binary payloads,
    # other encodings); replace them instead of aborting the whole file.
    with opener(file_path, 'rt', encoding='utf-8', errors='replace') as f:
        return f.readlines()
def glob_files(pattern):
    """Expand a shell-style path pattern into a sorted list of existing files.

    Args:
        pattern: Path that may contain ``*``, ``?`` or ``[...]`` wildcards.

    Returns:
        Sorted list of matching regular files; empty when nothing matches.
    """
    # Local import: the file-level import block does not provide `glob`.
    import glob

    # Expanding in-process avoids running `ls` through a shell, which both
    # interpreted the pattern as shell code and returned its error message as
    # if it were a file name when nothing matched.
    return [path for path in sorted(glob.glob(pattern)) if os.path.isfile(path)]
def parse_log_line(line, date_format="%b %d %H:%M:%S"):
    """Extract the leading timestamp from a log line.

    The timestamp is taken from as many whitespace-separated fields as
    ``date_format`` has. For formats without a year (such as the default
    syslog style) the current year is assumed, or the previous year when that
    would place the entry in the future.

    Args:
        line: Raw log line.
        date_format: ``strptime`` format of the leading timestamp.

    Returns:
        ``(timestamp, line)``; ``timestamp`` is ``None`` when the line does
        not start with a parsable timestamp.
    """
    format_fields = date_format.split()
    if not format_fields:
        return None, line
    # Taking only the first token (as a plain split(' ')[0] does) yields just
    # "Jan" for "Jan  5 10:00:00 ..." and can never match a multi-field format.
    tokens = line.split(None, len(format_fields))[:len(format_fields)]
    if len(tokens) < len(format_fields):
        return None, line
    stamp = " ".join(tokens)
    fmt = " ".join(format_fields)

    if any(code in fmt for code in ("%Y", "%y", "%c", "%x")):
        try:
            return datetime.strptime(stamp, fmt), line
        except ValueError:
            return None, line

    # No year in the format: strptime would default to 1900, which makes every
    # entry look over a century old to time-window filters.
    now = datetime.now()
    for year in (now.year, now.year - 1):
        try:
            timestamp = datetime.strptime(f"{year} {stamp}", f"%Y {fmt}")
        except ValueError:
            continue
        if timestamp <= now + timedelta(days=1):
            return timestamp, line
    return None, line
def filter_logs_by_time(logs, time_delta):
    """Keep the log lines whose timestamp lies within ``time_delta`` of now.

    Lines without a parsable timestamp are dropped.

    Args:
        logs: Iterable of raw log lines.
        time_delta: Maximum age as a ``timedelta``.

    Returns:
        List of the matching lines in their original order.
    """
    now = datetime.now()
    recent = []
    for log in logs:
        timestamp = parse_log_line(log)[0]
        if timestamp and now - timestamp <= time_delta:
            # Collect `log` itself; the former comprehension yielded an
            # undefined name and raised NameError on the first match.
            recent.append(log)
    return recent
def search_logs(logs, keywords):
    """Group log lines by the keyword they match (case-insensitive).

    Each keyword is used as a regular expression; a keyword that is not a
    valid expression is matched literally. Blank keywords are ignored and
    surrounding whitespace is removed.

    Args:
        logs: Sequence of log lines.
        keywords: Iterable of keyword strings.

    Returns:
        Dict mapping each keyword with at least one hit to its matching lines.
    """
    results = {}
    for keyword in keywords:
        keyword = keyword.strip()
        if not keyword:
            # An empty pattern would match every line.
            continue
        try:
            pattern = re.compile(keyword, re.IGNORECASE)
        except re.error:
            # User input such as "(" is not a valid regex; search it verbatim.
            pattern = re.compile(re.escape(keyword), re.IGNORECASE)
        matching_lines = [line for line in logs if pattern.search(line)]
        if matching_lines:
            results[keyword] = matching_lines
    return results
def save_results(results, output_file):
    """Write the analysis results to ``output_file`` as indented JSON.

    Args:
        results: JSON-serialisable result structure.
        output_file: Destination path; overwritten if it exists.
    """
    # UTF-8 with ensure_ascii=False keeps non-ASCII log text readable in the
    # report and independent of the platform's default encoding.
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4, ensure_ascii=False)
def interactive_input():
    """Ask the user which log categories, time range and keywords to analyse.

    Returns:
        ``(categories, time_delta, keywords)`` where ``categories`` is a list
        of ``LOG_FILES`` keys, ``time_delta`` a ``timedelta`` and ``keywords``
        a list of non-empty strings.

    Raises:
        SystemExit: When the prompt is cancelled.
    """
    questions = [
        inquirer.Checkbox('categories', message="请选择要分析的日志类别", choices=list(LOG_FILES.keys())),
        inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),
        inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),
    ]
    answers = inquirer.prompt(questions)
    if answers is None:
        # NOTE(review): inquirer.prompt() is understood to return None when
        # the user presses Ctrl+C — confirm against the inquirer documentation.
        raise SystemExit(1)
    try:
        hours = int(answers['time_delta'])
    except ValueError:
        print("无效的时间范围,使用默认值 24 小时")
        hours = 24
    keywords = [word.strip() for word in answers['keywords'].split(',') if word.strip()]
    return answers['categories'], timedelta(hours=hours), keywords
def main():
    """Analyse the selected log files and save keyword hits to a JSON report."""
    parser = argparse.ArgumentParser(description="日志收集和分析脚本")
    parser.add_argument('--interactive', action='store_true', help="启用交互模式")
    args = parser.parse_args()

    if args.interactive:
        selected_categories, time_delta, keywords = interactive_input()
    else:
        selected_categories = list(LOG_FILES.keys())
        time_delta = timedelta(hours=24)
        keywords = DEFAULT_KEYWORDS

    all_results = {}
    for category in selected_categories:
        for pattern in LOG_FILES[category]:
            for log_file in glob_files(pattern):
                if not os.path.exists(log_file):
                    continue
                try:
                    logs = read_log_file(log_file)
                except OSError as exc:
                    # Typically a permission error; skip this file rather than
                    # losing the results gathered so far.
                    print(f"无法读取 {log_file}: {exc}")
                    continue
                filtered_logs = filter_logs_by_time(logs, time_delta)
                results = search_logs(filtered_logs, keywords)
                if results:
                    all_results.setdefault(category, {})[log_file] = results

    output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    save_results(all_results, output_file)
    print(f"日志分析结果已保存到 {output_file}")
# Run the analysis only when executed as a script.
if __name__ == "__main__":
    main()



import win32evtlogimport win32evtlogutilimport win32securityimport inquirerimport argparseimport jsonfrom datetime import datetime, timedelta
# Default event log sources: display name -> Windows event log name.
LOG_SOURCES = {
    "System": "System",
    "Application": "Application",
    "Security": "Security",
}

# Keywords searched for when the user does not supply any.
DEFAULT_KEYWORDS = ["error", "fail", "denied", "exception", "timeout"]
def _record_time(record):
    """Return the generation time of an event record as a naive local datetime."""
    stamp = record.TimeGenerated
    if isinstance(stamp, datetime):
        # NOTE(review): pywin32 on Python 3 is understood to return a datetime
        # subclass here, for which datetime.fromtimestamp() raises TypeError —
        # confirm against the installed pywin32 build.
        if stamp.tzinfo is not None:
            stamp = stamp.astimezone().replace(tzinfo=None)
        return stamp
    return datetime.fromtimestamp(int(stamp))


def read_event_log(source, time_delta):
    """Read the newest records of a Windows event log.

    Records are read newest-first and reading stops at the first record
    older than ``time_delta``.

    Args:
        source: Event log name, e.g. ``"System"``.
        time_delta: Maximum record age as a ``timedelta``.

    Returns:
        List of ``(event_time, record)`` tuples, newest first.
    """
    server = 'localhost'
    hand = win32evtlog.OpenEventLog(server, source)
    flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
    time_threshold = datetime.now() - time_delta

    events = []
    try:
        while True:
            records = win32evtlog.ReadEventLog(hand, flags, 0)
            if not records:
                break
            for record in records:
                event_time = _record_time(record)
                if event_time < time_threshold:
                    return events
                events.append((event_time, record))
    finally:
        # Also runs on the early return above, which used to leak the handle.
        win32evtlog.CloseEventLog(hand)
    return events
def search_logs(events, keywords):
    """Group events by the keyword found in their message inserts.

    Matching is a case-insensitive substring test against the record's
    ``StringInserts`` joined by spaces. Blank keywords are ignored.

    Args:
        events: Sequence of ``(event_time, record)`` tuples.
        keywords: Iterable of keyword strings.

    Returns:
        Dict mapping each keyword with at least one hit to its matching events.
    """
    # StringInserts is a sequence of strings or None, not a single string, so
    # it is flattened once per event before searching.
    searchable = []
    for event in events:
        inserts = event[1].StringInserts or ()
        text = " ".join(str(part) for part in inserts if part is not None).lower()
        searchable.append((event, text))

    results = {}
    for keyword in keywords:
        keyword = keyword.strip()
        needle = keyword.lower()
        if not needle:
            continue
        matching_events = [event for event, text in searchable if needle in text]
        if matching_events:
            results[keyword] = matching_events
    return results
def save_results(results, output_file):
    """Write the analysis results to ``output_file`` as indented JSON.

    Event records are expanded into a dict of their main fields; any other
    object that JSON cannot represent is written as ``str(obj)``.

    Args:
        results: Result structure, possibly containing event records.
        output_file: Destination path; overwritten if it exists.
    """
    record_fields = (
        'RecordNumber', 'TimeGenerated', 'SourceName', 'ComputerName',
        'EventID', 'EventType', 'EventCategory', 'StringInserts',
    )

    def to_jsonable(obj):
        # A bare str() of an event record yields only an object address,
        # which makes the report useless; export its fields instead.
        if hasattr(obj, 'StringInserts'):
            return {name: getattr(obj, name, None) for name in record_fields}
        return str(obj)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4, default=to_jsonable, ensure_ascii=False)
def interactive_input():
    """Ask the user which event logs, time range and keywords to analyse.

    Returns:
        ``(selected_sources, time_delta, keywords)`` where
        ``selected_sources`` is a list of event log names, ``time_delta`` a
        ``timedelta`` and ``keywords`` a list of non-empty strings.

    Raises:
        SystemExit: When the prompt is cancelled.
    """
    questions = [
        inquirer.Checkbox('sources', message="请选择要分析的日志源", choices=list(LOG_SOURCES.keys())),
        inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),
        inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),
    ]
    answers = inquirer.prompt(questions)
    if answers is None:
        # NOTE(review): inquirer.prompt() is understood to return None when
        # the user presses Ctrl+C — confirm against the inquirer documentation.
        raise SystemExit(1)
    selected_sources = [LOG_SOURCES[source] for source in answers['sources']]
    try:
        hours = int(answers['time_delta'])
    except ValueError:
        print("无效的时间范围,使用默认值 24 小时")
        hours = 24
    time_delta = timedelta(hours=hours)
    keywords = [word.strip() for word in answers['keywords'].split(',') if word.strip()]
    return selected_sources, time_delta, keywords
def main():
    """Analyse the selected Windows event logs and save keyword hits to a JSON report."""
    parser = argparse.ArgumentParser(description="Windows 日志收集和分析脚本")
    parser.add_argument('--interactive', action='store_true', help="启用交互模式")
    args = parser.parse_args()

    if args.interactive:
        selected_sources, time_delta, keywords = interactive_input()
    else:
        selected_sources = list(LOG_SOURCES.values())
        time_delta = timedelta(hours=24)
        keywords = DEFAULT_KEYWORDS

    all_results = {}
    for source in selected_sources:
        events = read_event_log(source, time_delta)
        results = search_logs(events, keywords)
        if results:
            all_results[source] = results

    output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    save_results(all_results, output_file)
    print(f"日志分析结果已保存到 {output_file}")
# Run the analysis only when executed as a script.
if __name__ == "__main__":
    main()


import psutilimport subprocessimport osimport json
def check_suspicious_processes():
    """Check for suspicious processes.

    Returns:
        List of ``{'pid', 'name', 'cmdline'}`` dicts for every process whose
        command line mentions java, tomcat or nacos.
    """
    keywords = ('java', 'tomcat', 'nacos')
    suspicious_processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        # cmdline is a list of arguments (or None when access is denied).
        # Testing `keyword in list` only matches an argument that equals the
        # keyword exactly, so "/usr/bin/java" was missed; search the joined
        # command line instead.
        cmdline = ' '.join(proc.info['cmdline'] or [])
        if any(keyword in cmdline for keyword in keywords):
            suspicious_processes.append(proc.info)
    return suspicious_processes
def analyze_network_connections():
    """Analyse network connections.

    Considers sockets that are listening or whose local port is in
    8000-8999, and keeps those owned by a process whose command line
    mentions java, tomcat or nacos.

    Returns:
        List of dicts with pid, name, cmdline, local_address, remote_address
        and status.
    """
    keywords = ('java', 'tomcat', 'nacos')
    network_connections = []
    for conn in psutil.net_connections(kind='inet'):
        if not (8000 <= conn.laddr.port < 9000 or conn.status == 'LISTEN'):
            continue
        if conn.pid is None:
            # psutil.Process(None) describes the *current* process, which
            # would attribute the socket to this script.
            continue
        try:
            proc = psutil.Process(conn.pid)
            cmdline = proc.cmdline()
            name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        # Search the joined command line; a membership test on the argument
        # list only matches arguments equal to the keyword.
        joined = ' '.join(cmdline)
        if any(keyword in joined for keyword in keywords):
            network_connections.append({
                'pid': conn.pid,
                'name': name,
                'cmdline': cmdline,
                'local_address': conn.laddr,
                'remote_address': conn.raddr,
                'status': conn.status,
            })
    return network_connections
def check_suspicious_files(search_paths=None, keywords=None):
    """Check the file system for suspicious script files.

    Walks each search path and reports ``.jsp``/``.php``/``.aspx`` files
    whose content contains one of the keywords.

    Args:
        search_paths: Directories to scan; defaults to the common web roots
            ``/var/www/html`` and ``/usr/share/nginx/html``.
        keywords: Substrings to look for; defaults to the webshell tool names
            ``Behinder``, ``AntSword`` and ``Godzilla``.

    Returns:
        List of paths of the matching files.
    """
    if search_paths is None:
        search_paths = ['/var/www/html', '/usr/share/nginx/html']
    if keywords is None:
        keywords = ['Behinder', 'AntSword', 'Godzilla']

    suspicious_files = []
    for path in search_paths:
        for root, _, files in os.walk(path):
            for file in files:
                if not file.endswith(('.jsp', '.php', '.aspx')):
                    continue
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', errors='ignore') as f:
                        content = f.read()
                except OSError:
                    # Unreadable file (permissions, broken link); keep scanning.
                    continue
                if any(keyword in content for keyword in keywords):
                    suspicious_files.append(file_path)
    return suspicious_files
def get_java_pids():
    """Return the PIDs of processes whose command line mentions ``java``."""
    java_pids = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        # cmdline is a list (or None when access is denied); join it so that
        # paths such as "/usr/bin/java" match as well.
        cmdline = ' '.join(proc.info['cmdline'] or [])
        if 'java' in cmdline:
            java_pids.append(proc.info['pid'])
    return java_pids
def check_memory_malware(pids):
    """Dump the heap of each Java process for memory-shell analysis.

    Runs ``jmap`` once per PID and writes ``/tmp/heapdump_<pid>.hprof``.

    Args:
        pids: Iterable of Java process IDs.

    Returns:
        List of the dump files that were created successfully.
    """
    heap_dumps = []
    for pid in pids:
        dump_file = f'/tmp/heapdump_{pid}.hprof'
        cmd = ['jmap', '-dump:live,format=b,file=' + dump_file, str(pid)]
        try:
            subprocess.run(cmd, check=True)
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing jmap binary, which previously
            # propagated as an uncaught FileNotFoundError.
            print(f"内存转储失败: {pid}")
        else:
            heap_dumps.append(dump_file)
    return heap_dumps
def analyze_heap_dump(dump_file):
    """Analyse a heap dump file and return the parsed report.

    Args:
        dump_file: Path of the ``.hprof`` file.

    Returns:
        Parsed JSON content of ``<dump_file>.json``, or ``{}`` when the
        analysis failed or produced no usable report.
    """
    analysis_result = {}
    # Assumes the Eclipse MAT command-line tool is used for the analysis.
    # NOTE(review): jar name and flags are kept as published — confirm them
    # against the Eclipse MAT distribution actually installed.
    mat_cmd = [
        'java', '-jar', 'org.eclipse.mat.cli-1.11.0.jar',  # replace with the real MAT CLI path
        '-consolelog', '-heapdump', dump_file,
        '-query', 'find_leaks',  # uses the find_leaks query
        '-format', 'JSON',
        '-output', dump_file + '.json',
    ]
    try:
        subprocess.run(mat_cmd, check=True)
        with open(dump_file + '.json') as f:
            analysis_result = json.load(f)
    except subprocess.CalledProcessError:
        print(f"内存分析失败: {dump_file}")
    except FileNotFoundError:
        print(f"找不到分析结果文件: {dump_file}.json")
    except ValueError:
        # json.JSONDecodeError: the report exists but is not valid JSON
        print(f"内存分析失败: {dump_file}")
    return analysis_result
# 主函数def main(): results = { 'suspicious_processes': check_suspicious_processes(), 'network_connections': analyze_network_connections



from scapy.all import *import time
# Dictionary of DNS request times: source IP -> list of request timestamps.
dns_requests = {}
def dns_monitor_callback(packet):
    """Track DNS queries per source address and alert on high request rates.

    Prints an alert when one source sent more than 20 queries within the
    last 60 seconds.

    Args:
        packet: Packet captured by scapy.
    """
    # Only DNS requests are of interest.
    if not (packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0):
        return
    # DNS also travels over IPv6, where packet[IP] would raise.
    if packet.haslayer(IP):
        src_ip = packet[IP].src
    elif packet.haslayer(IPv6):
        src_ip = packet[IPv6].src
    else:
        return
    current_time = time.time()

    # Keep only the requests of the last 60 seconds, including this one.
    recent = [t for t in dns_requests.get(src_ip, []) if current_time - t < 60]
    recent.append(current_time)
    dns_requests[src_ip] = recent

    if len(recent) > 20:
        print(f"[ALERT] High DNS request frequency from {src_ip}: {len(dns_requests[src_ip])} requests in the last minute")
def start_dns_monitor():
    """Capture DNS traffic with scapy and pass each packet to the callback."""
    print("Starting DNS traffic monitor...")
    # store=0: packets are handed to the callback and not kept in memory
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
# Start sniffing only when executed as a script.
if __name__ == "__main__":
    start_dns_monitor()


from scapy.all import *
def dns_monitor_callback(packet):
    """Alert on DNS responses larger than 512 bytes.

    Args:
        packet: Packet captured by scapy.
    """
    # Only DNS responses are of interest.
    if not (packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1):
        return
    dns_layer = packet.getlayer(DNS)
    if not dns_layer.an:
        return
    # Measure the whole DNS message. NOTE(review): len() of the answer field
    # presumably counts records rather than bytes on scapy versions where
    # `an` is a list — confirm against the installed scapy.
    response_size = len(dns_layer)
    if response_size <= 512:
        return
    # Responses may arrive over IPv6, where packet[IP] would raise.
    if packet.haslayer(IP):
        src_ip = packet[IP].src
    elif packet.haslayer(IPv6):
        src_ip = packet[IPv6].src
    else:
        return
    print(f"[ALERT] Large DNS response detected from {src_ip}: {response_size} bytes")
def start_dns_monitor():
    """Capture DNS traffic with scapy and pass each packet to the callback."""
    print("Starting DNS traffic monitor...")
    # store=0: packets are handed to the callback and not kept in memory
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
# Start sniffing only when executed as a script.
if __name__ == "__main__":
    start_dns_monitor()


from scapy.all import *
# The patterns below need `re`, which the import block above never imports.
import re

# Characteristic patterns, each applied with .match() (anchored at the start
# of the queried name).
cobalt_strike_patterns = [
    # first label is a long alphanumeric blob (16+ characters)
    re.compile(r'^[a-zA-Z0-9]{16,}\.'),
    # Multi-level subdomain: at least three dots followed by another label.
    # The previous form began with a literal dot and therefore could never
    # match at the start of a name. NOTE(review): the threshold is a
    # heuristic; tune it to the environment.
    re.compile(r'^(?:[^.]+\.){3,}[^.]+'),
]
def dns_monitor_callback(packet):
    """Alert when a DNS query name matches one of ``cobalt_strike_patterns``.

    At most one alert is printed per packet.

    Args:
        packet: Packet captured by scapy.
    """
    # Only DNS requests are of interest.
    if not (packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0):
        return
    question = packet.getlayer(DNS).qd
    if not question:
        return  # malformed query without a question section
    qname = getattr(question, 'qname', None)
    if qname is None:
        # NOTE(review): some scapy versions presumably expose `qd` as a list
        # of questions — confirm; the first entry is used in that case.
        try:
            qname = question[0].qname
        except (IndexError, TypeError, AttributeError):
            return
    if isinstance(qname, bytes):
        dns_query = qname.decode('utf-8', errors='replace')
    else:
        dns_query = str(qname)

    # Queries may arrive over IPv6, where packet[IP] would raise.
    if packet.haslayer(IP):
        src_ip = packet[IP].src
    elif packet.haslayer(IPv6):
        src_ip = packet[IPv6].src
    else:
        return

    if any(pattern.match(dns_query) for pattern in cobalt_strike_patterns):
        print(f"[ALERT] Potential Cobalt Strike pattern detected: {dns_query} from {src_ip}")
def start_dns_monitor():
    """Capture DNS traffic with scapy and pass each packet to the callback."""
    print("Starting DNS traffic monitor...")
    # store=0: packets are handed to the callback and not kept in memory
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)
# Start sniffing only when executed as a script.
if __name__ == "__main__":
    start_dns_monitor()



文章来源: https://mp.weixin.qq.com/s?__biz=Mzg5NTY3NTMxMQ==&mid=2247484464&idx=1&sn=b8a7972ee47210a46d918761119337e1&chksm=c00dfaf0f77a73e69771618aa9d9cdadefbc1542def7cd9df746addae9ba15d585c390747839&scene=58&subscene=0#rd