日志分析系列(外传三):平台安全性
2020-1-12 22:47:9 Author: mp.weixin.qq.com(查看原文) 阅读量:3 收藏

本系列故事纯属虚构,如有雷同纯属巧合

小B在实现了统一日志分析平台后,想到的第一个问题就是平台的安全性,用于安全分析的平台被攻击成功,那么小B将颜面扫地还可能被扫地出门。所以小B认证分析了平台可能存在的安全风险:

  • 平台产品自身的安全漏洞(解决方案:及时安装补丁或升级版本);

  • 数据泄露:在采集存储使用等过程中造成的数据泄露问题(解决方案:加密);

  • 未授权访问:平台无任何访问控制措施导致未授权访问(解决方案:见后文);

对于平台的安全性,小B觉得首要的是使用没有已知漏洞的产品版本,其次就是添加访问控制。对于小B的日志分析平台,其中有这么几个地方需要额外注意:ES与Kibana的安全性、Kafka的安全性。

通用访问控制实现

使用防火墙实现第一层访问控制是常见也是很多中小型企业使用的方法。大家一般在网络防火墙(云安全组)、本机防火墙实现访问控制功能。
在使用防火墙时,推荐使用白名单对平台的安全性进行保障,我这里使用firewall实现基础的访问控制:
# 如果操作系统是Centos系列7以上版本,推荐使用

# 保护Kafka与Zookeeper
# 只允许Kafka的机器访问ZK,这种方法也是最有效防护ZK未授权访问导致信息泄露的办法
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="10.10.10.9" port port="2181" protocol="tcp" accept'

# 只允许Beats机器访问Kafka
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="10.10.10.0/24" port port="9092" protocol="tcp" accept'

# 只允许办公网络IP访问Kibana
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="182.*.*.107" port port="5601" protocol="tcp" accept'

# 如果白名单IP比较分散,可以使用firewall结合ipset,关于ipset的使用大家就自行百度吧!
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source ipset="ipset_name" port port="9092" protocol="tcp" accept'

Elastic安全性

>>>>

Xpack实现访问控制

Elastic在6.8与7.2版本之后开放了免费认证功能。我这里是7.4.1的实验环境:
# 停止Kibana与elasticsearch服务
systemctl stop kibana.service
systemctl stop elasticsearch.service

# 创建es证书颁发机构
/usr/share/elasticsearch/bin/elasticsearch-certutil ca

# 创建es集群通信证书
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca /etc/elasticsearch/elastic-stack-ca.p12

# 修改证书权限
chmod 644 /etc/elasticsearch/elastic-certificates.p12

# 修改配置文件
vim /etc/elasticsearch/elasticsearch.yml
# 在文件末尾添加如下内容

xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12
# 如果配置了证书密码
# /usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password
# /usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.transport.ssl.truststore.secure_password

# 启动Elasticsearch服务
systemctl start elasticsearch.service

# 创建账号密码
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive

  • elastic:超级用户
  • apm_system:使用APM时的用户
  • kibana:用来连接ES的用户
  • logstash_system:使用Logstash时的用户
  • beats_system:使用Beats时的用户
  • remote_monitoring_user:收集和存储监控信息的用户
# 修改kibana配置文件
vim /etc/kibana/kibana.yml

# 在文件末尾添加如下内容

elasticsearch.username: "kibana"
elasticsearch.password: "kibana"
# 启动kibana
systemctl start kibana.service

# 使用elastc账户登录kibana

>>>>

Nginx实现访问控制

  • 方法一:使用Nginx Proxy实现
# 安装认证模块
yum install httpd-tools.x86_64 -y

# 创建认证账号
htpasswd -cm /etc/nginx/kibana-user kibana

# 配置nginx
vim /etc/nginx/nginx.conf

server {
listen 80;
server_name kibana;
auth_basic "Restricted Access";
auth_basic_user_file /opt/nginx/conf/kibana-user;

#charset koi8-r;

#access_log logs/host.access.log main;

location / {
proxy_pass http://10.10.10.9:5601;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
}
}

  • 方法二:使用Nginx Lua实现
>>>>

其他实现访问控制方法

  • SearchGuard:http://docs.search-guard.com/latest/index (来自小B朋友的用后感:不稳定、插件容易出问题)
  • Shield:https://www.elastic.co/guide/en/shield/current/index.html (没有使用过不做评论)

Kafka与Zookeeper安全性

对于Kafka与Zookeeper,其官网都提供了加密(TLS)、认证(SASL)和授权(ACL)的方式来保障安全性。但本节只介绍了Kafka与Zookeeper关于SASL认证的情况,其他的方式大家可以阅读参考资料自行获取。
  • TLS:提供加密,是否能解决未授权访问?未知。
  • SASL:只针对客户端连接访问,无法解决Zookeeper未授权访问;
  • ACL:只针对节点内的授权,无法解决Zookeeper未授权访问;
>>>>

Zookeeper配置SASL认证

Zookeeper是一个分布式服务框架,在Kafka的依赖中,ZK主要用于:
  • 存放Broker节点的配置信息和controller节点的配置信息;
  • 监听父节点的子节点列表来进行controller的选举;
  • partition信息的存储;
下面就是用SASL来对Zookeeper进行认证(实际没什么用):
  1. 修改Zookeeper的配置文件,加入官网提供的配置文件
vim /opt/zookeeper-3.4.14/conf/zoo.cfg
# 在文件末尾添加如下内容
# 打开SASL开关
quorum.auth.enableSasl=true
# 打开Client-to-Server authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
quorum.cnxn.threads.size=20
# 认证模式使用SASL
requireClientAuthScheme=sasl
  1. 创建Zookeeper的认证文件
vim /opt/zookeeper-3.4.14/conf/zookeeper-server.conf
# 创建一个用户名为kafka,密码为123456的用户信息
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_kafka="123456";
};
  1. 修改Zookeeper运行的环境变量
vim /opt/zookeeper-3.4.14/bin/zkServer.sh
# 在140行,nohup后面加入运行的环境变量
# 原文 nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \

# 添加 -DJava.security.auth.login.config信息
nohup "$JAVA" "-Djava.security.auth.login.config=/opt/zookeeper-3.4.14/conf/zookeeper-server.conf" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \

  1. 启动Zookeeper验证SASL是否配置成功
# 启动服务端
/opt/zookeeper-3.4.14/bin/zkServer.sh start

# 使用客户端连接
/opt/zookeeper-3.4.14/bin/zkCli.sh

注意图片中被红色框起来的部分,我们会发现即使我们服务端配置了SASL认证,客户端依旧可以连接,那么我们再来使用账号密码连接一下。
  1. 创建客户端认证文件
vim /opt/zookeeper-3.4.14/conf/zookeeper-client.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="12345";
};
注意配置文件中的密码是错误的,这时我们来使用客户端连接就会出现
我们将密码修改正确后重新连接发现
上述就是Zookeeper配置SASL的全过程,但是我们使用Zookeeper未授权访问脚本测试此时的Zookeeper发现(此时我的端口是开放给外网所有的):
所以SASL不能解决Zookeeper的未授权访问问题。Zookeeper的ACL也不能解决该问题,至于TLS能否解决待研究。
备注:在这里回答为什么我们配置了服务端SASL,客户端不使用账号密码也可以登录,只是会提示unknown error。来自非官方的回答:"Zookeeper认证是插件化架构,SASL只是其中一种方式,故没有Client能进入,错误的不能进入为特性,而非bug。"
所以对于Zookeeper而言,最好的办法就是使用防火墙进行安全防护。
>>>>

Kafka配置SASL认证

在新版本的Kafka中支持SASL的GSSAPI与PLAIN两种认证方式,但是在fIlebeat output kafka配置中只支持SASL/PLAIN,所以我们这里也只测试SASL/PLAIN的方法:
  1. 在上面我们已经配置好了Zookeeper的SASL了,这里我们直接进行Kafka的SASL配置
# 首先修改Kafka服务端配置文件,启动SASL/PLAIN认证
vim /opt/kafka_2.11-2.3.1/config/server.properties
# PLAIN是SASL的一种认证方式,PLAINTEXT是通信过程是明文,不加密。
listeners=SASL_PLAINTEXT://10.10.10.9:9092

# 启用的认证模式PLAIN,也可使用GSSAPI
sasl.enabled.mechanisms=PLAIN
# kafka broker之间也需要使用PLAIN方式认证,也可使用GSSAPI
sasl.mechanism.inter.broker.protocol=PLAIN
# 通信为明文。如果需要使用SSL加密通信,则使用SASL_SSL,不过需要配置证书
security.inter.broker.protocol=SASL_PLAINTEXT

  1. 创建一个Kafka的认证文件
vim /opt/kafka_2.11-2.3.1/config/kafka.conf
# kafkaServer为KafkaServer的SASL
# Client是Kafka用来连接Zookeeper的配置,如果不配置Client,会提示WARN不会报错,原理在上面已经解释过了
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka_user"
password="kafka_pass"
user_producer="producer"
user_consumer="consumer";
};

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="123456";
};

  1. 修改Kafka运行的环境变量
vim /opt/kafka_2.11-2.3.1/bin/kafka-run-class.sh
# 找到KAKFA_OPTS
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-2.3.1/config/kafka.conf"
fi
  1. 启动Kafka并测试Kafka认证
# 启动Kafka服务
/opt/kafka_2.11-2.3.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-2.3.1/config/server.properties

# 启动Kafka Producer测试是否能使用Kafka,记住先创建一个system-messages的topic
/opt/kafka_2.11-2.3.1/bin/kafka-console-producer.sh --broker-list 10.10.10.9:9092 --topic system-messages

如图证明我们没有配置客户端的账号密码就不能使用Kafka。
  1. 配置Kafka Client SASL认证
# 首先是修改Producer的配置文件
vim /opt/kafka_2.11-2.3.1/config/producer.properties

# 在文件末尾添加如下内容
# 如果不添加还是不能使用

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# 其次是创建Kafka认证的文件,我这里Kafka CLient与Server是同一台,所以配置文件我写在同一个文件中
vim /opt/kafka_2.11-2.3.1/config/kafka.conf
# 在文件末尾添加如下内容
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="producer";
};
如果不是使用同一个文件,需要单独配置运行环境变量。
  1. Kafka客户端配置完成后,我们再来测试
# 切记,必须制定producer的配置文件
/opt/kafka_2.11-2.3.1/bin/kafka-console-producer.sh --broker-list 10.10.10.9:9092 --topic system-messages
  1. 与Producer一样的配置Consumer。如果你与小B一样使用的是一个配置文件就不用重复配置了,如果不是记得配置,并且需要指定运行环境变量。
>>>>

Filebeat与Logstash配置Kafka

  1. 首先是Filebeat成为Kafka的Producer
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/test.log

output.kafka:
hosts: ["10.10.10.9:9092"]
topic: "test"
username: "producer"
password: "producer"

  1. 其次就是Logstash成为Kafka的Consumer
vim /etc/logstash/conf.d/kafka.conf
input {
kafka {
bootstrap_servers => "10.10.10.9:9092"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "/etc/logstash/kafka-client-jaas.conf"
topics => ["test"]
}
}
output {
stdout {}
}
# 配置Kafka的Client信息
vim /etc/logstash/kafka-client-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="consumer";
};
好了,平台安全性就先讲这么多吧!
未完待续。。。
 
- HISTORY -
《日志分析系列(一):介绍篇》
《日志分析系列(二):平台实现篇》
《日志分析系列(外传一):Nginx透过代理获取真实客户端IP
《日志分析系列(外传二):Nginx日志统一化

文章来源: https://mp.weixin.qq.com/s?__biz=MzU2NzY5MDY3MQ==&mid=2247483870&idx=1&sn=d662fb5ed290f19d1dc9f650f59ef0ae&chksm=fc98151ecbef9c08f6f1c34435f1549b4ec89b7e60b81ee6679de94283269b37eff48908a6f2&scene=58&subscene=0#rd
如有侵权请联系:admin#unsafe.sh