Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/macaron/code_analyzer/dataflow_analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

from __future__ import annotations

import os
from collections.abc import Iterable
from typing import cast

from macaron.code_analyzer.dataflow_analysis import bash, core, evaluation, facts, github, printing
from macaron.errors import CallGraphError
Expand Down Expand Up @@ -51,7 +53,11 @@ def analyse_github_workflow_file(workflow_path: str, repo_path: str | None, dump


def analyse_github_workflow(
workflow: github_workflow_model.Workflow, workflow_source_path: str, repo_path: str | None, dump_debug: bool = False
workflow: github_workflow_model.Workflow,
workflow_source_path: str,
repo_path: str | None,
dump_debug: bool = False,
local_action_stack: tuple[str, ...] = (),
) -> core.Node:
"""Perform dataflow analysis for GitHub Actions Workflow.

Expand All @@ -75,7 +81,9 @@ def analyse_github_workflow(
analysis_context = core.OwningContextRef(core.AnalysisContext(repo_path))

core.reset_debug_sequence_number()
raw_workflow_node = github.RawGitHubActionsWorkflowNode.create(workflow, analysis_context, workflow_source_path)
raw_workflow_node = github.RawGitHubActionsWorkflowNode.create(
workflow, analysis_context, workflow_source_path, local_action_stack
)
core.increment_debug_sequence_number()

raw_workflow_node.analyse()
Expand All @@ -89,6 +97,31 @@ def analyse_github_workflow(
return raw_workflow_node


def analyse_github_composite_action_file(
action_path: str, repo_path: str | None, dump_debug: bool = False
) -> core.Node:
"""Perform dataflow analysis for a standalone local composite GitHub Action metadata file.

The action is wrapped in a synthetic workflow so existing GitHub Actions analyses can traverse
its nested ``run`` and ``uses`` steps without a separate root node type.
"""
steps = github.parse_composite_action_steps(action_path)
workflow = cast(
github_workflow_model.Workflow,
{
"name": action_path,
"on": "workflow_call",
"jobs": {
"composite-action": {
"runs-on": "ubuntu-latest",
"steps": steps,
}
},
},
)
return analyse_github_workflow(workflow, action_path, repo_path, dump_debug, (os.path.abspath(action_path),))


def analyse_bash_script(
bash_content: str, source_path: str, repo_path: str | None, dump_debug: bool = False
) -> core.Node:
Expand Down
189 changes: 180 additions & 9 deletions src/macaron/code_analyzer/dataflow_analysis/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import cast

import yamale

from macaron.code_analyzer.dataflow_analysis import ( # pylint: disable=cyclic-import
bash,
Expand All @@ -19,10 +23,75 @@
models,
printing,
)
from macaron.errors import CallGraphError
from macaron.errors import CallGraphError, ParseError
from macaron.parsers import github_workflow_model


def parse_composite_action_steps(action_path: str) -> list[github_workflow_model.Step]:
"""Parse the steps from a local composite GitHub Action metadata file.

Parameters
----------
action_path: str
Path to an ``action.yml`` or ``action.yaml`` metadata file.

Returns
-------
list[github_workflow_model.Step]
The composite action steps, or an empty list when the action is not a composite action.

Raises
------
ParseError
When the action metadata file cannot be parsed.
"""
try:
parse_result = yamale.make_data(action_path, parser="ruamel")
except OSError as error:
raise ParseError("Cannot parse GitHub Action metadata: " + action_path) from error

if len(parse_result) != 1 or not isinstance(parse_result[0][0], dict):
raise ParseError("Cannot parse GitHub Action metadata: " + action_path)

action = parse_result[0][0]
runs = action.get("runs")
if not isinstance(runs, dict) or runs.get("using") != "composite":
return []

steps = runs.get("steps", [])
if not isinstance(steps, list):
raise ParseError("Cannot parse composite GitHub Action steps: " + action_path)

return cast(list[github_workflow_model.Step], steps)


def resolve_local_action_metadata_path(repo_path: str | None, uses_name: str) -> str | None:
"""Resolve a local ``uses: ./...`` action reference to its metadata file path."""
if repo_path is None or not uses_name.startswith("./"):
return None

action_dir = os.path.normpath(os.path.join(repo_path, uses_name))
repo_root = os.path.abspath(repo_path)
abs_action_dir = os.path.abspath(action_dir)
if os.path.commonpath([repo_root, abs_action_dir]) != repo_root:
return None

for metadata_name in ("action.yml", "action.yaml"):
candidate = os.path.join(action_dir, metadata_name)
if os.path.isfile(candidate):
return candidate

return None


def split_uses_name_version(uses: str) -> tuple[str, str | None]:
"""Split a GitHub Actions ``uses`` value into name and optional version."""
uses_name, separator, uses_version = uses.rpartition("@")
if not separator:
return uses, None
return uses_name, uses_version


@dataclass(frozen=True)
class GitHubActionsWorkflowContext(core.Context):
"""Context for the top-level scope of a GitHub Actions Workflow."""
Expand All @@ -41,10 +110,14 @@ class GitHubActionsWorkflowContext(core.Context):
console: core.ContextRef[facts.Scope]
#: Filepath of workflow file.
source_filepath: str
#: Local composite action metadata paths currently being expanded.
local_action_stack: tuple[str, ...]

@staticmethod
def create(
analysis_context: core.ContextRef[core.AnalysisContext], source_filepath: str
analysis_context: core.ContextRef[core.AnalysisContext],
source_filepath: str,
local_action_stack: tuple[str, ...] = (),
) -> GitHubActionsWorkflowContext:
"""Create a new workflow context and its associated scopes.

Expand All @@ -68,6 +141,7 @@ def create(
workflow_variables=core.OwningContextRef(facts.Scope("workflow_vars")),
console=core.OwningContextRef(facts.Scope("console")),
source_filepath=source_filepath,
local_action_stack=local_action_stack,
)

def direct_refs(self) -> Iterator[core.ContextRef[core.Context] | core.ContextRef[facts.Scope]]:
Expand Down Expand Up @@ -135,9 +209,15 @@ class GitHubActionsStepContext(core.Context):
#: Name prefix for step output variables (stored in the job variables)
#: belonging to this step (e.g. "steps.step_id.outputs.")
output_var_prefix: str | None
#: Local composite action metadata paths currently being expanded.
local_action_stack: tuple[str, ...]

@staticmethod
def create(job_context: core.ContextRef[GitHubActionsJobContext], step_id: str | None) -> GitHubActionsStepContext:
def create(
job_context: core.ContextRef[GitHubActionsJobContext],
step_id: str | None,
local_action_stack: tuple[str, ...] = (),
) -> GitHubActionsStepContext:
"""Create a new step context and its associated scopes.

Env scope inherits from outer context. Output var prefix is derived from step_id.
Expand All @@ -158,6 +238,7 @@ def create(job_context: core.ContextRef[GitHubActionsJobContext], step_id: str |
job_context=job_context.get_non_owned(),
env=core.OwningContextRef(facts.Scope("env", job_context.ref.env.ref)),
output_var_prefix=("steps." + step_id + ".outputs.") if step_id is not None else None,
local_action_stack=local_action_stack,
)

def direct_refs(self) -> Iterator[core.ContextRef[core.Context] | core.ContextRef[facts.Scope]]:
Expand Down Expand Up @@ -216,6 +297,7 @@ def create(
workflow: github_workflow_model.Workflow,
analysis_context: core.ContextRef[core.AnalysisContext],
source_filepath: str,
local_action_stack: tuple[str, ...] = (),
) -> RawGitHubActionsWorkflowNode:
"""Create workflow node and its associated context.

Expand All @@ -233,7 +315,7 @@ def create(
RawGitHubActionsWorkflowNode
The new workflow node.
"""
workflow_context = GitHubActionsWorkflowContext.create(analysis_context, source_filepath)
workflow_context = GitHubActionsWorkflowContext.create(analysis_context, source_filepath, local_action_stack)

return RawGitHubActionsWorkflowNode(workflow, core.OwningContextRef(workflow_context))

Expand Down Expand Up @@ -435,7 +517,7 @@ def build_normal_job() -> core.Node:
if isinstance(raw_with_params, dict):

def build_reusable_workflow_call_job() -> core.Node:
uses_name, _, uses_version = call_def["uses"].rpartition("@")
uses_name, uses_version = split_uses_name_version(call_def["uses"])

with_parameters: dict[str, facts.Value] = {}
for key, val in raw_with_params.items():
Expand All @@ -455,7 +537,7 @@ def build_reusable_workflow_call_job() -> core.Node:
self.job_id,
self.context.get_non_owned(),
uses_name,
uses_version if uses_version != "" else None,
uses_version,
with_parameters,
)

Expand Down Expand Up @@ -614,7 +696,14 @@ def create(

steps = [
RawGitHubActionsStepNode(
step, core.OwningContextRef(GitHubActionsStepContext.create(context, step.get("id")))
step,
core.OwningContextRef(
GitHubActionsStepContext.create(
context,
step.get("id"),
context.ref.workflow_context.ref.local_action_stack,
)
),
)
for step in job.get("steps", [])
]
Expand Down Expand Up @@ -793,7 +882,7 @@ def identify_interpretations(self, state: core.State) -> dict[core.Interpretatio
if isinstance(raw_with_params, dict):

def build_action() -> core.Node:
uses_name, _, uses_version = self.definition["uses"].rpartition("@")
uses_name, uses_version = split_uses_name_version(self.definition["uses"])

with_parameters: dict[str, facts.Value] = {}
for key, val in raw_with_params.items():
Expand All @@ -812,7 +901,7 @@ def build_action() -> core.Node:
self.definition,
self.context.get_non_owned(),
uses_name,
uses_version if uses_version != "" else None,
uses_version,
with_parameters,
)

Expand Down Expand Up @@ -893,6 +982,33 @@ def __init__(

def identify_interpretations(self, state: core.State) -> dict[core.InterpretationKey, Callable[[], core.Node]]:
"""Intepret the semantics of the different supported actions."""
if self.uses_name.startswith("./"):
action_path = resolve_local_action_metadata_path(
self.context.ref.job_context.ref.workflow_context.ref.analysis_context.ref.repo_path,
self.uses_name,
)
if action_path:
abs_action_path = os.path.abspath(action_path)
if abs_action_path in self.context.ref.local_action_stack:

def build_noop_for_recursive_local_action() -> core.Node:
return core.NoOpStatementNode()

return {"default": build_noop_for_recursive_local_action}

def build_composite_action() -> core.Node:
steps = parse_composite_action_steps(action_path)
if not steps:
return core.NoOpStatementNode()
return GitHubActionsCompositeActionNode.create(
action_path,
steps,
self.context.ref.job_context.get_non_owned(),
self.context.ref.local_action_stack + (abs_action_path,),
)

return {"default": build_composite_action}

match self.uses_name:
case "actions/checkout":

Expand Down Expand Up @@ -1054,6 +1170,61 @@ def get_printable_properties_table(self) -> dict[str, set[tuple[str | None, str]
return result


class GitHubActionsCompositeActionNode(core.ControlFlowGraphNode):
"""Control-flow-graph node representing a local composite GitHub Action."""

#: Path to the composite action metadata file.
action_path: str
#: Nested action steps, in execution order.
steps: list[RawGitHubActionsStepNode]
#: Control flow graph.
_cfg: core.ControlFlowGraph

def __init__(self, action_path: str, steps: list[RawGitHubActionsStepNode]) -> None:
"""Initialize a local composite action node."""
super().__init__()
self.action_path = action_path
self.steps = steps
self._cfg = core.ControlFlowGraph.create_from_sequence(steps)

def children(self) -> Iterator[core.Node]:
"""Yield the nested composite action steps."""
yield from self.steps

def get_entry(self) -> core.Node:
"""Return the entry node."""
return self._cfg.get_entry()

def get_successors(self, node: core.Node, exit_type: core.ExitType) -> set[core.Node | core.ExitType]:
"""Return the successors for a particular exit of a particular node."""
return self._cfg.get_successors(node, core.DEFAULT_EXIT)

def get_printable_properties_table(self) -> dict[str, set[tuple[str | None, str]]]:
"""Return a properties table containing the local action path."""
return {"local action": {(None, self.action_path)}}

@staticmethod
def create(
action_path: str,
steps: list[github_workflow_model.Step],
job_context: core.NonOwningContextRef[GitHubActionsJobContext],
local_action_stack: tuple[str, ...],
) -> GitHubActionsCompositeActionNode:
"""Create a composite action node from parsed metadata steps."""
return GitHubActionsCompositeActionNode(
action_path,
[
RawGitHubActionsStepNode(
step,
core.OwningContextRef(
GitHubActionsStepContext.create(job_context, step.get("id"), local_action_stack)
),
)
for step in steps
],
)


class GitHubActionsRunStepNode(core.ControlFlowGraphNode):
"""Control-flow-graph node representing a GitHub Actions Run Step.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ def _extract_statement_words(statement_node: bash.BashStatementNode) -> list[str

def _extract_call_words(call_expr: CallExpr) -> list[str]:
"""Extract literal word values from a call expression."""
args = call_expr["Args"]
args = call_expr.get("Args", [])
words: list[str] = []
for arg in args:
parts = arg["Parts"]
parts = arg.get("Parts", [])
word = "".join(part.get("Value", "") for part in parts if is_lit(part)).strip()
if not word:
return []
Expand Down
Loading
Loading