-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.py
More file actions
132 lines (102 loc) · 3.96 KB
/
monitor.py
File metadata and controls
132 lines (102 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import argparse
import os
import platform
import time
from datetime import datetime
import psutil
from Process_OS.windows_process import WHITELIST as WINDOWS_WHITELIST
from Process_OS.windows_process import WHITELIST_PID as WINDOWS_WHITELIST_PID
from Process_OS.linux_process import WHITELIST as LINUX_WHITELIST
from Process_OS.unix_process import WHITELIST as UNIX_WHITELIST
def get_os_policy():
    """Select the whitelist policy for the operating system we are running on.

    Returns:
        A 4-tuple ``(os_name, whitelist, whitelist_pid, normalizer)``:

        * ``os_name`` -- ``"windows"``, ``"linux"`` or ``"darwin"``.
        * ``whitelist`` -- set of allowed process names, already normalized.
        * ``whitelist_pid`` -- set of PIDs that are always allowed.
        * ``normalizer`` -- callable mapping a raw process name (or ``None``)
          to the form stored in ``whitelist``.

    Raises:
        RuntimeError: If ``platform.system()`` reports any other system.
    """

    def normalize_name(name):
        # Case- and whitespace-insensitive; a missing name becomes "".
        return (name or "").lower().strip()

    def normalize_basename(name):
        # Same as normalize_name, but only the final path component counts.
        return os.path.basename(name or "").lower().strip()

    os_name = platform.system().lower()

    if os_name == "windows":
        # Windows executable names are case-insensitive, so both the whitelist
        # and the observed names are folded the same way as on Linux. Without
        # this, "Notepad.exe" would not match a whitelisted "notepad.exe".
        whitelist = {name.lower().strip() for name in WINDOWS_WHITELIST}
        whitelist_pid = set(WINDOWS_WHITELIST_PID)
        return "windows", whitelist, whitelist_pid, normalize_name

    if os_name == "linux":
        whitelist = {name.lower().strip() for name in LINUX_WHITELIST}
        # Only PID 1 is exempt by PID on this platform.
        return "linux", whitelist, {1}, normalize_name

    if os_name == "darwin":
        # Whitelist entries are reduced to their basename before comparison,
        # mirroring what normalize_basename does to observed names.
        whitelist = {
            os.path.basename(name).lower().strip() for name in UNIX_WHITELIST
        }
        return "darwin", whitelist, {1}, normalize_basename

    raise RuntimeError(f"Unsupported operating system: {os_name}")
def detect_blocked_processes(whitelist, whitelist_pid, normalizer):
    """Scan the running processes and report those not covered by the policy.

    Args:
        whitelist: Collection of allowed process names in normalized form.
        whitelist_pid: Collection of PIDs that are always allowed.
        normalizer: Callable applied to each process name before it is
            looked up in ``whitelist``.

    Returns:
        A list of dicts, one per offending process, with the keys ``pid``,
        ``name``, ``normalized_name``, ``user``, ``time`` (``HH:MM:SS`` of
        detection) and ``reason``.
    """
    findings = []
    for process in psutil.process_iter(["pid", "name", "username"]):
        try:
            info = process.info
            pid = info["pid"]
            raw_name = info["name"] or ""
            owner = info.get("username") or "unknown"

            # A whitelisted PID is exempt regardless of its name.
            if pid in whitelist_pid:
                continue

            key = normalizer(raw_name)
            if key not in whitelist:
                findings.append(
                    dict(
                        pid=pid,
                        name=raw_name,
                        normalized_name=key,
                        user=owner,
                        time=datetime.now().strftime("%H:%M:%S"),
                        reason="Process not in whitelist",
                    )
                )
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process vanished or cannot be inspected: skip it.
            continue
    return findings
def monitor(interval_seconds=1.0, once=False):
    """Repeatedly scan for non-whitelisted processes and print alerts.

    Each offending normalized name is reported once for as long as it keeps
    showing up in consecutive scans; once it disappears it is forgotten, so
    it will be reported again if it comes back. Nothing is terminated.

    Args:
        interval_seconds: Pause between two scans, in seconds.
        once: When true, perform a single scan and return.
    """
    os_name, whitelist, whitelist_pid, normalizer = get_os_policy()

    print(f"Monitoring OS: {os_name}")
    print(f"Scan interval: {interval_seconds:.2f}s")
    print("Alert mode only: processes are NOT terminated.")

    already_reported = set()
    while True:
        seen_this_scan = set()
        for proc in detect_blocked_processes(whitelist, whitelist_pid, normalizer):
            key = proc["normalized_name"]
            seen_this_scan.add(key)
            if key not in already_reported:
                print(
                    f"{proc['time']} PID={proc['pid']:<6} User={proc['user']:<12}\tName={proc['name']}"
                )
                already_reported.add(key)

        # Drop names that are no longer running so a later reappearance
        # triggers a fresh alert.
        already_reported &= seen_this_scan

        if once:
            return
        time.sleep(interval_seconds)
def _positive_interval(text):
    """argparse ``type`` callback: parse *text* as a finite float above 0."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # Chained comparison on purpose: NaN fails every comparison, so it is
    # rejected here together with zero, negatives and infinity.
    if not 0 < value < float("inf"):
        raise argparse.ArgumentTypeError("must be a finite number greater than 0")
    return value


def parse_args(argv=None):
    """Parse the command-line options of the monitor.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, as before.

    Returns:
        An ``argparse.Namespace`` with ``interval`` (float, seconds between
        scans) and ``once`` (bool, run a single scan).

    Raises:
        SystemExit: Raised by argparse (exit status 2) on invalid arguments,
            including an ``--interval`` that is not a finite number above 0.
    """
    parser = argparse.ArgumentParser(
        description="Monitor running processes against OS whitelist and alert to stdout."
    )
    parser.add_argument(
        "--interval",
        type=_positive_interval,
        default=1.0,
        help="Seconds between scans (default: 1.0)",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run a single scan and exit.",
    )
    return parser.parse_args(argv)
def main():
    """CLI entry point: parse the arguments, validate them, run the monitor.

    A Ctrl-C during monitoring is reported with a short message instead of a
    traceback.

    Raises:
        ValueError: If ``--interval`` is not a finite number greater than 0.
    """
    args = parse_args()
    # Chained comparison instead of ``<= 0``: NaN fails every comparison, so
    # the old check let it through, and it (like infinity) would only fail
    # later inside time.sleep().
    if not 0 < args.interval < float("inf"):
        raise ValueError("--interval must be a finite number greater than 0")
    try:
        monitor(interval_seconds=args.interval, once=args.once)
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user.")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()