Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/macaron/console.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2025 - 2026, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module implements a rich console handler for logging."""
Expand Down Expand Up @@ -306,6 +306,7 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None:
self.policies_table = Table(box=None)
self.components_violates_table = Table(box=None)
self.components_satisfy_table = Table(box=None)
self.policy_evidence_table = Table(box=None)
self.policy_summary_table = Table(show_header=False, box=None)
self.policy_summary: dict[str, str | Status] = {
"Passed Policies": "None",
Expand Down Expand Up @@ -537,7 +538,7 @@ def update_policy_engine(self, results: dict) -> None:
components_violates_table.add_column("PURL", justify="left")
components_violates_table.add_column("Policy Name", justify="left")

for values in results["component_violates_policy"]:
for values in results.get("component_violates_policy", []):
components_violates_table.add_row(values[0], values[1], values[2])

self.components_violates_table = components_violates_table
Expand All @@ -547,16 +548,37 @@ def update_policy_engine(self, results: dict) -> None:
components_satisfy_table.add_column("PURL", justify="left")
components_satisfy_table.add_column("Policy Name", justify="left")

for values in results["component_satisfies_policy"]:
for values in results.get("component_satisfies_policy", []):
components_satisfy_table.add_row(values[0], values[1], values[2])

self.components_satisfy_table = components_satisfy_table

standard_outputs = {
"passed_policies",
"failed_policies",
"component_satisfies_policy",
"component_violates_policy",
}
policy_evidence_table = Table(box=None)
policy_evidence_table.add_column("Relation", justify="left")
policy_evidence_table.add_column("Evidence", justify="left")
for relation, rows in sorted(results.items()):
if relation in standard_outputs:
continue
for values in rows:
policy_evidence_table.add_row(relation, " | ".join(values))

self.policy_evidence_table = policy_evidence_table

self.policy_summary["Passed Policies"] = (
"\n".join(policy[0] for policy in results["passed_policies"]) if results["passed_policies"] else "None"
"\n".join(policy[0] for policy in results.get("passed_policies", []))
if results.get("passed_policies", [])
else "None"
)
self.policy_summary["Failed Policies"] = (
"\n".join(policy[0] for policy in results["failed_policies"]) if results["failed_policies"] else "None"
"\n".join(policy[0] for policy in results.get("failed_policies", []))
if results.get("failed_policies", [])
else "None"
)

self.generate_policy_summary_table()
Expand Down Expand Up @@ -756,6 +778,12 @@ def make_layout(self) -> Group:
"[bold red] Components Violate Policy[/] [white not italic]None[/]",
]
layout = layout + ["", self.policy_summary_table]
if self.policy_evidence_table.row_count > 0:
layout = layout + [
"",
"[bold yellow] Policy Evidence[/]",
self.policy_evidence_table,
]
if self.verification_summary_attestation:
vsa_table = Table(show_header=False, box=None)
vsa_table.add_column("Detail", justify="left")
Expand Down
5 changes: 2 additions & 3 deletions src/macaron/dependency_analyzer/cyclonedx.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2026, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains helper functions to process CycloneDX SBOM."""
Expand Down Expand Up @@ -357,10 +357,9 @@ def resolve_dependencies(main_ctx: Any, sbom_path: str, recursive: bool = False)

# Start resolving dependencies.
logger.info(
"Running %s version %s dependency analyzer on %s",
"Running %s version %s dependency analyzer.",
dep_analyzer.tool_name,
dep_analyzer.tool_version,
find_report_output_path(main_ctx.component.repository.fs_path),
)

log_path = os.path.join(
Expand Down
175 changes: 175 additions & 0 deletions src/macaron/output_reporter/malware_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright (c) 2026 - 2026, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Helpers for building user-friendly malware findings for output reports."""

from __future__ import annotations

from typing import Any

from macaron.slsa_analyzer.checks.check_result import CheckResult, CheckResultType

MALICIOUS_METADATA_CHECK_ID = "mcn_detect_malicious_metadata_1"

_HEURISTIC_REASONS = {
"empty_project_link": "Package metadata has no useful project links.",
"source_code_repo": "Macaron could not identify a matching source repository.",
"one_release": "The package has only one release.",
"high_release_frequency": "The package has an unusual release cadence.",
"unchanged_release": "Multiple releases appear to contain unchanged code.",
"closer_release_join_date": "A maintainer joined close to the release time.",
"suspicious_setup": "The package contains suspicious setup/install behavior.",
"wheel_absence": "The package omits a wheel, which can force source install behavior.",
"anomalous_version": "The package version looks unusual for the package history.",
"typosquatting_presence": "The package name resembles another package.",
"fake_email": "Maintainer metadata contains suspicious email information.",
"similar_projects": "Macaron found similarly structured package projects.",
"package_description_intent": "The package description does not clearly explain benign intent.",
"type_stub_file": "The package does not look like a legitimate type-stub package.",
"suspicious_patterns": "Source code analysis found suspicious Python malware patterns.",
}

_CATEGORY_KEYWORDS = (
("Command and control", ("c2", "domain", "url", "endpoint", "network", "exfil")),
("Payload staging", ("payload", "download", "rope", "pyz", "dropper")),
("Persistence", ("persist", "systemd", "service", "unit")),
("Credential access", ("credential", "secret", "token", "aws", "azure", "gcp", "kubernetes", "vault", "ssh")),
("Suspicious execution", ("obfuscation", "inline", "import", "exec", "eval", "system", "subprocess", "command")),
)


def _display_value(value: Any) -> Any:
"""Return a JSON-serializable display value for enums and primitive values."""
if hasattr(value, "value"):
return value.value
return value


def _confidence_value(value: Any) -> float | None:
"""Return a primitive confidence score."""
display_value = _display_value(value)
if isinstance(display_value, (int, float)):
return float(display_value)
return None


def _categorize_source_finding(rule_id: str, message: str) -> str:
"""Map a source-code rule finding to a user-facing behavior category."""
haystack = f"{rule_id} {message}".lower()
for category, keywords in _CATEGORY_KEYWORDS:
if any(keyword in haystack for keyword in keywords):
return category
return "Suspicious source pattern"


def _build_failed_heuristics(result: dict[Any, Any]) -> list[dict[str, str]]:
"""Build a readable list of failed heuristics."""
failed_heuristics = []
for heuristic, heuristic_result in result.items():
heuristic_name = str(_display_value(heuristic))
status = str(_display_value(heuristic_result))
if status != "FAIL":
continue
failed_heuristics.append(
{
"name": heuristic_name,
"reason": _HEURISTIC_REASONS.get(heuristic_name, "This heuristic flagged suspicious package behavior."),
}
)
return failed_heuristics


def _build_behavioral_findings(detail_information: dict[str, Any]) -> list[dict[str, Any]]:
"""Build behavior-oriented findings from source-code rule evidence."""
source_findings = detail_information.get("enabled_sourcecode_rule_findings")
if not isinstance(source_findings, dict):
return []

grouped_findings: dict[str, dict[str, Any]] = {}
for rule_id, finding in source_findings.items():
if not isinstance(finding, dict):
continue
message = finding.get("message", "Source-code rule matched suspicious code.")
if not isinstance(message, str):
message = "Source-code rule matched suspicious code."

category = _categorize_source_finding(rule_id, message)
grouped = grouped_findings.setdefault(
category,
{
"category": category,
"reason": message,
"rules": [],
"evidence": [],
},
)
grouped["rules"].append(rule_id)
detections = finding.get("detections", [])
if isinstance(detections, list):
grouped["evidence"].extend(detection for detection in detections if isinstance(detection, dict))

return list(grouped_findings.values())


def _recommended_actions(verdict: str, behavioral_findings: list[dict[str, Any]]) -> list[str]:
"""Return concise next steps for a malware finding."""
if verdict == "no_suspicious_indicators":
return []

actions = [
"Block the package from promotion until the finding is reviewed.",
"Send the package artifact to dynamic analysis if available.",
]
categories = {finding["category"] for finding in behavioral_findings}
if "Credential access" in categories:
actions.append("Investigate whether systems that imported this package exposed credentials.")
if "Command and control" in categories:
actions.append("Review network controls for the reported domains or endpoints.")
return actions


def build_malware_summary(check_result: CheckResult) -> dict[str, Any] | None:
"""Build a user-friendly malware summary for the malicious metadata check."""
if check_result.check.check_id != MALICIOUS_METADATA_CHECK_ID:
return None
if check_result.result.result_type != CheckResultType.FAILED:
return None

summaries = []
for result_table in check_result.result.result_tables:
result = getattr(result_table, "result", {}) or {}
detail_information = getattr(result_table, "detail_information", {}) or {}
known_malware = getattr(result_table, "known_malware", None)
confidence = _confidence_value(getattr(result_table, "confidence", None))

failed_heuristics = _build_failed_heuristics(result)
behavioral_findings = _build_behavioral_findings(detail_information)

if known_malware:
verdict = "known_malware"
primary_reason = f"Package is listed as known malware: {known_malware}"
else:
verdict = "suspicious"
if behavioral_findings:
primary_reason = "Source code contains suspicious Python malware patterns."
elif failed_heuristics:
primary_reason = "Package metadata matches suspicious malware heuristics."
else:
primary_reason = "Macaron's malicious package check failed."

summaries.append(
{
"check_id": check_result.check.check_id,
"verdict": verdict,
"confidence": confidence,
"primary_reason": primary_reason,
"known_malware": known_malware,
"failed_heuristics": failed_heuristics,
"behavioral_findings": behavioral_findings,
"recommended_actions": _recommended_actions(verdict, behavioral_findings),
}
)

if not summaries:
return None
return summaries[0]
Loading
Loading