公开文集
0x01 SRC 资产管理系统
0x02 Web 漏洞案例库
0x03 小程序漏洞案例库
第一章:小程序渗透基础
1.1 微信小程序反编译与动态调试
1.2 微信小程序强制开启开发者模式
0x99 信息安全学习体系
01-网络安全基础
Day-001-TCP-IP协议栈安全分析
Day-002-DNS协议安全与DNS劫持攻防
Day-003-IPv6 安全基础与过渡
Day-004-HTTP-HTTPS协议深度解析
Day-005-网络嗅探与流量分析技术
Day-006-防火墙原理与配置实践
Day-007-网络地址转换 NAT 安全分析
Day-008-路由协议安全 RIP-OSPF-BGP
Day-009-VLAN 安全与 VLAN-Hopping
Day-010-无线网络基础与安全 802.11
Day-011-网络访问控制 802.1X-NAC
Day-012-网络分段与微隔离设计
Day-013-负载均衡器安全配置
Day-014-CDN安全与防护
Day-015-NTP安全
Day-016-DHCP安全与攻击防护
Day-017-ICMP协议安全分析
Day-018-网络协议模糊测试基础
Day-019-网络流量基线建立
Day-020-网络取证基础
Day-021-网络入侵检测系统 NIDS
Day-022-网络入侵防御系统 NIPS
Day-023-网络流量加密与解密
Day-024-网络协议逆向工程基础
Day-025-网络性能与安全权衡
Day-026-SDN 安全
Day-027-网络虚拟化安全
Day-028-网络欺骗技术
Day-029-网络威胁情报应用
Day-030-网络容量规划与安全
Day-031-网络安全架构设计实战
02-Web 安全
Day-032-OWASP-Top-10-2021详解
Day-033-SQL 注入原理与手工检测
Day-034-SQL注入进阶报错注入与盲注
Day-035-XSS跨站脚本攻击基础
Day-036-XSS 进阶绕过与利用
Day-037-XSS进阶绕过与利用
Day-038-CSRF 跨站请求伪造
Day-039-文件上传漏洞
Day-040-反序列化漏洞基础
Day-041-PHP反序列化深入
Day-042-Java反序列化深入
Day-043-SSTI 服务端模板注入
Day-044-文件包含漏洞 LFI-RFI
Day-045-命令注入漏洞
Day-046-XXE-XML 外部实体注入
Day-047-反序列化漏洞进阶
Day-048-API 安全基础
Day-049-API认证与授权安全
Day-050-API漏洞挖掘实战
Day-051-文件上传漏洞进阶
Day-052-反序列化漏洞实战
Day-053-Web 安全综合实战
Day-054-移动安全基础
Day-055-Android 应用安全测试
Day-056-iOS 应用安全测试
Day-057-移动应用综合实战
Day-058-云安全基础
Day-059-AWS 安全实战
Day-060-Azure 安全实战
Day-061-GCP 安全实战
Day-062-云安全综合实战
Day-063-容器安全基础
Day-064-Docker 安全实战
Day-065-Kubernetes 安全实战
Day-066-容器安全综合实战
Day-067-API 安全进阶
Day-068-服务端请求伪造 SSRF 深入
Day-069-文件上传漏洞进阶
Day-070-反序列化漏洞实战进阶
Day-071-业务逻辑漏洞深入
Day-072-前端安全深入
Day-073-Web 安全综合实战
Day-074-云安全进阶
Day-075-移动安全进阶
Day-076-API 安全进阶
Day-077-前端安全进阶
Day-078-业务逻辑漏洞进阶
Day-079-反序列化漏洞实战进阶
Day-080-文件上传漏洞实战进阶
Day-081-SSTI 服务端模板注入进阶
Day-082-XXE-XML 外部实体注入进阶
Day-083-SSRF 服务端请求伪造进阶
Day-084-命令注入漏洞进阶
Day-085-文件包含漏洞进阶
Day-086-反序列化漏洞实战进阶
Day-087-文件上传漏洞实战进阶
Day-088-SSTI 服务端模板注入实战进阶
Day-089-XXE-XML 外部实体注入实战进阶
Day-090-SSRF 服务端请求伪造实战进阶
Day-091-命令注入漏洞实战进阶
Day-092-Web 安全综合实战
Day-093-GraphQL 安全
Day-094-JWT 与 OAuth2 安全
03-系统安全
Day-095-系统监控与检测
Day-096-主机防火墙配置
Day-097-系统审计与合规
Day-098-Linux 系统安全进阶
Day-099-Windows 系统安全进阶
Day-100-容器安全进阶
Day-101-容器编排安全进阶
Day-102-Linux 内核安全
Day-103-Windows 内核安全
Day-104-系统安全总结与实战
Day-105-Linux 系统安全基础
Day-106-Windows 系统安全基础
Day-107-容器安全基础
Day-108-系统加固技术
Day-109-日志分析技术
Day-110-威胁狩猎技术
04-应用安全
Day-111-安全编码规范
Day-112-输入验证技术
Day-113-输出编码技术
Day-114-错误处理安全
Day-115-会话管理安全
Day-116-认证安全
Day-117-授权安全
Day-118-数据保护安全
Day-119-日志安全
Day-120-API 安全
Day-121-微服务安全
Day-122-新兴技术安全概论
Day-123-DevSecOps 流水线安全
Day-124-云原生安全架构
Day-125-API 安全最佳实践
Day-126-安全编码规范
Day-127-SDL 安全开发生命周期
Day-128-威胁建模实战
Day-129-安全需求分析
Day-130-安全架构设计
Day-131-安全编码实践Java
Day-132-安全编码实践Python
Day-133-代码审计方法论
Day-134-静态代码分析SAST
Day-135-动态应用测试DAST
Day-136-交互式测试IAST
Day-137-软件成分分析SCA
Day-138-依赖漏洞管理
Day-139-安全测试自动化
Day-140-漏洞管理与响应
Day-141-应用安全总结与展望
Day-142-OWASP-Top10-2024 详解
Day-143-CWE-Top25 分析
Day-144-漏洞挖掘方法论
Day-145-模糊测试技术
Day-146-逆向工程基础
Day-147-漏洞利用开发基础
Day-148-漏洞复现与验证
Day-149-漏洞披露流程
Day-150-CVE 申请与管理
Day-151-漏洞赏金计划
Day-152-等保2.0详解
Day-153-GDPR 合规实践
Day-154-数据安全法解读
Day-155-个人信息保护法与合规指南
Day-156-个人信息保护法解读
Day-157-ISO-27001 信息安全管理体系
Day-158-SOC-2 合规与审计
Day-159-PCI-DSS 支付卡行业数据安全标准
Day-160-网络安全审查办法解读
Day-161-数据出境安全评估办法
Day-162-应用安全评估实战
Day-163-红蓝对抗演练
Day-164-安全应急响应
Day-165-安全运营中心建设
Day-166-应用安全总结与展望
05-密码学
Day-167-密码学基础
Day-168-对称加密算法详解
Day-169-非对称加密算法详解
Day-170-哈希函数与数字签名
Day-171-密钥管理与PKI
Day-172-TLS-SSL 协议详解
Day-173-国密算法详解
Day-174-认证与密钥协议
Day-175-随机数生成与熵源
Day-176-椭圆曲线密码学详解
Day-177-后量子密码学详解
Day-178-高级密码学主题
Day-179-密码学行业应用精选
Day-180-常用加密算法原理与实现
Day-181-密码学总结与展望
Day-182-密码学系列总结与展望
06-渗透测试
Day-183-渗透测试方法论
Day-184-信息收集技术详解
Day-185-漏洞扫描技术详解
Day-186-漏洞利用技术详解
Day-187-渗透测试中的漏洞利用框架
Day-188-漏洞利用框架与 Metasploit 深入
Day-189-渗透测试中的 WAF 绕过技术
Day-190-渗透测试中的模糊测试技术
Day-191-渗透测试中的代码审计与静态分析
Day-192-渗透测试中的密码哈希破解技术
Day-193-渗透测试报告编写指南
Day-194-Web 应用渗透测试
Day-195-渗透测试中的 API 安全测试
Day-196-渗透测试中的 GraphQL 安全测试
Day-197-渗透测试中的前后端分离应用测试
Day-198-渗透测试中的小程序安全测试
Day-199-渗透测试中的浏览器安全测试
Day-200-OAuth-SSO安全测试
Day-201-渗透测试中的业务逻辑漏洞测试
Day-202-渗透测试中的厚客户端安全测试
Day-203-渗透测试综合实战演练
Day-204-内网渗透技术详解
Day-205-渗透测试中的内网信息收集进阶
Day-206-渗透测试中的域森林渗透技术
Day-207-渗透测试中的权限维持技术
Day-208-渗透测试中的横向移动技术
Day-209-渗透测试中的痕迹清理与反取证技术
Day-210-渗透测试中的数据窃取与 Exfiltration 技术
Day-211-渗透测试中的内部威胁与数据泄露测试
Day-212-渗透测试中的物理安全渗透
Day-213-社会工程学攻击技术
Day-214-移动应用渗透测试
Day-215-云安全渗透测试
Day-216-渗透测试中的容器与 Kubernetes 安全渗透
Day-217-渗透测试中的 Serverless 安全测试
Day-218-渗透测试中的微服务安全测试
Day-219-物联网安全渗透测试
Day-220-工业控制系统安全渗透测试
Day-221-无线网络安全渗透测试
Day-222-数据库安全渗透测试
Day-223-渗透测试中的供应链安全测试
Day-224-红队演练技术详解
Day-225-渗透测试中的红队基础设施搭建
Day-226-渗透测试中的威胁情报与狩猎
Day-227-渗透测试中的综合指纹识别技术
Day-228-自动化渗透测试技术
Day-229-渗透测试中的运维安全测试
Day-230-渗透测试中的区块链与智能合约安全测试
Day-231-渗透测试中的漏洞管理与修复验证
Day-232-渗透测试法律与合规
Day-233-后渗透攻击技术详解
Day-234-渗透测试中的人工智能应用
Day-235-漏洞利用开发深入
Day-236-云原生渗透测试深入
07-应急响应
Day-237-应急响应概述与核心概念
Day-238-应急响应流程框架
Day-239-CSIRT 团队组建与职责分工
Day-240-应急响应工具包准备
Day-241-应急响应法律与合规要求
Day-242-安全事件检测方法与指标
Day-243-云原生应急响应
Day-244-日志收集与分析技术
Day-245-网络流量分析与异常识别
Day-246-自动化响应与 SOAR
Day-247-端点监控与 EDR 技术
Day-248-威胁狩猎方法论
Day-249-威胁情报在检测中的应用
Day-250-数字取证基础与证据链管理
Day-251-内存取证技术
Day-252-磁盘取证与文件恢复
Day-253-网络取证与数据包分析
Day-254-云环境与容器取证
Day-255-恶意代码静态分析技术
Day-256-恶意代码动态分析技术
Day-257-恶意代码行为分析方法
Day-258-逆向工程基础与工具
Day-259-沙箱技术与自动化分析
Day-260-事件隔离与遏制策略
Day-261-威胁根除与系统修复
Day-262-系统恢复与数据重建
Day-263-业务连续性计划
Day-264-事件复盘与经验总结
Day-265-APT 攻击事件复盘分析
Day-266-勒索软件事件响应实战
Day-267-数据泄露事件处置流程
Day-268-内部威胁调查与取证
Day-269-综合应急响应演练
08-安全运维
Day-270-安全运营中心 SOC 概述
Day-271-安全监控指标体系
Day-272-安全告警管理
Day-273-安全可视化与仪表盘
Day-274-监控工具选型
Day-275-日志采集技术
Day-276-日志标准化与解析
Day-277-日志存储与归档
Day-278-日志分析技术
Day-279-日志合规要求
Day-280-SIEM 架构与设计
Day-281-关联规则引擎
Day-282-高级关联分析
Day-283-UEBA 用户实体行为分析
Day-284-威胁狩猎
Day-285-SOAR 基础概念
Day-286-剧本设计
Day-287-自动化响应技术
Day-288-安全工具集成
Day-289-SOAR 度量与优化
Day-290-安全基线管理
Day-291-漏洞管理流程
Day-292-补丁管理策略
Day-293-变更安全管理
Day-294-合规审计技术
Day-295-7x24 安全运营
Day-296-安全事件管理流程
Day-297-安全运营度量体系
Day-298-持续改进机制
Day-299-安全运维综合演练
Day-300-云原生安全运营
Day-301-AI 与机器学习安全运营
Day-302-安全自动化脚本实战
09-移动安全
Day-303-移动安全威胁概述
Day-304-移动设备安全架构
Day-305-移动操作系统安全模型
Day-306-移动应用权限管理
Day-307-移动端数据加密
Day-308-330-Android 安全合集
Day-309-Android 安全架构
Day-310-Android 组件安全
Day-311-Android 权限与隐私
Day-312-Android 逆向工程
Day-313-Android 应用加固
Day-314-iOS 安全架构
Day-315-iOS 应用沙盒机制
Day-316-越狱与反越狱
Day-317-iOS 逆向工程
Day-318-iOS 企业分发安全
Day-319-移动安全开发生命周期
Day-320-移动应用安全测试
Day-321-移动应用加固技术
Day-322-移动威胁防护
Day-323-移动安全合规
10-云安全
Day-324-云计算安全模型
Day-325-责任共担模型
Day-326-云安全威胁模型
Day-327-云安全合规框架
Day-328-云安全架构设计
Day-329-AWS IAM 安全
Day-330-AWS 网络安全
Day-331-AWS 存储安全
Day-332-AWS 安全监控
Day-333-AWS 安全最佳实践
Day-334-Azure AD 安全
Day-335-Azure 网络安全
Day-336-Azure 存储安全
Day-337-Azure 安全中心
Day-338-Azure 安全最佳实践
Day-339-容器安全基础
Day-340-Kubernetes 安全
Day-341-Serverless 安全
Day-342-云原生 DevSecOps
Day-343-云安全态势管理 CSPM
11-物联网工控
Day-344-物联网安全概述
Day-345-IoT 通信协议安全
Day-346-IoT 设备安全
Day-347-IoT 平台安全
Day-348-IoT 应用安全
Day-349-工业控制系统概述
Day-350-工控协议安全
Day-351-PLC 安全
Day-352-SCADA 系统安全
Day-353-工控安全防护
12-综合与总结
Day-354-安全职业发展路径
Day-355-安全技术趋势展望
Day-356-安全建设方法论
Day-357-经典攻防案例复盘
Day-358-安全学习资源指南
Day-359-信息安全行业求职指南
-
+
首页
Day-302-安全自动化脚本实战
# Day 323: 安全自动化脚本实战 - Python 脚本/API 集成/自动化工具 > 安全运维系列第 33 天 | 预计阅读时间:40 分钟 | 难度:★★★★☆ --- ## 清单 目录 1. [自动化概述](#自动化概述) 2. [Python 安全脚本](#python-安全脚本) 3. [API 集成实战](#api-集成实战) 4. [日志处理脚本](#日志处理脚本) 5. [自动化工具](#自动化工具) 6. [实战案例](#实战案例) 7. [总结与思考](#总结与思考) 8. [参考资料](#参考资料) --- ## 自动化概述 ### 自动化场景 **数据收集**: ``` 日志收集: - 系统日志 - 应用日志 - 安全日志 - 网络日志 威胁情报: - IOC 收集 - 漏洞信息 - 威胁报告 - 安全公告 ``` **数据分析**: ``` 日志分析: - 日志解析 - 模式识别 - 异常检测 - 关联分析 威胁分析: - IOC 匹配 - 威胁评分 - 攻击链分析 - 影响评估 ``` **响应处置**: ``` 自动响应: - IP 封禁 - 账户禁用 - 文件隔离 - 通知发送 报告生成: - 日报周报 - 事件报告 - 合规报告 - 审计报告 ``` ### 自动化价值 **效率提升**: ``` 时间节省: - 重复任务自动化 - 7x24 运行 - 并行处理 - 快速响应 质量提升: - 标准化流程 - 减少人为错误 - 完整记录 - 可追溯 ``` **成本降低**: ``` 人力成本: - 减少手动工作 - 优化人员配置 - 提高人效 运营成本: - 工具整合 - 流程优化 - 资源优化 ``` --- ## Python 安全脚本 ### 基础脚本 **日志解析脚本**: ```python #!/usr/bin/env python3 """ 日志解析脚本 解析系统日志,提取安全事件 """ import re from datetime import datetime class LogParser: def __init__(self, log_file): self.log_file = log_file self.patterns = { 'failed_login': r'Failed password for (\S+) from (\S+)', 'successful_login': r'Accepted password for (\S+) from (\S+)', 'sudo': r'sudo:\s+(\S+).*COMMAND=(\S+)', } def parse(self): events = [] with open(self.log_file, 'r') as f: for line in f: event = self.parse_line(line) if event: events.append(event) return events def parse_line(self, line): for event_type, pattern in self.patterns.items(): match = re.search(pattern, line) if match: return { 'type': event_type, 'timestamp': self.extract_timestamp(line), 'details': match.groups(), 'raw': line.strip() } return None def extract_timestamp(self, line): # 提取时间戳逻辑 return datetime.now().isoformat() # 使用示例 parser = LogParser('/var/log/auth.log') events = parser.parse() for event in events: print(f"{event['type']}: {event['details']}") ``` **威胁情报查询脚本**: ```python #!/usr/bin/env python3 """ 威胁情报查询脚本 查询多个 TI 平台的 IOC 信息 """ import requests import hashlib class ThreatIntel: def __init__(self, api_keys): self.api_keys = api_keys self.base_urls = { 'virustotal': 'https://www.virustotal.com/api/v3', 'abuseipdb': 'https://api.abuseipdb.com/api/v2', } def query_ip(self, ip): """查询 IP 信誉""" results = {} # VirusTotal vt_url = f"{self.base_urls['virustotal']}/ip_addresses/{ip}" vt_headers = {'x-apikey': self.api_keys['virustotal']} vt_response = requests.get(vt_url, headers=vt_headers) results['virustotal'] = vt_response.json() # AbuseIPDB abuse_url = f"{self.base_urls['abuseipdb']}/check" abuse_params = {'ipAddress': ip, 'maxAgeInDays': 90} abuse_headers = {'Key': self.api_keys['abuseipdb']} abuse_response = requests.get(abuse_url, params=abuse_params, headers=abuse_headers) results['abuseipdb'] = abuse_response.json() return results def query_hash(self, file_hash): """查询文件哈希""" vt_url = f"{self.base_urls['virustotal']}/files/{file_hash}" vt_headers = {'x-apikey': self.api_keys['virustotal']} vt_response = requests.get(vt_url, headers=vt_headers) return vt_response.json() # 使用示例 api_keys = { 'virustotal': 'YOUR_VT_API_KEY', 'abuseipdb': 'YOUR_ABUSE_API_KEY', } ti = ThreatIntel(api_keys) results = ti.query_ip('1.2.3.4') print(results) ``` ### 高级脚本 **自动化响应脚本**: ```python #!/usr/bin/env python3 """ 自动化响应脚本 根据告警自动执行响应动作 """ import requests import json from datetime import datetime class AutoResponder: def __init__(self, config): self.config = config self.session = requests.Session() self.session.headers.update({ 'Authorization': f"Bearer {config['api_token']}", 'Content-Type': 'application/json' }) def block_ip(self, ip, reason): """封禁 IP""" # 防火墙封禁 fw_response = self.session.post( f"{self.config['firewall_url']}/api/block", json={'ip': ip, 'reason': reason, 'duration': '24h'} ) # WAF 封禁 waf_response = self.session.post( f"{self.config['waf_url']}/api/blocklist", json={'ip': ip, 'reason': reason} ) return { 'firewall': fw_response.status_code == 200, 'waf': waf_response.status_code == 200 } def disable_user(self, username, reason): """禁用用户""" # AD 禁用 ad_response = self.session.post( f"{self.config['ad_url']}/api/disable", json={'username': username, 'reason': reason} ) return ad_response.status_code == 200 def send_notification(self, channel, message): """发送通知""" if channel == 'slack': response = self.session.post( self.config['slack_webhook'], json={'text': message} ) elif channel == 'email': response = self.session.post( f"{self.config['mail_url']}/api/send", json={'to': self.config['alert_email'], 'subject': 'Security Alert', 'body': message} ) return response.status_code == 200 def process_alert(self, alert): """处理告警""" actions = [] if alert['severity'] == 'critical': # 封禁攻击 IP if 'source_ip' in alert: result = self.block_ip(alert['source_ip'], alert['description']) actions.append({'action': 'block_ip', 'result': result}) # 禁用被攻陷账户 if 'compromised_user' in alert: result = self.disable_user(alert['compromised_user'], alert['description']) actions.append({'action': 'disable_user', 'result': result}) # 发送通知 message = f"Critical Alert: {alert['description']}" self.send_notification('slack', message) self.send_notification('email', message) return { 'alert_id': alert['id'], 'actions': actions, 'timestamp': datetime.now().isoformat() } # 使用示例 config = { 'api_token': 'YOUR_API_TOKEN', 'firewall_url': 'https://firewall.example.com', 'waf_url': 'https://waf.example.com', 'ad_url': 'https://ad.example.com', 'slack_webhook': 'https://hooks.slack.com/...', 'mail_url': 'https://mail.example.com', 'alert_email': 'soc@example.com' } responder = AutoResponder(config) alert = { 'id': 'ALERT-001', 'severity': 'critical', 'description': 'Brute force attack detected', 'source_ip': '1.2.3.4', 'compromised_user': 'admin' } result = responder.process_alert(alert) print(json.dumps(result, indent=2)) ``` --- ## API 集成实战 ### SIEM API 集成 **Splunk API**: ```python #!/usr/bin/env python3 """ Splunk API 集成脚本 执行搜索查询,获取告警 """ import requests import time from requests.auth import HTTPBasicAuth class SplunkClient: def __init__(self, config): self.base_url = config['url'] self.auth = HTTPBasicAuth(config['username'], config['password']) self.session = requests.Session() self.session.auth = self.auth self.session.verify = False # 生产环境应使用有效证书 def search(self, query, earliest='-1h', latest='now'): """执行搜索查询""" # 创建搜索任务 create_url = f"{self.base_url}/services/search/jobs" create_data = { 'search': f'search {query}', 'earliest_time': earliest, 'latest_time': latest, 'output_mode': 'json' } create_response = self.session.post(create_url, data=create_data) sid = create_response.json()['sid'] # 等待搜索完成 while True: status_url = f"{self.base_url}/services/search/jobs/{sid}" status_response = self.session.get(status_url) status = status_response.json()['entry'][0]['content'] if status['isDone']: break time.sleep(1) # 获取结果 results_url = f"{self.base_url}/services/search/jobs/{sid}/results" results_params = {'output_mode': 'json', 'count': 1000} results_response = self.session.get(results_url, params=results_params) return results_response.json()['results'] def get_alerts(self): """获取告警""" query = 'index=security NOT action=allowed | stats count by src_ip, action' return self.search(query) # 使用示例 config = { 'url': 'https://splunk.example.com:8089', 'username': 'admin', 'password': 'YOUR_PASSWORD' } splunk = SplunkClient(config) alerts = splunk.get_alerts() for alert in alerts: print(f"{alert['src_ip']}: {alert['action']} ({alert['count']})") ``` ### EDR API 集成 **CrowdStrike API**: ```python #!/usr/bin/env python3 """ CrowdStrike EDR API 集成 查询主机状态,执行响应动作 """ import requests class CrowdStrikeClient: def __init__(self, config): self.base_url = config['url'] self.client_id = config['client_id'] self.client_secret = config['client_secret'] self.token = None self.session = requests.Session() def authenticate(self): """获取访问令牌""" url = f"{self.base_url}/oauth2/token" data = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'client_credentials' } response = self.session.post(url, data=data) self.token = response.json()['access_token'] self.session.headers.update({'Authorization': f"Bearer {self.token}"}) def get_hosts(self, filter=None): """获取主机列表""" if not self.token: self.authenticate() url = f"{self.base_url}/devices/queries/devices-scroll/v1" params = {'filter': filter} if filter else {} response = self.session.get(url, params=params) return response.json() def contain_host(self, host_id): """隔离主机""" if not self.token: self.authenticate() url = f"{self.base_url}/containment/entities/containment/v1" data = {'ids': [host_id], 'action': 'contain'} response = self.session.post(url, json=data) return response.json() def run_command(self, host_id, command): """执行命令""" if not self.token: self.authenticate() url = f"{self.base_url}/real-time-response/entities/queued-commands/v1" data = { 'base_command': 'runscript', 'host_ids': [host_id], 'command_string': f"runscript -raw='{command}'" } response = self.session.post(url, json=data) return response.json() # 使用示例 config = { 'url': 'https://api.crowdstrike.com', 'client_id': 'YOUR_CLIENT_ID', 'client_secret': 'YOUR_CLIENT_SECRET' } cs = CrowdStrikeClient(config) hosts = cs.get_hosts() print(f"Found {len(hosts['resources'])} hosts") # 隔离可疑主机 # cs.contain_host('HOST_ID') ``` --- ## 日志处理脚本 ### 日志收集脚本 ```python #!/usr/bin/env python3 """ 日志收集脚本 从多个源收集日志,发送到 SIEM """ import logging import requests from logging.handlers import SysLogHandler class LogCollector: def __init__(self, config): self.config = config self.logger = logging.getLogger('LogCollector') self.logger.setLevel(logging.INFO) # 配置 Syslog 处理器 syslog_handler = SysLogHandler( address=(config['siem_host'], config['siem_port']), facility=SysLogHandler.LOG_LOCAL0 ) formatter = logging.Formatter( '%(asctime)s %(hostname)s %(program)s: %(message)s' ) syslog_handler.setFormatter(formatter) self.logger.addHandler(syslog_handler) def collect_system_logs(self): """收集系统日志""" with open('/var/log/syslog', 'r') as f: for line in f: self.logger.info(line.strip(), extra={ 'hostname': 'server01', 'program': 'syslog' }) def collect_auth_logs(self): """收集认证日志""" with open('/var/log/auth.log', 'r') as f: for line in f: self.logger.info(line.strip(), extra={ 'hostname': 'server01', 'program': 'auth' }) def collect_application_logs(self, log_file, program): """收集应用日志""" with open(log_file, 'r') as f: for line in f: self.logger.info(line.strip(), extra={ 'hostname': 'server01', 'program': program }) # 使用示例 config = { 'siem_host': 'siem.example.com', 'siem_port': 514 } collector = LogCollector(config) collector.collect_system_logs() collector.collect_auth_logs() collector.collect_application_logs('/var/log/nginx/access.log', 'nginx') ``` ### 日志分析脚本 ```python #!/usr/bin/env python3 """ 日志分析脚本 分析日志,检测异常 """ import re from collections import Counter from datetime import datetime, timedelta class LogAnalyzer: def __init__(self): self.failed_logins = Counter() self.successful_logins = Counter() self.threshold = 5 # 失败登录阈值 self.time_window = timedelta(minutes=5) def analyze_auth_log(self, log_file): """分析认证日志""" failed_pattern = r'Failed password for (\S+) from (\S+)' success_pattern = r'Accepted password for (\S+) from (\S+)' with open(log_file, 'r') as f: for line in f: # 失败登录 failed_match = re.search(failed_pattern, line) if failed_match: user, ip = failed_match.groups() self.failed_logins[ip] += 1 # 检查是否超过阈值 if self.failed_logins[ip] >= self.threshold: print(f"[ALERT] Brute force detected from {ip}") # 成功登录 success_match = re.search(success_pattern, line) if success_match: user, ip = success_match.groups() self.successful_logins[ip] += 1 # 检查失败后成功 (可能暴力破解成功) if self.failed_logins[ip] >= 3: print(f"[ALERT] Possible successful brute force from {ip}") def generate_report(self): """生成分析报告""" report = { 'timestamp': datetime.now().isoformat(), 'failed_logins': dict(self.failed_logins), 'successful_logins': dict(self.successful_logins), 'alerts': [] } # 添加告警 for ip, count in self.failed_logins.items(): if count >= self.threshold: report['alerts'].append({ 'type': 'brute_force', 'source_ip': ip, 'count': count }) return report # 使用示例 analyzer = LogAnalyzer() analyzer.analyze_auth_log('/var/log/auth.log') report = analyzer.generate_report() print(report) ``` --- ## 自动化工具 ### Ansible 安全自动化 **安全基线部署**: ```yaml # security_baseline.yml --- - name: Deploy Security Baseline hosts: all become: yes tasks: - name: Ensure password policy lineinfile: path: /etc/login.defs regexp: "^PASS_MAX_DAYS" line: "PASS_MAX_DAYS 90" - name: Ensure SSH security lineinfile: path: /etc/ssh/sshd_config regexp: "^PermitRootLogin" line: "PermitRootLogin no" - name: Ensure firewall enabled systemd: name: firewalld state: started enabled: yes - name: Install security updates yum: name: '*' state: latest security: yes ``` **应急响应剧本**: ```yaml # incident_response.yml --- - name: Incident Response hosts: "{{ target_host }}" become: yes tasks: - name: Isolate host from network command: iptables -I INPUT -j DROP - name: Collect evidence archive: path: - /var/log/ - /tmp/ dest: /evidence/evidence_{{ ansible_date_time.iso8601 }}.tar.gz - name: Send notification uri: url: "{{ slack_webhook }}" method: POST body_format: json body: text: "Host {{ target_host }} isolated due to security incident" ``` ### SOAR Playbook **钓鱼邮件响应 Playbook**: ```yaml # phishing_response.yml --- playbook: name: "Phishing Email Response" version: "1.0" triggers: - alert_type: "phishing_email" steps: - name: "Extract Indicators" action: "email_parse" params: email_id: "${alert.email_id}" - name: "Threat Intel Lookup" parallel: - action: "virustotal_lookup" params: indicators: "${email_data.indicators}" - action: "urlhaus_lookup" params: urls: "${email_data.urls}" - name: "Risk Assessment" action: "risk_scoring" params: ti_score: "${ti_result.score}" - name: "Decision" type: "switch" on: "${risk_score}" cases: - condition: ">= 80" playbook: "high_risk_response" - condition: ">= 50" playbook: "medium_risk_response" - default: playbook: "low_risk_response" ``` --- ## 实战案例 ### 案例 1: 自动化威胁情报收集 **项目概况**: ``` 目标:自动化收集威胁情报 数据源:10+ TI 平台 输出:标准化 IOC 格式 频率:每小时 ``` **实施方案**: ```python #!/usr/bin/env python3 """ 自动化威胁情报收集脚本 """ import requests import json from datetime import datetime class TICollector: def __init__(self, config): self.config = config self.iocs = {'ips': [], 'domains': [], 'hashes': []} def collect_from_virustotal(self): """从 VirusTotal 收集""" # 实现逻辑 pass def collect_from_abuseipdb(self): """从 AbuseIPDB 收集""" # 实现逻辑 pass def normalize_ioc(self, ioc): """标准化 IOC 格式""" return { 'type': ioc['type'], 'value': ioc['value'], 'confidence': ioc['confidence'], 'source': ioc['source'], 'timestamp': datetime.now().isoformat() } def save_to_siem(self): """保存到 SIEM""" # 实现逻辑 pass # 定时执行 # cron: 0 * * * * /usr/bin/python3 /opt/scripts/ti_collector.py ``` ### 案例 2: 自动化事件响应 **项目概况**: ``` 目标:自动化常见事件响应 场景:暴力破解、恶意软件、数据泄露 响应时间:< 5 分钟 ``` **实施方案**: ``` 触发条件: - SIEM 告警 - EDR 检测 - 用户报告 自动响应: - IP 封禁 - 账户禁用 - 文件隔离 - 通知发送 人工确认: - 高风险决策 - 不可逆操作 - 升级处置 ``` --- ## 总结与思考 ### 核心要点回顾 1. **自动化场景**:数据收集、数据分析、响应处置 2. **Python 脚本**:日志解析、威胁情报、自动响应 3. **API 集成**:SIEM API、EDR API、防火墙 API 4. **日志处理**:日志收集、日志分析、异常检测 5. **自动化工具**:Ansible、SOAR Playbook 6. **实战案例**:TI 收集、事件响应 ### 深入思考 **自动化挑战**: - 工具集成复杂 - 误报处理困难 - 维护成本高 - 技能要求高 **发展方向**: - 低代码平台 - AI 驱动自动化 - 云原生自动化 - 标准化接口 ### 最佳实践 **开发建议**: - 代码版本控制 - 完整文档 - 错误处理 - 日志记录 **运营建议**: - 定期测试 - 性能监控 - 持续优化 - 知识管理 --- ## 参考资料 ### 学习资源 - **Python Security Programming**: https://nostarch.com/pythonsecurity - **Automating Security Operations**: SANS 课程 - **SOAR Best Practices**: Gartner 报告 ### 工具资源 - **Requests**: https://requests.readthedocs.io - **Ansible**: https://www.ansible.com - **Cortex XSOAR**: https://www.paloaltonetworks.com/cortex/xsoar --- *Day 323 完成 | 安全自动化脚本实战详解 | 字数:约 20,000 字 (新增)* *安全运维系列 33 篇完整教程完结!完成*
myh0st
2026年4月13日 23:21
分享文档
收藏文档
上一篇
下一篇
微信扫一扫
复制链接
手机扫一扫进行分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码