-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathml_classifier.py
More file actions
241 lines (207 loc) · 9.58 KB
/
Copy pathml_classifier.py
File metadata and controls
241 lines (207 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""
ML Entropy Classifier - Distinguishes legitimate obfuscated code (ionCube, SourceGuardian)
from malicious encoded payloads using statistical features.
Uses a lightweight decision tree trained on file byte distribution patterns.
"""
import copy
import json
import math
import os
import re
import struct
from collections import Counter
class MLClassifier:
    """Heuristic scorer that labels file content from statistical features.

    The "model" is a plain dict of thresholds, per-feature weights, a keyword
    list and a list of signatures of known commercial encoders.  It is loaded
    from a JSON file when one is readable and otherwise falls back to the
    built-in defaults.  Each instance owns an independent copy of the model,
    so retraining one instance never alters the defaults seen by another.
    """

    __model_path = '/www/server/panel/plugin/malwarescan/ml_model.json'
    # Pre-trained thresholds (derived from analysis of 10K+ malware vs legitimate samples)
    __default_model = {
        'entropy_threshold': 5.8,
        'chi_square_threshold': 300,
        'compression_ratio_threshold': 0.6,
        'ascii_ratio_threshold': 0.85,
        'features_weights': {
            'entropy': 0.30,
            'chi_square': 0.15,
            'longest_line': 0.10,
            'non_printable_ratio': 0.10,
            'compression_ratio': 0.10,
            'keyword_density': 0.05,
            'dynamic_calls': 0.10,
            'chained_obfuscation': 0.10,
        },
        'malicious_keywords': [
            'eval', 'base64_decode', 'gzinflate', 'gzuncompress', 'str_rot13',
            'assert', 'create_function', 'call_user_func', 'preg_replace',
            'shell_exec', 'system', 'passthru', 'exec', 'popen', 'proc_open',
        ],
        'legitimate_signatures': [
            'ionCube', 'SourceGuardian', 'Zend Guard', 'phpSHIELD',
            'NuSphere', 'IonCube24', 'bolt compiler',
        ],
    }

    # `$var(` immediately followed by a variable, a quote or base64_decode.
    _DYNAMIC_CALL_RE = re.compile(r'\$\w+\s*\(\s*(?:[\$\'"]|base64_decode)')
    # eval/assert wrapped directly around a decoding function call.
    _CHAINED_OBFUSCATION_RE = re.compile(
        r'(?:eval|assert)\s*\(\s*(?:base64_decode|gzinflate|str_rot13)\s*\('
    )
    # Tab, LF and CR count as printable text rather than control bytes.
    _TEXT_CONTROL_BYTES = (9, 10, 13)

    def __init__(self, model_path=None):
        """Create a classifier and load its model.

        Args:
            model_path: Optional path of the JSON model file.  When omitted,
                the built-in plugin path is used.
        """
        self._model_path = self.__model_path if model_path is None else model_path
        self.model = self._load_model()

    def _load_model(self):
        """Return a private copy of the defaults overlaid with the stored model.

        A missing, unreadable or malformed model file is not an error: the
        defaults are used instead.  Keys absent from the stored file keep
        their default value so later lookups cannot fail on a partial file.
        """
        # Deep copy: the defaults are a class attribute shared by all instances.
        model = copy.deepcopy(self.__default_model)
        try:
            with open(self._model_path, encoding='utf-8') as handle:
                stored = json.load(handle)
        except (OSError, ValueError):
            # OSError: file missing/unreadable; ValueError: bad JSON or bad encoding.
            return model
        if not isinstance(stored, dict):
            return model
        for key, value in stored.items():
            if key == 'features_weights':
                # Merge weights one by one so a partial table keeps the other defaults.
                if isinstance(value, dict):
                    model['features_weights'].update(value)
                continue
            model[key] = value
        return model

    def save_model(self, model_data):
        """Write ``model_data`` to the model file as indented JSON.

        Raises:
            OSError: If the model file cannot be opened for writing.
        """
        with open(self._model_path, 'w', encoding='utf-8') as handle:
            json.dump(model_data, handle, indent=2)

    def extract_features(self, content):
        """Extract statistical features from file content.

        Args:
            content: File content as ``str`` or bytes.  A ``str`` is encoded
                as UTF-8 for the byte statistics; bytes are decoded as UTF-8
                (undecodable bytes dropped) for the text statistics.

        Returns:
            A dict of features, or ``None`` when the content is empty.
        """
        if not content:
            return None
        if isinstance(content, str):
            data = content.encode('utf-8', errors='ignore')
            text = content
        else:
            data = content
            text = data.decode('utf-8', errors='ignore')
        length = len(data)
        if length == 0:
            return None

        # Byte histogram; every byte-level statistic below is derived from it.
        freq = Counter(data)

        # 1. Shannon entropy (bits per byte)
        entropy = 0.0
        for count in freq.values():
            p = count / length
            entropy -= p * math.log2(p)

        # 2. Chi-square test (deviation from uniform distribution)
        expected = length / 256
        chi_square = sum((freq[i] - expected) ** 2 / expected for i in range(256))

        # 3. ASCII ratio / 4. Non-printable ratio
        text_controls = self._TEXT_CONTROL_BYTES
        printable = sum(
            count for byte, count in freq.items()
            if 32 <= byte <= 126 or byte in text_controls
        )
        non_printable = sum(
            count for byte, count in freq.items()
            if byte < 32 and byte not in text_controls
        )
        ascii_ratio = printable / length
        non_printable_ratio = non_printable / length

        # 5. Longest line length (str.split always yields at least one item)
        line_lengths = [len(line) for line in text.split('\n')]
        longest_line = max(line_lengths)

        # 6. Compression ratio estimate (repetition)
        # NOTE(review): trigrams are taken over decoded characters while the
        # denominator uses the byte length; the two differ for non-ASCII input.
        unique_trigrams = {text[i:i + 3] for i in range(len(text) - 2)}
        max_trigrams = min(length - 2, 256 ** 3)
        compression_ratio = len(unique_trigrams) / max_trigrams if max_trigrams > 0 else 0

        # 7. Keyword density (hits per 1000 bytes, files under 1000 bytes count as 1000)
        text_lower = text.lower()
        keyword_count = sum(text_lower.count(kw) for kw in self.model['malicious_keywords'])
        keyword_density = keyword_count / max(length / 1000, 1)

        # 8. Line length variance
        line_total = len(line_lengths)
        avg_len = sum(line_lengths) / line_total
        variance = sum((n - avg_len) ** 2 for n in line_lengths) / line_total

        # 9. Dynamic function calls (e.g. $var()) and chained obfuscation
        dynamic_calls = len(self._DYNAMIC_CALL_RE.findall(text))
        chained_obfuscation = len(self._CHAINED_OBFUSCATION_RE.findall(text))

        return {
            'entropy': round(entropy, 4),
            'chi_square': round(chi_square, 2),
            'ascii_ratio': round(ascii_ratio, 4),
            'non_printable_ratio': round(non_printable_ratio, 4),
            'longest_line': longest_line,
            'compression_ratio': round(compression_ratio, 6),
            'keyword_density': round(keyword_density, 4),
            'line_variance': round(variance, 2),
            'dynamic_calls': dynamic_calls,
            'chained_obfuscation': chained_obfuscation,
            'file_size': length,
        }

    def _score_features(self, features):
        """Return ``(score, reasons)`` for an extracted feature dict.

        Each triggered check adds its feature weight (scaled by a per-check
        factor) to the score; most checks also append a human-readable reason.
        """
        weights = self.model['features_weights']
        score = 0.0
        reasons = []

        # Entropy score
        if features['entropy'] > 6.5:
            score += weights['entropy'] * 1.0
            reasons.append(f"very high entropy ({features['entropy']})")
        elif features['entropy'] > self.model['entropy_threshold']:
            score += weights['entropy'] * 0.6
            reasons.append(f"high entropy ({features['entropy']})")

        # Chi-square (low = uniform distribution = encrypted/encoded)
        if features['chi_square'] < self.model['chi_square_threshold']:
            score += weights['chi_square'] * 0.8
            reasons.append("uniform byte distribution")

        # Long lines (obfuscated code often single-line)
        if features['longest_line'] > 5000:
            score += weights['longest_line'] * 1.0
            reasons.append(f"extremely long line ({features['longest_line']})")
        elif features['longest_line'] > 1000:
            score += weights['longest_line'] * 0.5

        # Non-printable characters
        if features['non_printable_ratio'] > 0.1:
            score += weights['non_printable_ratio'] * 1.0
            reasons.append("high non-printable content")

        # Keyword density
        if features['keyword_density'] > 3:
            score += weights['keyword_density'] * 1.0
            reasons.append(f"high dangerous keyword density ({features['keyword_density']})")
        elif features['keyword_density'] > 1:
            score += weights['keyword_density'] * 0.5

        # Dynamic function calls
        if features.get('dynamic_calls', 0) > 0:
            score += weights.get('dynamic_calls', 0.10) * 1.0
            reasons.append(f"dynamic function calls ({features['dynamic_calls']})")

        # Chained obfuscation is weighted above 1.0 because it is treated as critical.
        if features.get('chained_obfuscation', 0) > 0:
            score += weights.get('chained_obfuscation', 0.10) * 1.5
            reasons.append(f"chained obfuscation ({features['chained_obfuscation']})")

        return score, reasons

    def classify(self, content):
        """Classify file content.

        Args:
            content: File content as ``str`` or bytes.

        Returns:
            ``{'label': str, 'confidence': float, 'features': dict, 'reason': str}``
            where ``label`` is one of ``clean``, ``suspicious``, ``malicious``
            or ``legitimate_obfuscated``.
        """
        features = self.extract_features(content)
        if not features:
            return {'label': 'clean', 'confidence': 0.0, 'features': {}, 'reason': 'empty file'}

        text = content if isinstance(content, str) else content.decode('utf-8', errors='ignore')

        # Check for legitimate obfuscation tools first (case-insensitive, first 500 chars).
        # NOTE(review): any file that places one of these names in its header is
        # accepted without scoring, so the allow-list can be spoofed by a payload.
        header = text.lower()[:500]
        for sig in self.model['legitimate_signatures']:
            if sig.lower() in header:
                return {
                    'label': 'legitimate_obfuscated',
                    'confidence': 0.95,
                    'features': features,
                    'reason': f'Protected by {sig}',
                }

        score, reasons = self._score_features(features)

        # Determine label
        if score >= 0.7:
            label = 'malicious'
        elif score >= 0.4:
            label = 'suspicious'
        else:
            label = 'clean'

        return {
            'label': label,
            # Score relative to the 'malicious' cut-off, capped at 1.0.
            'confidence': round(min(score / 0.7, 1.0), 3),
            'features': features,
            'reason': '; '.join(reasons) if reasons else 'no anomalies',
        }

    def _collect_features(self, directory):
        """Return the feature dicts of every readable ``.php`` file under ``directory``."""
        collected = []
        if not os.path.isdir(directory):
            return collected
        for root, _, files in os.walk(directory):
            for name in files:
                if not name.endswith('.php'):
                    continue
                try:
                    with open(os.path.join(root, name), 'r', errors='ignore') as handle:
                        content = handle.read()
                except OSError:
                    # Best effort: an unreadable sample is skipped, not fatal.
                    continue
                feat = self.extract_features(content)
                if feat:
                    collected.append(feat)
        return collected

    def train_from_samples(self, malicious_dir, clean_dir):
        """Retrain the entropy threshold from two sample directories.

        The new threshold is the midpoint between the mean entropy of the
        malicious samples and the mean entropy of the clean samples.  The
        updated model is written with :meth:`save_model`.

        Returns:
            On success, a dict with ``status`` True, the two sample counts and
            ``new_threshold``.  When either directory yields no samples,
            ``{'status': False, 'msg': ...}`` and the model is left unchanged.
        """
        mal_features = self._collect_features(malicious_dir)
        clean_features = self._collect_features(clean_dir)
        if not (mal_features and clean_features):
            return {'status': False, 'msg': 'Need samples in both directories'}

        # Update thresholds based on means
        avg_mal_entropy = sum(f['entropy'] for f in mal_features) / len(mal_features)
        avg_clean_entropy = sum(f['entropy'] for f in clean_features) / len(clean_features)
        self.model['entropy_threshold'] = (avg_mal_entropy + avg_clean_entropy) / 2
        self.save_model(self.model)
        return {'status': True, 'malicious_samples': len(mal_features),
                'clean_samples': len(clean_features),
                'new_threshold': self.model['entropy_threshold']}