CUCKOO沙箱源码分析 上篇
2020-06-11 13:50:03 Author: bbs.pediy.com(查看原文) 阅读量:805 收藏

前言

鄙人现今是一名研究生, 研究方向为样本分析自动化, cuckoo 沙箱项目是一个非常不错的例子, 给大家分享一下.

环境搭建

os: Ubuntu 18.04
python: python2
link: https://www.jianshu.com/p/ac009f6c2710(这个链接是我试过最靠谱的)

cuckoo总体架构描述

 cuckoo总体架构如下图, host接收分析任务,然后开启虚拟机client, 将样本和一些必要的分析代码通过http协议传输给client端. 必要的分析代码通过会包括一些hook代码和内核模块,以及一些其他与这些模块交互的Python代码, 以Windows系统为例:monitor.dll(hook), zer0m0n.sys(内核模块), inject.exe(控制monitor),execsc.exe(执行shellcode)等. cuckoo通过这些分析代码来获取样本的行为, 然后待分析时间耗尽, 将分析结果通过tcp传输给host端, 将client回复到样本执行的状态.

cuckoo常用命令行介绍

cuckoo community --> 获取一些恶意样本的特征库
cuckoo init --> 初始化cucko
cuckoo web --> 开启web服务, 可以使用浏览器上传样本,查看报告
cuckoo -d --> 开启分析样本服务, 
cuckoo submit --> 提交样本
cuckoo clean --> 清空分析结果

cuckoo命令

使用pip安装cuckoo之后, 可以使用cuckoo命令.查看一下cuckoo命令的位置:

(venv) pwnmelife@ubuntu:~/Downloads/cuckoo$ which cuckoo 
/home/pwnmelife/Downloads/cuckoo/venv/bin/cuckoo

...

(venv) pwnmelife@ubuntu:~/Downloads/cuckoo$ cat $(which cuckoo)
#!/home/pwnmelife/Downloads/cuckoo/venv/bin/python2
# -*- coding: utf-8 -*-
import re
import sys
from cuckoo.main import main
if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

可以看到cuckoo命令其实是一个python脚本, 最后执行代码是cuckoo模块中main函数

cuckoo.main函数

main函数中使用了click模块, 可以很方便的构建命令行程序.
main函数中核心就是调用了下列的两个函数:

cuckoo_init(level, ctx) # level --> 指明输出的级别(debug or quiet), ctx是click.core.Context的对象, 用于记录一些信息.
cuckoo_main(maxcount) # maxcount --> Maxinum number of analyses to process

cuckoo.init 函数

功能: 进行cuckoo的初始化,包括:

  • 创建CWD文件夹
  • 检查配置文件,检查版本
  • 连接数据库
  • 初始化cuckoo的组件

创建CWD文件夹

CWD(Current Working Directory)文件夹包含了除了运行代码以外的其余文件,非常重要.

├── agent(agent.py agent.sh android)
├── analyzer(android, darwin, linux, windows)
├── conf  --> (cuckoo的配置文件)
├── distributed
├── elasticsearch
├── __init__.py
├── log(日志)
├── monitor(hook程序)
├── pidfiles(正在)
├── signatures(样本特征)
├── storage(存储样本程序,及分析的结果)
├── stuff
├── supervisord
├── supervisord.conf
├── web(web登录的一些设置)
├── whitelist
└── yara(样本分析的yara规则, 如:shellcode.yar, vmdetect.yar)

连接数据库

cuckoo支持三种数据库: mysql, sqlite3, postgres, 可以在配置文件中进行配置.其中最常用的是mysql, 一般用作web模块存储数据.

mysql数据库中的表
+------------------+
| Tables_in_cuckoo |
+------------------+
| alembic_version  |
| errors           |
| guests           |   --> 执行分析任务虚拟机的分析流程
| machines         |   --> 虚拟机信息
| machines_tags    |
| samples          |   --> 样本信息(sha512, md5, crc32等)
| submit           |
| tags             |
| tasks            |   --> 分析任务信息
| tasks_tags       |
+------------------+


[database]
# Specify the database connection string.
# NOTE: If you are using a custom database (different from sqlite), you have to
# use utf-8 encoding when issuing the SQL database creation statement.
# Examples, see documentation for more:
# sqlite:///foo.db
# postgresql://foo:bar@localhost:5432/mydatabase
# mysql://foo:bar@localhost/mydatabase
# If empty, defaults to a SQLite3 database at $CWD/cuckoo.db.
connection = mysql://root:toor@localhost/cuckoo 

# Database connection timeout in seconds.
# If empty, default is set to 60 seconds.
timeout = 60

初始化cuckoo组件

init_modules()
init_tasks()
init_yara()
init_binaries()
init_rooter() # VPN的一些初始化
init_routing() # VPN的初始化
文件: core/startup.py

init_modules

# core.startup.py

def init_modules():
    """Initialize plugins."""
    log.debug("Imported modules...")

    categories = (
        "auxiliary", "machinery", "processing", "signatures", "reporting",
    )

    # Call the init_once() static method of each plugin/module. If an exception
    # is thrown in that initialization call, then a hard error is appropriate.
    # 下面三行代码,执行auxiliary,machinery,processing,reporting四个文件夹中的所有类的init_once函数
    # 但这些类均继承自common/abstract.py中Auxiliary,Machinery,Processing,Signature,Report,
    # Extractor. 个人觉得init_once函数是一个占坑函数, 为以后代码修改提供方便

    for category in categories:
        for module in cuckoo.plugins[category]:
            module.init_once()

    for category in categories:
        log.debug("Imported \"%s\" modules:", category)

        entries = cuckoo.plugins[category]
        for entry in entries:
            if entry == entries[-1]:
                log.debug("\t `-- %s", entry.__name__)
            else:
                log.debug("\t |-- %s", entry.__name__)

    # Initialize the RunSignatures module with all available Signatures and
    # the ExtractManager with all available Extractors.
    # 
    # RunSignatures.init_once()修改了以下两个类变量:  
    #   类变量 available_signatures: 存储所有signatures文件夹下的filename
    #   类变量 ttp_descriptions: 存储ttp_descriptions.json文件中的内容.
    RunSignatures.init_once()

    # Extractor的子类
    #   OfficeDDE1, OfficeDDE2,LnkHeader,OleStream,Powerfun,Unicorn
    # 将一些Extractor的子类添加到ExtractManager的类变量extractors中.
    # 至于为什么要添加这些子类, 先挖一个坑, 后面会填上的.
    ExtractManager.init_once()

init_tasks

# 初始化任务队列, 处理的对象:
# 1. 上次分析失败的任务
# 2. 意外退出, 未完成分析的任务

def init_tasks():
    """Check tasks and reschedule uncompleted ones."""
    db = Database()

    log.debug("Checking for locked tasks..")
    # 获取cuckoo数据库, 表tasks中, status为running的任务
    for task in db.list_tasks(status=TASK_RUNNING):
        # 检查cuckoo.conf中的reschedule选项是否修改为yes
        if config("cuckoo:cuckoo:reschedule"):
            # 如果为yes
            # 重新加入分析队列中
            task_id = db.reschedule(task.id)
            log.info(
                "Rescheduled task with ID %s and target %s: task #%s",
                task.id, task.target, task_id
            )
        # 如果为no
        else:
            # 设置为TASK_FAILED_ANALYSIS
            db.set_status(task.id, TASK_FAILED_ANALYSIS)
            log.info(
                "Updated running task ID %s status to failed_analysis",
                task.id
            )

    log.debug("Checking for pending service tasks..")
    for task in db.list_tasks(status=TASK_PENDING, category="service"):
        # 任务状态为TASK_PENDING, 说明上次分析时, 虚拟机启动失败
        # 将对应任务设置为TASK_FAILED_ANALYSIS
        db.set_status(task.id, TASK_FAILED_ANALYSIS)

init_yara

# 关键代码
# 编译yara规则为对应的yarac
def init_yara():
    """Initialize & load/compile Yara rules."""
    # yara规则所在的文件夹
    categories = (
        "binaries", "urls", "memory", "scripts", "shellcode",
        "dumpmem", "office",
    )
    for category in categories:
        dirpath = cwd("yara", category)
        if not os.path.exists(dirpath):
            log.warning("Missing Yara directory: %s?", dirpath)

        rules, indexed = {}, []
        for dirpath, dirnames, filenames in os.walk(dirpath, followlinks=True):
            for filename in filenames:
                if not filename.endswith((".yar", ".yara")):
                    continue

                filepath = os.path.join(dirpath, filename)

                # 省略一些异常处理
                rules["rule_%s_%d" % (category, len(rules))] = filepath
                indexed.append(filename)

        # Need to define each external variable that will be used in the
        # future. Otherwise Yara will complain.
        externals = {
            "filename": "",
        }

        try:
            # 编译
            File.yara_rules[category] = yara.compile(
                filepaths=rules, externals=externals
            )
        except yara.Error as e:
            raise CuckooStartupError(
                "There was a syntax error in one or more Yara rules: %s" % e
            )

        # The memory.py processing module requires a yara file with all of its
        # rules embedded in it, so create this file to remain compatible.
        if category == "memory":
            f = open(cwd("stuff", "index_memory.yar"), "wb")
            for filename in sorted(indexed):
                f.write('include "%s"\n' % cwd("yara", "memory", filename))

        indexed = sorted(indexed)
        # 省略一些log.debug

    # Store the compiled Yara rules for the "dumpmem" category in
    # $CWD/stuff/ so that we may pass it along to zer0m0n during analysis.
    # 如果分析windows程序, 则将dumpmem.yarac一同发送到windows 虚拟机中, 供zer0m0n驱动使用
    File.yara_rules["dumpmem"].save(cwd("stuff", "dumpmem.yarac"))

init_binaries

# 比较简单
def init_binaries():
    """Inform the user about the need to periodically look for new analyzer
    binaries. These include the Windows monitor etc."""

    dirpath = cwd("monitor", "latest")

    # If "latest" is a symbolic link, check that it exists.
    if os.path.islink(dirpath):
        if not os.path.exists(dirpath):
            # 抛出错误信息
            throw()
    # If "latest" is a file, check that it contains a legitimate hash.
    elif os.path.isfile(dirpath):
        monitor = os.path.basename(open(dirpath, "rb").read().strip())
        if not monitor or not os.path.isdir(cwd("monitor", monitor)):
             # 抛出错误信息
            throw()
    else:
         # 抛出错误信息
        throw()

cuckoo_main函数

SIGTERM信号

def stop(): 
    if sched:
        sched.running = False
    if rs:
        rs.instance.stop()

    Pidfile("cuckoo").remove()
    if sched:
        sched.stop()

def handle_sigterm(sig, f):
    stop()

# Handle a SIGTERM, to reduce the chance of Cuckoo exiting without
# cleaning
# 退出时, 进行一些清理工作
signal.signal(signal.SIGTERM, handle_sigterm)

关键代码

# 由于cuckoo采用的时类似cs架构, 即host通过网络传输将必要的python代码和分析工具传输给client
# client, 接受样本和python代码进行分析, 然后将结果通过网络回传给host, 完成分析.
rs = ResultServer() #运行host服务器
sched = Scheduler(max_analysis_count) # 初始化Scheduler(管理任务执行和调度的类)
sched.start() # 开始进行任务调度

ResultServer类

主要用于与client主机进行数据传输.

构造函数

# 获取cuckoo.conf中的配置信息
ip = config("cuckoo:resultserver:ip")
port = config("cuckoo:resultserver:port")
pool_size = config('cuckoo:resultserver:pool_size')

# 绑定端口
# gevent官方介绍:
# gevent是一个基于协程的Python网络库,它使用greenlet在libev或libuv事件循环之上提供一个高级同步API
sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((ip, port))

# http 服务
self.thread = threading.Thread(target=self.create_server,
                                       args=(sock, pool_size))
        self.thread.daemon = True
        self.thread.start()

create_server

# create_server是创建一个http服务, 与client端进行交互.
# create_server中使用了GeventResultServerWorker, 这个类继承于gevent.server.StreamServer(http服务器)
# self.instance也会在add_task和del_task中使用
def create_server(self, sock, pool_size):
    if pool_size:
        pool = gevent.pool.Pool(pool_size)
    else:
        pool = 'default'
    self.instance = GeventResultServerWorker(sock, spawn=pool)
    self.instance.do_run()

del_task

# 删除一个映射, ip和task_id
def del_task(self, task, machine):
    """Delete running task and cancel existing handlers."""
    self.instance.del_task(task.id, machine.ip)

add_task

# 添加一个映射, ip和task_id
def add_task(self, task, machine):
    """Register a task/machine with the ResultServer."""
    self.instance.add_task(task.id, machine.ip)

Scheduler类

Scheduler类cuckoo沙箱的一个任务调度类,

initialize函数

# 关键代码
# 以virtualbox为例
# 获取虚拟机client类型, virtualbox
machinery_name = self.cfg.cuckoo.machinery

# 获取最大并行的虚拟机数量
max_vmstartup_count = self.cfg.cuckoo.max_vmstartup_count

# 获取virtualbox的对应的管理模块, 对应的类在machinery文件夹
# cuckoo管理虚拟机通过相应的命令, 如: virtualbox
# 开启虚拟机: vboxmanage startvm <uuid|vmname>
# 恢复快照: vboxmanage snapshot <uuid|vmname> restore <snapshot-name>
machinery = cuckoo.machinery.plugins[machinery_name]()

# 读取virtualbox.conf配置信息
# 虚拟机启动的方式(headless, gui), 虚拟网卡名称(传输文件用),虚拟机的名称, 快照名称, 平台(windows/Linux)
machinery.set_options(Config(machinery_name))

# 检查虚拟机是否存在,恢复快照 
machinery.initialize(machinery_name)

start函数

# 关键代码
# 省略的部分代码, 包括: 错误的输出, 调试信息的输出,
# 以及使用信号量机制和锁来对虚拟机(共享资源)的运行的进行同步.
# 上面一个函数, 主要初始化虚拟机软件(virtualbox ,vmware, xenserver)
self.initialize()

# 检查host是否有足够的空间
# 目前只实现了Linux
if self.cfg.cuckoo.freespace:
    dir_path = cwd("storage", "analyses")

    # Linux ,return True
    # Windows, return False
    if hasattr(os, "statvfs"):
        dir_stats = os.statvfs(dir_path.encode("utf8"))

        # Calculate the free disk space in megabytes.
        space_available = dir_stats.f_bavail * dir_stats.f_frsize
        space_available /= 1024 * 1024

        if space_available < self.cfg.cuckoo.freespace:
            # ......
            continue

# 为任务选择合适的虚拟机, 空闲状态, web界面可以选择合适的虚拟机进行执行样本.
for machine in self.db.get_available_machines():
    import pdb; pdb.set_trace()
    task = self.db.fetch(machine=machine.name)
    if task:
        break

    if machine.is_analysis():
        available = True


# 接收之后, 开始进行分析, 开启虚拟机,balablala
analysis = AnalysisManager(task.id, errors)
analysis.daemon = True
analysis.start()
self.analysis_managers.add(analysis)

待续......

【预售】物联网安全漏洞实战!离物联网安全研究员只有一个课程的距离!报名满30人开班!


文章来源: https://bbs.pediy.com/thread-260038.htm
如有侵权请联系:admin#unsafe.sh