在上一篇 SaaS多租户自动化渗透平台-架构笔记 中提到了 “客户对 SaaS 形态产品的数据安全性非常敏感”,因此实现数据安全隔离是平台的一个核心要求。同时因产品的定位是自动化渗透平台,用户为专业安全人员,使得保障平台自身的数据安全更加重要。这篇笔记介绍了在多租户架构下,实现数据安全隔离的一些思考和实践,主要内容包括
多租户架构数据安全隔离的要求和设计思路
基于 Flask、SQLAlchemy、Dramatiq 框架如何实现多租户数据库安全访问
ORM 框架特性导致出现“数据库跨租户越权”问题的排查过程和解决办法
https://app.mokahr.com/su/5wsls
获取当前登陆用户所属租户信息的代码逻辑是安全的 业务代码只关注功能逻辑,对数据库的选择无感知 不同租户对应不同数据库、不同账号密码,无法相互访问
平台使用的技术栈为:Web 框架 Flask 、数据库框架 SQLAlchemy、异步任务框架 Dramatiq
获取当前用户所属租户信息
这里使用 JSON Web Token (JWT) 的方式来保存用户身份和所属租户信息。对于 JWT 的安全,需要确保进行 signature 的验证(第三方库通常默认开启),以及使用的 secret key 复杂度足够强。另外为了避免共用 secret key 泄露后相互影响,这里为每个租户生成不同了 secret key。代码示例如下
verified_info = jwt.decode(jwt_token, jwt_secret)
enterprise_id = verified_info.get("enterprise_id")
JWT 安全部分详细可参考 https://portswigger.net/web-security/jwt
在当前上下文设置所属租户信息
在 Flask 中提供了 Application Context 来实现在一次请求、CLI 命令等需要在特定执行范围内,管理和共享数据的能力。使用者可在当前上下文环境,通过 g
proxy 变量来设置、访问、修改数据。详细可参考
https://flask.palletsprojects.com/en/3.0.x/appcontext/
flask.g https://flask.palletsprojects.com/en/3.0.x/api/#flask.g
代码实现方面,需要区分不同的业务场景。一是用户使用平台访问 Web 接口,Flask 会在处理请求前后,自动 push 和 pop application context。我们只需要在用户成功登录后,通过g
变量设置租户的信息,示例代码如下
from flask import g
g.enterprise_id = "tenant-A"
另一个业务场景是执行租户相关的异步任务。此时我们需要
在执行任务的前后 push 和 pop application context
发送任务时,自动在参数中添加所属租户信息
执行任务前后,从参数中获取租户信息,通过
g
变量设置/清除租户信息
示例代码如下,需求 1 对应 AppContextMiddleware,需求 2,3 对应 EnterpriseMiddleware
from threading import local
from dramatiq import Message, Middleware
from flask import g
class AppContextMiddleware(Middleware):
# 每个线程有自己独立的 context
STATE = local()
def __init__(self, app):
self.app = app
def before_process_message(self, broker, message):
context: AppContext = self.app.app_context()
context.push()
self.STATE.context = context
def after_process_message(self, broker, message, *, result=None, exception=None):
try:
context = self.STATE.context
context.pop(exception)
del self.STATE.context
except AttributeError:
pass
after_skip_message = after_process_message
class EnterpriseMiddleware(Middleware):
def before_enqueue(self, broker, message: Message, delay):
# 发送任务时自动设置 enterprise_id
message.options.setdefault("enterprise_id", g.enterprise_id)
def before_process_message(self, broker, message):
if enterprise_id := message.options.get("enterprise_id", None):
g.enterprise_id = enterprise_id
def after_process_message(self, broker, message, *, result=None, exception=None):
g.pop("enterprise_id", None)
after_skip_message = after_process_message
数据访问操作时,根据当前所属租户选择对应数据库
在 SQLAlchemy 中,会使用 Session 来进行 ORM 相关操作,我们可以通过重写 Session 的 get_bind
方法来实现根据第一步中的租户信息 g.enterprise_id
来动态选择对应数据库连接(通常为 Engine 实例)
代码示例如下
from flask_sqlalchemy.session import Session
from flask import g
class DynamicSession(Session):
def get_bind(
self,
mapper: t.Any | None = None,
clause: t.Any | None = None,
bind: sa.engine.Engine | sa.engine.Connection | None = None,
**kwargs: t.Any,
) -> sa.engine.Engine | sa.engine.Connection:
found_bind = None
if bind_key := g.get("enterprise_id", ""):
found_bind = self._db.engines.get(bind_key)
# 其他逻辑 ...
if not found_bind:
found_bind = super().get_bind(mapper, clause, bind, **kwargs)
return found_bind
其中 self._db.engines
保存了所有租户数据库的连接信息,类型为 map,key 为租户的 ID。这部分由另一个独立的线程定时同步和动态加载租户的数据库连接配置。
详细可参考
Session.get_bind https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.Session.get_bind
Engine https://docs.sqlalchemy.org/en/20/core/engines.html
在完成上述数据安全隔离措施后,我们仍遇到了在特定场景下 “某个租户获取到了其他租户数据” 的安全问题。
while True:
enterprise_ids = get_enterprise_ids()
for enterprise_id in enterprise_ids:
# 设置 g.enterprise_id 值,用于切换数据库
with bind_enterprise(enterprise_id):
# 查询待运行的任务列表
stmt = select(TaskQueue).where(TaskQueue.handled == false())
tasks = session.scalars(stmt).all()
# ... 运行条件检查
# 发送任务到队列
for task in tasks:
_send_task(task)
# 每个租户单次只发送一个任务
break
time.sleep(1)
TaskQueue
model 的定义如下class TaskQueue(Base):
id: Mapped[int] = mapped_column(
BigInteger(), primary_key=True
)
task_id: Mapped[int] = mapped_column(
BigInteger(), nullable=False, index=True
)
# 任务对应的全局唯一消息 UUID
message_id: Mapped[str] = mapped_column(
String(36), index=True, nullable=False
)
# 是否已处理
handled: Mapped[bool] = mapped_column(
Boolean(), default=False, server_default=false()
)
运行任务的代码函数
@dramatiq.actor(queue_name="task")
def run_flow_task(task_id):
try:
obj = get_task_obj_by_id(task_id)
...
except Exception:
...
某天监控告警发现,在租户 A 下运行任务报错,错误信息为“task_id = 121 在数据库里任务记录不存在”,其消息 ID(message_id)为 6d4a0abc-d277-44db-8f6a-2a90491b1dee
。之后我们开始排查,在租户 A 对应的数据库中,最大的任务 ID 为 39,确实不存在 121 的记录。接着开始查询 “任务调度服务” 的日志,发现日志中的参数值和错误信息能对应上
任务调度服务日志
租户A:send message_id='xxxx' task_id=37
租户A:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121
租户A:send message_id='xxxx' task_id=39
根据前后的日志判断,正确的 task_id 应该是 38。到租户 A 的 “任务队列表” 中查看,确实有 38,但 message_id 是 47fb72b3-64dd-460d-bc03-bccfe18fa039
与前面任务调度器的日志对应不上
租户A-任务队列表数据
id, task_id, message_id
40, 37, xxxxx
41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa039
42, 39, xxxx
47fb72b3-64dd-460d-bc03-bccfe18fa039
属于租户B,其“任务队列表表”数据如下,task_id 为 121
也能对的上租户B-任务队列表数据
id, task_id, message_id
41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039
上述问题的表现像是“租户A”读取到的“租户B”下的数据,出现了跨租户越权的问题。起初我们怀疑是代码实现有问题,但经过排查和讨论,并未找到原因。
我们又继续排查任务调度日志,发现在租户A之前,调度的上一个租户恰巧是租户B的,且也找到了同 message_id
和 task_id
的任务发送日志(第一行和第四行)
租户B:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121
...
租户A:send message_id='xxxx' task_id=37
租户A:send message_id='6d4a0abc-d277-44db-8f6a-2a90491b1dee' task_id=121
租户A:send message_id='xxxx' task_id=39
再根据 “任务队列表” 的数据对比,发现这两条数据拥有相同的数据库主键值,id 均为 41
租户A
id, task_id, message_id
41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa039
租户B
id, task_id, message_id
41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039
Normally, ORM objects are only loaded once, and if they are matched up to the primary key in a subsequent result row, the row is not applied to the object. This is both to preserve pending, unflushed changes on the object as well as to avoid the overhead and complexity of refreshing data which is already there.
文档地址:https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#populate-existing
我们来看一个代码示例,以便更好的理解特性背后的表现,这里有一个 User model,有 id 主键和 name 两个字段
class User(Base):
__tablename__ = "t_user"
id: Mapped[int] = mapped_column(BigInteger(), primary_key=True)
name: Mapped[str] = mapped_column(String(36), nullable=False)
# 查询 id 为 2 的 user model
stmt = select(User).where(User.id == 2)
user_obj = session.scalar(stmt)
# 此时用户名为 zhangsan
user_obj.name
# 中途有其他逻辑将其 name 属性更新为了 lisi
# 再次查询
user_obj = session.scalar(stmt)
# 此时用户名依旧为 zhangsan
user_obj.name
在检查租户B的待执行任务时,加载了主键为 41 的 TaskQueue model
租户B
id, task_id, message_id
41, 121, 47fb72b3-64dd-460d-bc03-bccfe18fa039
在后续检查租户A的待执行任务时,对应数据库中也有一条主键为 41 的任务队列记录
租户A
id, task_id, message_id
41, 38, 47fb72b3-64dd-460d-bc03-bccfe18fa039
因为 SQLAlchemy ORM “同一主键 model object 只会被 load 一次,后续再次查询不会更新当前 object 的状态” 的特性,会直接使用之前已加载的租户B的 TaskQueue model 数据,导出发出的消息参数错误,而任务运行的上下文环境为租户A,在对应 DB 下无法找到对应数据
针对该问题,可选的解决办法有
在发送完任务后,使用
session.expire()
将内存中的 model object 设置为 expire,下次查询时会重新加载
_send_task(task)
session.expire(task)
select 查询时,设置
execution_options
的populate_existing
参数为 True,强制 refresh
stmt = (
select(User)
.where(User.id == 2)
.execution_options(populate_existing=True)
)
以上两种方法均为单次生效,需要额外函数调用和参数设置,容易遗忘出错。而对于用户来说,多次查询同一个主键的数据时,预期的默认行为也是获取最新的值和刷新内存中的 object。因此更合适的解决方法是在创建 db engine 时就进行设置,代码示例如下
from sqlalchemy import create_engine
engine_kwargs.setdefault("execution_options", {}).setdefault(
"populate_existing", True
)
create_engine(**engine_kwargs)
PS:这里会不定期分享关于 SaaS 自动化渗透平台建设、安全研发的一些想法、实践和总结,感兴趣的朋友可以关注一下!(微信搜索 “b1ngz的笔记本”)
1 | ||
2 | JWT attacks | https://portswigger.net/web-security/jwt |
3 | Flask Application Context | https://flask.palletsprojects.com/en/3.0.x/appcontext/ |
4 | Flask g proxy object | https://flask.palletsprojects.com/en/3.0.x/api/#flask.g |
5 | Dramatiq Middleware | https://dramatiq.io/reference.html#middleware |
6 | SQLAlchemy Session | https://docs.sqlalchemy.org/en/20/orm/session_basics.html |
7 | SQLAlchemy Engine | https://docs.sqlalchemy.org/en/20/core/engines.html |
8 | SQLAlchemy ORM Populate Existing | https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#populate-existing |
9 | SQLAlchemy Using Thread-Local Scope with Web Applications | https://docs.sqlalchemy.org/en/20/orm/contextual.html#using-thread-local-scope-with-web-applications |