公有云厂商自建威胁情报系统 - 嘶吼 RoarTalk – 网络安全行业综合服务平台,4hou.com

公有云厂商自建威胁情报系统

bt0sea 业务安全 2017-08-23 09:55:15
4769698
收藏

导语:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

0x00、公有云厂商安全威胁

Q:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

各个问题都很尖锐~

业务层面

根据自己的公有云厂商业务特点(提供云主机、云存储、云数据库、云GPU),建立威胁情报体系,那么面临的威胁 主要集中在DDoS攻击和主机入侵。

1、有效的做到事中及时防御,降低用户遭受DDoS攻击总量,减少DDoS用户攻击成本。

2、DDoS攻击溯源,云安全增值服务。

3、内部针对性攻击事件溯源。

技术层面

1、历史追溯:通过聚类关联分析内部外部历史威胁情报数据,获得攻击事件

domain/IP/URL/samples/analysis report

2、及时阻断:能够对DDoS发起点(botnet主控端)、中间过程发起点(反射源/匿名代理TORP2P节点等)、到达点(直接攻击源),造成攻击无法持续。侧重点在前两部分。

3、DDoS溯源,找到攻击源关联的Domain注册信息、Email信息、云主机注册信息等任何通过身份信息注册的账号。

下面是一些攻击证据展示:

Threat Top 1、公有云平台业务系统和云上用户遭受DDoS攻击

Proof 1、没一天停过,在黑洞之前,攻击峰值总和维持在30G左右。

1.png

Proof 2、云上用户占比7成、云基础设施占3成。

11.png

Threat Top 2、云内主机中马

Proof 1、对外DDoS攻击告警

111.png

Proof 2、CDN网络遭受入侵

Proof3、DGA DNS Analytics解析监控,Xshell 感染终端监控。

当然还有很多威胁,我就不在这里赘述。

0x01、如何自建公有云厂商威胁情报系统

首先,需要建立一整套威胁情报生命周期管理:

在保护云上用户的隐私的前提下,有效的建立一套自动化情报收集、情报处理(特征提取工程、关联分析)、情报应用(抗D 、云WAF黑白名单、云基础设施定向攻击排查),以及情报共享(与拥有情报数据资源的厂商交换数据)。

其次,在带宽和服务器资源允许的情况下,使用DPI/DFI技术、蜜罐技术、沙箱技术建立自身的威胁情报收集系统,提供高质量的威胁情报(注意,是带证据的IOC情报)、通过AI技术提高检测速度和降低误报率。

自动化情报收集系统

@1、云数据中心NIDS部署,集中收集日志。

Suricata IDS + ELK

网络拓扑

1111.png

主要关注:shellcode监控、mysqlRDPSSH登陆尝试、木马活动、DoS等

@2、DDoS攻击数据源抓取

先说说技术方案选型问题,考虑到性能问题,主要是抓5元组的数据,那么DPI方案不成,xflow流量采样的方式(例如:绿盟NTA解决方案),1:1000抓包,在高速DDoS攻击的时候是无法抓到瞬间脉冲攻击包的。好吧,只有自己开发基于DPDK的数据统计和抓包系统。

2.png

@3、Web威胁数据收集

基础架构是复用DDoS Pcapdump服务器,根据云平台基础设施IP列表,抓取7层数据,存储到redis,然后汇总到数据仓库中。

@4、云平台基础设施服务器部署HIDS系统

有两种选择,一是使用传统的HIDS软件的方式收集,二是使用EDR机器学习的方法分析,我选择了后者。

22.png

222.png

详细描述一下在这一领域的实践之路:

1、训练数据选择

选择800万良性文件和400万恶意文件进行训练,Top 10 恶意家族是:

3.png

2、训练模型选择

(1)    通过https://github.com/erocarrera/pefile.git(VirusTotal和Cuckoo都在使用)访问PE文件(PE头信息、PE导入表、PE导出表、PE节数据、PE版本信息) 哈希所有部分。

Linux系统、MacOSX系统使用https://github.com/lief-project/LIEF 获取 elf文件和MachO文件头相关信息,并且哈希。

(2)    通过对准确率、漏报率、性能、模型大小、查询执行时间的评估来选择模型。(目前使用sklearn库,将来打算移植到ML Toolkit for TensorFlow,听说训练速度可以提升50倍,毕竟GPU越来越吊)

33.png

通过对文件判断的malware分类结果上报数据,提供基于主机的威胁情报源。

@5、公网蜜罐系统

333.png

有关公网蜜罐有以下思考:

(1)需要用高交互蜜罐,直接修改真实服务加入log记录系统,当然log系统尽量做的隐藏些。

蜜罐本身有一定威胁情报产生能力(降低误报(3次以上持续攻击才记录)、生成有价值情报(例如:web蜜罐产生sqlixss等情报数据,而不是只收集access log))

(2)有一套完善的基础服务管理系统,可以快速部署和销毁蜜罐系统,符合大规模部署要求。

情报处理

其实就是建立一套威胁溯源平台,平台提供查询接口,提供一个可视化的关系图图,例如:

3333.png

单更重要的情报逻辑处理部分:

这里以DDoS一次攻击事件分析说起:

(1)    首先通过自动化收集系统中收集到的攻击数据:

Data1:{攻击目的IP/攻击时间/攻击类型/攻击峰值/攻击持续时间}

Data2: {攻击目的IP /攻击目的端口/攻击时间/攻击源IP/攻击源端口/地理位置/攻击IP运营商/包大小}  取Top 1000数据

(2)    集合历史攻击数据 ,获取超过2次攻击源IP

(3)    通过端口扫描,回扫。确定IP存活状态和开放端口状态。

(4)    通过外联威胁情报查询IP信誉。确定IP状态排除TOR/匿名代理等

(5)    通过内外情报源查询攻击IP对应malware样本。

(6)    关联蜜罐系统收集样本,获取botnet Server地址。

(7)    对发生攻击的botnet Server发起反制措施。

情报共享

建立提高情报数据共享接口,同时需要对对外提供数据做脱敏处理。

@1、进入到ELK数据,建立威胁情报对外接口

(1)入库:

#!/usr/bin/env python
#coding=utf-8
import traceback
from elasticsearch import Elasticsearch
import sys
import json
import datetime,time
import psycopg2
connC = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
def GetNum():
         es = Elasticsearch('x.x.x.x')
         data = es.search(index=index_day, body={"query": {"match_all": {}}},size=1)
         return data["hits"]["total"]
def Getidslog():
         es = Elasticsearch('x.x.x.x')
         tmp_str = datetime.datetime.now().strftime('%Y.%m.%d')
         index_day = 'logstash-' + tmp_str
         es.indices.put_settings(
                                                 {"index": {
                                                        "max_result_window": 500000
                                                 }})
         num = GetNum()
         data = es.search(index=index_day, body={"query": {"match_all": {}}}, size=num)
         datalen=len(data["hits"]["hits"])
         tableName = "%s_%s" % ("idslog", time.strftime("%Y%m%d"))
         cur1 = conn1.cursor()
         try:
              for c in xrange(0,datalen):
#                   print c
                     if data["hits"]["hits"][c]:
                            m_ctime=data["hits"]["hits"][c]["_source"]["@timestamp"]
                            m_src_ip=data["hits"]["hits"][c]["_source"]["src_ip"]
                            m_category=data["hits"]["hits"][c]["_source"]["alert"]["category"]
                            m_signature=data["hits"]["hits"][c]["_source"]["alert"]["signature"]
                            sql = "INSERT INTO %s (ctime,src_ip, category,signature) VALUES ('%s','%s','%s','%s')"
                            sqlCmd = sql % (tableName, m_ctime, m_src_ip, m_category, m_signature)
                            print sqlCmd
                            cur1.execute(sqlCmd)
                            conn1.commit()
         except Exception,e:
              traceback.print_exc()
def CreateTable():
    curC = connC.cursor()
    sqlCreate = "create table if not exists %s ( 
                 ctime TEXT,
                 src_ip TEXT,
                 category TEXT,
                 signature TEXT
                 )"
    tableName = "%s_%s"%("idslog", time.strftime("%Y%m%d"))
    sqlCmd = sqlCreate%tableName
    curC.execute(sqlCmd)
    curC.close()
    connC.commit()
if __name__ == '__main__':
         CreateTable()
         Getidslog()

(2)通过django web方式提供API接口。

  url(r'^api/outxxx/id$', outputAPI.as_view()),//获取某个IP对应的威胁情报,(只提供必要的情报,内部数据需要脱敏)
url(r'^api/outxxx/IPlist$', outputIPlistAPI.as_view()), //获得所有ip列表
class outputAPI(APIView):
    def get(self, request, format=None):
        m_src_ip = request.GET.get("ip") //安全机制已屏蔽
        print m_src_ip
        tableName ='idslog_20170814'
        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                port="5432")
        cur1 = conn1.cursor()
        SQL1="select * from %s WHERE src_ip=%s" %(tableName,m_src_ip)
        cur1.execute(SQL1)
        rows = cur1.fetchall()
        list =[]
        for row in rows:
            m = {"ctime": row[0], "src_ip": row[1],"category":row[2],"signature":row[3]}
            list.append(m)
        b=json.dumps(list)
        return HttpResponse(b)
class outputIPlistAPI(APIView):
    def get(self, request, format=None):
        tableName = 'idslog_20170814'
        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                 port="5432")
        cur1 = conn1.cursor()
        SQL1 = "select DISTINCT(src_ip) from {} ".format(tableName)
        cur1.execute(SQL1)
        rows = cur1.fetchall()
        list = []
        for row in rows:
            m = {"src_ip": row[0]}
            list.append(m)
        b = json.dumps(list)
        return HttpResponse(b)

0x03、总结

公有云厂商自建威胁情报系统,是一个长期的投入,虽然短期看不出来多少效果,但是针对数据情报的积累到一定量的时候会有质的变化。

  • 分享至
取消

感谢您的支持,我会继续努力的!

扫码支持

打开微信扫一扫后点击右上角即可分享哟

发表评论

 
本站4hou.com,所使用的字体和图片文字等素材部分来源于原作者或互联网共享平台。如使用任何字体和图片文字有侵犯其版权所有方的,嘶吼将配合联系原作者核实,并做出删除处理。
©2022 北京嘶吼文化传媒有限公司 京ICP备16063439号-1 本站由 提供云计算服务