SaaS多租户自动化渗透平台-数据安全隔离实践
2024-9-27 15:28:57 Author: mp.weixin.qq.com(查看原文) 阅读量:0 收藏

0x01. 简介

在上一篇 SaaS多租户自动化渗透平台-架构笔记 中提到了 “客户对 SaaS 形态产品的数据安全性非常敏感”,因此实现数据安全隔离是平台的一个核心要求。同时因产品的定位是自动化渗透平台,用户为专业安全人员,使得保障平台自身的数据安全更加重要。这篇笔记介绍了在多租户架构下,实现数据安全隔离的一些思考和实践,主要内容包括

  • 多租户架构数据安全隔离的要求和设计思路

  • 基于 Flask、SQLAlchemy、Dramatiq 框架如何实现多租户数据库安全访问

  • ORM 框架特性导致出现“数据库跨租户越权”问题的排查过程和解决办法

PS:🔥 团队急后端研发,感兴趣可私信联系。另外公司正在广纳人才,可复制访问下方链接在线查看职位详情和投递简历

 https://app.mokahr.com/su/5wsls 


在多租户架构下,不同客户的访问和操作均对应同一套代码和服务,如何防止一个用户越权访问到其他租户的数据,是一个非常关键和基础的安全问题。这里首先想到的方案是,每个租户分配一个独立的数据库,有不同的账号密码,并限制访问权限。而从研发安全的视角,需要尽量避免在写业务代码时,手动处理选择和访问数据库资源的操作,即在底层进行统一的封装和屏蔽,实现根据当前用户所属的租户自动选择对应的数据库。而这也依赖用户所属租户的信息无法被篡改和伪造,即需要保证身份认证的安全性
梳理一下,有几个要点
  1. 获取当前登陆用户所属租户信息的代码逻辑是安全的
  2. 业务代码只关注功能逻辑,对数据库的选择无感知
  3. 不同租户对应不同数据库、不同账号密码,无法相互访问

平台使用的技术栈为:Web 框架 Flask 、数据库框架 SQLAlchemy、异步任务框架 Dramatiq

实现过程包括
  1. 获取当前用户所属租户信息

这里使用 JSON Web Token (JWT) 的方式来保存用户身份和所属租户信息。对于 JWT 的安全,需要确保进行 signature 的验证(第三方库通常默认开启),以及使用的 secret key 复杂度足够强。另外为了避免共用 secret key 泄露后相互影响,这里为每个租户生成不同了 secret key。代码示例如下

verified_info = jwt.decode(jwt_token, jwt_secret)enterprise_id = verified_info.get("enterprise_id")

JWT 安全部分详细可参考 https://portswigger.net/web-security/jwt

  1. 在当前上下文设置所属租户信息

在 Flask 中提供了 Application Context 来实现在一次请求、CLI 命令等需要在特定执行范围内,管理和共享数据的能力。使用者可在当前上下文环境,通过 g proxy 变量来设置、访问、修改数据。详细可参考

  • https://flask.palletsprojects.com/en/3.0.x/appcontext/

  • flask.g https://flask.palletsprojects.com/en/3.0.x/api/#flask.g

代码实现方面,需要区分不同的业务场景。一是用户使用平台访问 Web 接口,Flask 会在处理请求前后,自动 push 和 pop application context。我们只需要在用户成功登录后,通过g变量设置租户的信息,示例代码如下

from flask import g
g.enterprise_id = "tenant-A"
另一个业务场景是执行租户相关的异步任务。此时我们需要
  1. 在执行任务的前后 push 和 pop application context

  2. 发送任务时,自动在参数中添加所属租户信息

  3. 执行任务前后,从参数中获取租户信息,通过 变量设置/清除租户信息

在 Dramatiq 框架中,我们可通过编写自定义 Middleware,在对应 hooks 函数中实现以上逻辑,详细参考 https://dramatiq.io/reference.html#middleware

示例代码如下,需求 1 对应 AppContextMiddleware,需求 2,3 对应 EnterpriseMiddleware

from threading import localfrom dramatiq import Message, Middlewarefrom flask import g

class AppContextMiddleware(Middleware): # 每个线程有自己独立的 context STATE = local()
def __init__(self, app): self.app = app
def before_process_message(self, broker, message): context: AppContext = self.app.app_context() context.push() self.STATE.context = context
def after_process_message(self, broker, message, *, result=None, exception=None): try: context = self.STATE.context context.pop(exception) del self.STATE.context except AttributeError: pass
after_skip_message = after_process_message

class EnterpriseMiddleware(Middleware):
def before_enqueue(self, broker, message: Message, delay): # 发送任务时自动设置 enterprise_id message.options.setdefault("enterprise_id", g.enterprise_id)
def before_process_message(self, broker, message): if enterprise_id := message.options.get("enterprise_id", None): g.enterprise_id = enterprise_id
def after_process_message(self, broker, message, *, result=None, exception=None): g.pop("enterprise_id", None)
after_skip_message = after_process_message
  1. 数据访问操作时,根据当前所属租户选择对应数据库

在 SQLAlchemy 中,会使用 Session 来进行 ORM 相关操作,我们可以通过重写 Session 的 get_bind 方法来实现根据第一步中的租户信息 g.enterprise_id 来动态选择对应数据库连接(通常为 Engine 实例)

代码示例如下

from flask_sqlalchemy.session import Sessionfrom flask import g
class DynamicSession(Session): def get_bind( self, mapper: t.Any | None = None, clause: t.Any | None = None, bind: sa.engine.Engine | sa.engine.Connection | None = None, **kwargs: t.Any, ) -> sa.engine.Engine | sa.engine.Connection: found_bind = None if bind_key := g.get("enterprise_id", ""): found_bind = self._db.engines.get(bind_key) # 其他逻辑 ...
if not found_bind: found_bind = super().get_bind(mapper, clause, bind, **kwargs)
return found_bind

其中 self._db.engines 保存了所有租户数据库的连接信息,类型为 map,key 为租户的 ID。这部分由另一个独立的线程定时同步和动态加载租户的数据库连接配置。

详细可参考

  • Session.get_bind https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.Session.get_bind

  • Engine https://docs.sqlalchemy.org/en/20/core/engines.html


在完成上述数据安全隔离措施后,我们仍遇到了在特定场景下 “某个租户获取到了其他租户数据” 的安全问题。

首先介绍发生问题的业务场景“扫描任务的执行”。当用户在平台创建任务后,并不会立刻被运行,而是在“任务队列表”中插入一条数据。再由“任务调度服务”定期检查任务队列表中是否有任务,满足运行的条件后才会发送到队列运行。在多租户架构下,“任务调度服务”会依次访问所有租户的数据库进行检查,其代码示例如下
while True:    enterprise_ids = get_enterprise_ids()        for enterprise_id in enterprise_ids:      # 设置 g.enterprise_id 值,用于切换数据库      with bind_enterprise(enterprise_id):        # 查询待运行的任务列表        stmt = select(TaskQueue).where(TaskQueue.handled == false())        tasks = session.scalars(stmt).all()        # ... 运行条件检查        # 发送任务到队列        for task in tasks:          _send_task(task)          # 每个租户单次只发送一个任务          break        time.sleep(1)
TaskQueue model 的定义如下
class TaskQueue(Base):  id: Mapped[int] = mapped_column(    BigInteger(), primary_key=True  )  task_id: Mapped[int] = mapped_column(    BigInteger(), nullable=False, index=True  )  # 任务对应的全局唯一消息 UUID  message_id: Mapped[str] = mapped_column(    String(36), index=True, nullable=False  )  # 是否已处理  handled: Mapped[bool] = mapped_column(        Boolean(), default=False, server_default=false()  )

运行任务的代码函数

@dramatiq.actor(queue_name="task")def run_flow_task(task_id):    try:      obj = get_task_obj_by_id(task_id)      ...    except Exception:      ...

某天监控告警发现,在租户 A 下运行任务报错,错误信息为“task_id = 121 在数据库里任务记录不存在”,其消息 ID(message_id)为 6d4a0abc-d277-44db-8f6a-2a90491b1dee。之后我们开始排查,在租户 A 对应的数据库中,最大的任务 ID 为 39,确实不存在 121 的记录。接着开始查询 “任务调度服务” 的日志,发现日志中的参数值和错误信息能对应上

任务调度服务日志

租户A:send message_id='xxxx' task_id=37租户A:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121租户A:send message_id='xxxx' task_id=39

根据前后的日志判断,正确的 task_id 应该是 38。到租户 A 的 “任务队列表” 中查看,确实有 38,但 message_id 是 47fb72b3-64dd-460d-bc03-bccfe18fa039与前面任务调度器的日志对应不上

租户A-任务队列表数据

id, task_id, message_id40, 37, xxxxx     41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa03942, 39, xxxx
“任务调度服务日志” 中有一个需要注意的点是,前后 task_id 37 和 39 的两个任务是正常运行的,经过手工测试,也无法复现该问题。
因为 message_id  为全局唯一消息 ID,根据日志查询到 47fb72b3-64dd-460d-bc03-bccfe18fa039属于租户B,其“任务队列表表”数据如下,task_id 为 121 也能对的上

租户B-任务队列表数据

id, task_id, message_id41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039

上述问题的表现像是“租户A”读取到的“租户B”下的数据,出现了跨租户越权的问题。起初我们怀疑是代码实现有问题,但经过排查和讨论,并未找到原因。

我们又继续排查任务调度日志,发现在租户A之前,调度的上一个租户恰巧是租户B的,且也找到了同 message_idtask_id的任务发送日志(第一行和第四行)

租户B:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121...租户A:send message_id='xxxx' task_id=37租户A:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121租户A:send message_id='xxxx' task_id=39

再根据 “任务队列表” 的数据对比,发现这两条数据拥有相同的数据库主键值,id 均为 41

租户Aid, task_id, message_id41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa039

租户Bid, task_id, message_id41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039
根据上述表现,我们怀疑是业务层代码在进行数据库查询时,没有刷新已加载到内存中的数据。
经过文档的搜索,发现在 SQLAlchemy ORM 中存在一个默认特性,“同一主键 model object 只会被 load 一次,后续再次查询不会更新当前 object 的状态”,目的为了保留内存中 model 已经被修改的状态,以及避免刷新已经存在数据的成本和复杂度。官方文档中的描述原文为

Normally, ORM objects are only loaded once, and if they are matched up to the primary key in a subsequent result row, the row is not applied to the object. This is both to preserve pending, unflushed changes on the object as well as to avoid the overhead and complexity of refreshing data which is already there.

文档地址:https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#populate-existing

我们来看一个代码示例,以便更好的理解特性背后的表现,这里有一个 User model,有 id 主键和 name 两个字段

class User(Base):  __tablename__ = "t_user"    id: Mapped[int] = mapped_column(BigInteger(), primary_key=True)  name: Mapped[str] = mapped_column(String(36), nullable=False)
查询代码如下
# 查询 id 为 2 的 user modelstmt = select(User).where(User.id == 2)user_obj = session.scalar(stmt)# 此时用户名为 zhangsanuser_obj.name
# 中途有其他逻辑将其 name 属性更新为了 lisi
# 再次查询user_obj = session.scalar(stmt)# 此时用户名依旧为 zhangsanuser_obj.name
代码中先后两次查询了同一个 id 的 model,可以看到,在第一次查询后,即使数据库中的 name 字段已经被修改了,后续再次查询,得到的仍然是原值。
结合上述特性,再回顾一下问题发生的过程
  1. 在检查租户B的待执行任务时,加载了主键为 41 的 TaskQueue model

租户B
id, task_id, message_id41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039
  1. 在后续检查租户A的待执行任务时,对应数据库中也有一条主键为 41 的任务队列记录

租户Aid, task_id, message_id41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa039
  1. 因为 SQLAlchemy ORM “同一主键 model object 只会被 load 一次,后续再次查询不会更新当前 object 的状态” 的特性,会直接使用之前已加载的租户B的 TaskQueue model 数据,导出发出的消息参数错误,而任务运行的上下文环境为租户A,在对应 DB 下无法找到对应数据

问题可总结为:任务调度服务在检查租户下待运行任务的过程中切换了数据库,在不同数据库下加载了同一 model 相同主键的 object,而因 ORM 框架默认特性,后续的查询不会刷新内存中的 object,导致数据不正确

针对该问题,可选的解决办法有

  1. 在发送完任务后,使用session.expire()将内存中的 model object 设置为 expire,下次查询时会重新加载

_send_task(task)session.expire(task)
参考 https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.Session.expire
  1. select 查询时,设置execution_optionspopulate_existing参数为 True,强制 refresh

stmt = ( select(User) .where(User.id == 2) .execution_options(populate_existing=True))
参考 https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#populate-existing

以上两种方法均为单次生效,需要额外函数调用和参数设置,容易遗忘出错。而对于用户来说,多次查询同一个主键的数据时,预期的默认行为也是获取最新的值和刷新内存中的 object。因此更合适的解决方法是在创建 db engine 时就进行设置,代码示例如下

from sqlalchemy import create_engine
engine_kwargs.setdefault("execution_options", {}).setdefault( "populate_existing", True)create_engine(**engine_kwargs)
参考 https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine
延伸:上述问题发生在“任务调度”场景,属于后台服务。对于需要直接处理用户请求的后端 API 服务,是否会存在类似的问题?
结论:在处理一个 Web 请求时,会创建独立的 scoped_session 来进行数据库的相关操作,并在 Web 请求结束后销毁。即处理不同用户请求的 session 不同,互不影响。在处理请求的过程中,对于用户侧功能,只会访问对应租户的 DB,因此不会出现该问题。在管理侧功能,会有手动切换和指定 DB 的操作,则可能会出现。因此建议参考前面的方法,在初始化数据库 engine 时,将 populate_existing 的默认值设置为 True
参考 https://docs.sqlalchemy.org/en/20/orm/contextual.html#using-thread-local-scope-with-web-applications

本文分享了 SaaS 多租户自动化渗透平台在数据安全隔离方面的设计思路、代码实践步骤、以及因数据库框架默认特性导致“跨租户数据越权访问”安全问题的排查和解决过程。除了需要严格保障不同租户间无法访问到对方数据外,随着平台功能不断增加,租户内用户的权限控制和数据隔离也非常重要,且因业务需求多样化,会面临更多的挑战,后续有机会再单独进行讨论和介绍。

PS:这里会不定期分享关于 SaaS 自动化渗透平台建设、安全研发的一些想法、实践和总结,感兴趣的朋友可以关注一下!(微信搜索 “b1ngz的笔记本”)



1

SaaS多租户自动化渗透平台-架构笔记

https://mp.weixin.qq.com/s/FNr3TsiZU371F_44WelZbw

2

JWT attacks

https://portswigger.net/web-security/jwt

3

Flask  Application Context

https://flask.palletsprojects.com/en/3.0.x/appcontext/

4

Flask g proxy object

https://flask.palletsprojects.com/en/3.0.x/api/#flask.g

5

Dramatiq Middleware

https://dramatiq.io/reference.html#middleware

6

SQLAlchemy Session

https://docs.sqlalchemy.org/en/20/orm/session_basics.html

7

SQLAlchemy Engine

https://docs.sqlalchemy.org/en/20/core/engines.html

8

SQLAlchemy ORM Populate Existing

https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#populate-existing

9

SQLAlchemy Using Thread-Local Scope with Web Applications

https://docs.sqlalchemy.org/en/20/orm/contextual.html#using-thread-local-scope-with-web-applications


文章来源: https://mp.weixin.qq.com/s?__biz=MzkwNDE5NzUyMA==&mid=2247483699&idx=1&sn=3d35c690bff8246d0f8e320c59108828&chksm=c08be5ccf7fc6cda4ece5c805eba67be1ee3287684f3db3d2d4d5ca2fe658274cb4a922ddbfb&scene=58&subscene=0#rd
如有侵权请联系:admin#unsafe.sh