-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFEC_Optimized.py
More file actions
289 lines (242 loc) · 10.5 KB
/
FEC_Optimized.py
File metadata and controls
289 lines (242 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
"""
IndustrialFEC - Universal Fractal Compression Engine v1.0
Optimized for both Colab and local execution with intelligent resource management
"""
import sys
import os
import time
import pickle
import zstandard as zstd
import brotli
import lzma
import hashlib
import platform
import psutil
from collections import deque
from datetime import datetime
from typing import Dict, List, Tuple
try:
import matplotlib.pyplot as plt
import seaborn as sns
PLOT_AVAILABLE = True
except ImportError:
PLOT_AVAILABLE = False
class IndustrialFEC:
    """Block-deduplicating compressor with multi-codec residual compression.

    Pipeline: detect repeated fixed-size blocks within a sliding window
    (`_find_patterns`), strip the repeats out of the stream
    (`_build_residual`), then compress what remains with every codec
    configured for the active preset and keep the smallest result
    (`_compress_residual`).
    """
    def __init__(self, preset: str = 'balanced'):
        # Tuning profiles: block granularity, dedup window size (bytes),
        # and the candidate compression codecs raced against each other.
        self.presets = {
            'text': {'block': 128, 'window': 8192, 'layers': ['zstd']},
            'binary': {'block': 256, 'window': 16384, 'layers': ['brotli']},
            'balanced': {'block': 192, 'window': 12288, 'layers': ['zstd', 'lzma']},
            'extreme': {'block': 64, 'window': 65536, 'layers': ['lzma', 'brotli']}
        }
        self._apply_preset(preset)
        self._safety_check()
        self.stats = self._init_stats()
        self.compressed = None  # last container built by compress(), or None
    def _apply_preset(self, preset_name: str):
        """Load an optimization profile into instance attributes.

        Raises:
            KeyError: if *preset_name* is not one of the known presets.
        """
        config = self.presets[preset_name]
        self.block_size = config['block']
        self.window_size = config['window']
        self.compression_layers = config['layers']
    def _safety_check(self):
        """Initialize memory-exhaustion limits.

        FIX: the limits are now set unconditionally. They were previously
        set only on non-Linux hosts; `_mem_guard` happens to skip Linux as
        well, but leaving the attributes undefined was fragile coupling.
        """
        self.MAX_MEM_RATIO = 50  # allow up to 50x input size
        self.MIN_MEM_AVAIL = 100  # require at least 100 MB free
    def _init_stats(self):
        """Return a fresh performance-metrics dict for one compress() run."""
        return {
            'start_time': datetime.now(),
            'original_size': 0,
            'compressed_size': 0,
            'patterns_found': 0,
            'memory_peaks': [],  # RSS samples (MB) taken during pattern search
            'phase_times': {     # seconds spent in each pipeline phase
                'pattern_find': 0.0,
                'residual_build': 0.0,
                'compress': 0.0
            }
        }
    def _mem_guard(self, current_size: int):
        """Raise MemoryError if this process is running out of headroom.

        No-op on Linux (original design: only non-Linux hosts are guarded).
        *current_size* is accepted for interface stability but unused; the
        limit is derived from the recorded original input size.
        """
        if platform.system() == 'Linux':
            return
        mem_usage = psutil.Process().memory_info().rss / 1024**2  # MB
        input_size = self.stats['original_size'] / 1024**2
        if mem_usage > input_size * self.MAX_MEM_RATIO:
            raise MemoryError(f"Memory limit exceeded ({mem_usage:.1f}MB for {input_size:.1f}MB input)")
        if psutil.virtual_memory().available < self.MIN_MEM_AVAIL * 1024**2:
            raise MemoryError("System memory critically low")
    def _find_patterns(self, data: bytes) -> List[Tuple[int, int, int]]:
        """Find repeated blocks; returns (position, earlier_position, size) triples.

        Scans non-overlapping blocks of `self.block_size` bytes and records
        a pattern whenever an identical block occurred no more than
        `self.window_size` bytes earlier.

        FIX: a digest hit is now confirmed by comparing the actual bytes.
        A 4-byte blake2b digest collides easily, and the previous code
        treated any digest hit as a match, silently corrupting the output.
        Also removed the old `block_cache` deque: it was maintained on
        every iteration but never read, so it had no effect on results.
        """
        patterns = []
        view = memoryview(data)
        hash_table = {}  # 4-byte blake2b digest -> most recent block offset
        start_time = time.perf_counter()
        for i in range(0, len(data) - self.block_size + 1, self.block_size):
            block = view[i:i+self.block_size]
            digest = hashlib.blake2b(block, digest_size=4).digest()
            prev = hash_table.get(digest)
            if (prev is not None and (i - prev) <= self.window_size
                    and view[prev:prev+self.block_size] == block):
                patterns.append((i, prev, self.block_size))
                self.stats['patterns_found'] += 1
            hash_table[digest] = i
            # Memory check every 1000 blocks
            if i % (self.block_size * 1000) == 0:
                self._mem_guard(len(data))
                self.stats['memory_peaks'].append(psutil.Process().memory_info().rss / 1024**2)
        self.stats['phase_times']['pattern_find'] = time.perf_counter() - start_time
        return patterns
    def _build_residual(self, data: bytes, patterns: list) -> bytes:
        """Return *data* with every pattern-matched block removed.

        Assumes *patterns* is sorted by position and non-overlapping,
        which _find_patterns' fixed-stride scan guarantees.
        """
        residual = bytearray()
        last_pos = 0
        start_time = time.perf_counter()
        for i, j, bs in patterns:
            residual.extend(data[last_pos:i])  # keep bytes before the repeat
            last_pos = i + bs                  # skip the repeated block itself
        residual.extend(data[last_pos:])
        self.stats['phase_times']['residual_build'] = time.perf_counter() - start_time
        return bytes(residual)
    def _compress_residual(self, residual: bytes) -> Tuple[bytes, str]:
        """Compress *residual* with every configured codec; keep the smallest.

        Returns:
            (compressed_bytes, codec_name)

        FIX: the return annotation claimed `bytes`, but the method has
        always returned a (bytes, str) tuple.
        """
        results = {}
        start_time = time.perf_counter()
        if 'zstd' in self.compression_layers:
            results['zstd'] = zstd.compress(residual)
        if 'brotli' in self.compression_layers:
            results['brotli'] = brotli.compress(residual)
        if 'lzma' in self.compression_layers:
            results['lzma'] = lzma.compress(residual)
        # Select the codec that produced the smallest output.
        best_layer = min(results, key=lambda k: len(results[k]))
        self.stats['phase_times']['compress'] = time.perf_counter() - start_time
        return results[best_layer], best_layer
    def compress(self, data: bytes) -> Dict:
        """Main compression pipeline.

        Returns the compressed container dict, or the raw input bytes
        unchanged when the input already looks compressed (pass-through,
        preserved for backward compatibility with existing callers).
        """
        self.stats = self._init_stats()
        self.stats['original_size'] = len(data)
        if self._is_precompressed(data):
            print("File is already compressed - using pass-through")
            # FIX: record the pass-through size so a later analyze() call
            # cannot divide by zero.
            self.stats['compressed_size'] = len(data)
            return data
        patterns = self._find_patterns(data)
        residual = self._build_residual(data, patterns)
        compressed_residual, best_layer = self._compress_residual(residual)
        self.compressed = {
            'patterns': patterns,
            'residual': compressed_residual,
            'metadata': {
                'original_size': len(data),
                'block_size': self.block_size,
                'hash': hashlib.sha3_256(data).hexdigest(),  # integrity check
                'system': platform.platform(),
                'best_layer': best_layer
            }
        }
        self.stats['compressed_size'] = len(pickle.dumps(self.compressed))
        return self.compressed
    def _is_precompressed(self, data: bytes) -> bool:
        """Detect common compressed formats by their magic-number prefix."""
        magic_numbers = {
            b'\x50\x4B\x03\x04': 'zip',
            b'\x1F\x8B\x08': 'gzip',
            b'\x42\x5A\x68': 'bzip2',
            b'\xFD7zXZ': 'xz'
        }
        return any(data.startswith(magic) for magic in magic_numbers)
    def analyze(self) -> Dict:
        """Build a performance-analysis report for the last compress() run.

        FIX: guards against division by zero (empty input, zero phase time)
        and against `self.compressed` being None after a pass-through.
        """
        total_time = sum(self.stats['phase_times'].values())
        compressed_size = self.stats['compressed_size'] or 1
        original_mb = self.stats['original_size'] / 1024**2
        return {
            'compression_ratio': self.stats['original_size'] / compressed_size,
            'patterns_per_mb': self.stats['patterns_found'] / original_mb if original_mb else 0.0,
            'memory_peak': max(self.stats['memory_peaks']) if self.stats['memory_peaks'] else 0,
            'time_total': (datetime.now() - self.stats['start_time']).total_seconds(),
            'phase_breakdown': {k: f"{v/total_time:.1%}" for k, v in self.stats['phase_times'].items()} if total_time else {},
            'best_layer': self.compressed['metadata']['best_layer'] if self.compressed else 'pass-through'
        }
    def visualize(self):
        """Render a 4-panel matplotlib dashboard of the last run's stats."""
        if not PLOT_AVAILABLE:
            print("Visualization requires matplotlib and seaborn")
            return
        plt.figure(figsize=(15, 8))
        # Compression Ratio Comparison
        plt.subplot(2, 2, 1)
        labels = ['Original', 'FEC Compressed']
        sizes = [self.stats['original_size'], self.stats['compressed_size']]
        plt.bar(labels, sizes, color=['red', 'green'])
        # FIX: guard the ratio against a zero compressed size.
        plt.title(f"Size Reduction ({self.stats['original_size']/max(self.stats['compressed_size'], 1):.1f}x)")
        plt.ylabel('Bytes')
        # Phase Time Distribution
        plt.subplot(2, 2, 2)
        phases = list(self.stats['phase_times'].keys())
        times = [self.stats['phase_times'][p] for p in phases]
        plt.pie(times, labels=phases, autopct='%1.1f%%', startangle=90)
        plt.title('Processing Time Distribution')
        # Memory Timeline
        plt.subplot(2, 2, 3)
        plt.plot(self.stats['memory_peaks'], marker='o')
        plt.title('Memory Usage During Compression')
        plt.xlabel('Checkpoints')
        plt.ylabel('Memory (MB)')
        # Layer Comparison
        plt.subplot(2, 2, 4)
        # NOTE(review): this is a toy benchmark on the literal b'test', not
        # on the actual input data -- sizes here are illustrative only.
        layers = ['zstd', 'brotli', 'lzma']
        sizes = [
            len(zstd.compress(b'test')),
            len(brotli.compress(b'test')),
            len(lzma.compress(b'test'))
        ]
        plt.barh(layers, sizes)
        plt.title('Compression Layer Efficiency')
        plt.tight_layout()
        plt.show()
def main(file_path: str):
    """Universal entry point for Colab and CLI.

    Reads *file_path*, compresses it with the 'balanced' preset, pickles
    the result to '<file_path>.fec', then prints an analysis report and
    (when matplotlib is available) shows the dashboard. All errors are
    caught and reported rather than raised.
    """
    try:
        engine = IndustrialFEC(preset='balanced')
        with open(file_path, 'rb') as src:
            payload = src.read()
        print(f"🔧 Compressing {len(payload)/1024**2:.2f} MB file...")
        start_time = time.time()
        result = engine.compress(payload)
        output_path = f"{file_path}.fec"
        with open(output_path, 'wb') as dst:
            pickle.dump(result, dst)
        report = engine.analyze()
        print("\n📈 Compression Report")
        print(f"Ratio: {report['compression_ratio']:.1f}x")
        print(f"Peak Memory: {report['memory_peak']:.1f} MB")
        print(f"Time: {report['time_total']:.1f}s")
        print(f"Best Layer: {report['best_layer'].upper()}")
        print("\nPhase Efficiency:")
        for phase, eff in report['phase_breakdown'].items():
            print(f" {phase.replace('_', ' ').title():<15} {eff}")
        if PLOT_AVAILABLE:
            engine.visualize()
        print(f"\n✅ Saved compressed file to {output_path}")
    except MemoryError as e:
        print(f"🚨 Memory Error: {str(e)}")
        print("Try: Reduce file size or use 'extreme' preset")
    except Exception as e:
        print(f"❌ Error: {str(e)}")
if __name__ == "__main__":
    # Colab detection: the google.colab module is only importable (and
    # already loaded) inside a Colab runtime.
    if 'google.colab' in sys.modules:
        from google.colab import files
        print("Google Colab Mode - Upload your file")
        uploaded = files.upload()
        # files.upload() returns a dict keyed by uploaded filename.
        main(next(iter(uploaded)))
    else:
        # CLI mode: exactly one argument, the path of the file to compress.
        if len(sys.argv) != 2:
            print("Usage: python industrial_fec_pro.py <file>")
            sys.exit(1)
        main(sys.argv[1])