Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,6 @@ ignore = [
"utils/build/*" = ["ALL"] # mostly python weblog code. it may be a good idea to enable rules here

"utils/{k8s_lib_injection/*,_context/_scenarios/k8s_lib_injection.py}" = [
"ANN003", # missing-type-kwargs: TODO
"ANN201", # missing-return-type-undocumented-public-function: TODO
"B006", # mutable-argument-default: TODO
"B007", # unused-loop-control-variable: TODO
"B904", # raise-without-from-inside-except: TODO
"DTZ005", # call-datetime-now-without-tzinfo: TODO
"E401", # multiple-imports-on-one-line: TODO
"E501", # line-too-long: TODO
Expand Down
18 changes: 11 additions & 7 deletions utils/_context/_scenarios/k8s_lib_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from utils._logger import logger
from .core import Scenario, scenario_groups, ScenarioGroup

DEFAULT_K8S_SCENARIO_GROUPS = [scenario_groups.all, scenario_groups.lib_injection]


class K8sScenarioWithClusterProvider:
k8s_cluster_provider: K8sClusterProvider
Expand All @@ -31,17 +33,19 @@ def __init__(
doc: str,
*,
use_uds: bool = False,
weblog_env: dict[str, str] = {},
dd_cluster_feature: dict[str, str] = {},
weblog_env: dict[str, str] | None = None,
dd_cluster_feature: dict[str, str] | None = None,
with_datadog_operator: bool = False,
with_cluster_agent: bool = True,
scenario_groups: list[ScenarioGroup] = [scenario_groups.all, scenario_groups.lib_injection],
scenario_groups: list[ScenarioGroup] | None = None,
Comment thread
cbeauchesne marked this conversation as resolved.
) -> None:
if scenario_groups is None:
scenario_groups = DEFAULT_K8S_SCENARIO_GROUPS
super().__init__(name, doc=doc, github_workflow="libinjection", scenario_groups=scenario_groups)
self.use_uds = use_uds
self.with_datadog_operator = with_datadog_operator
self.weblog_env = weblog_env
self.dd_cluster_feature = dd_cluster_feature
self.weblog_env = weblog_env or {}
self.dd_cluster_feature = dd_cluster_feature or {}
self._configuration: dict[str, str] = {}
self.with_cluster_agent = with_cluster_agent
self.k8s_helm_chart_version: str | None = None
Expand Down Expand Up @@ -224,8 +228,8 @@ def __init__(
doc: str,
*,
use_uds: bool = False,
weblog_env: dict[str, str] = {},
dd_cluster_feature: dict[str, str] = {},
weblog_env: dict[str, str] | None = None,
dd_cluster_feature: dict[str, str] | None = None,
) -> None:
super().__init__(
name,
Expand Down
52 changes: 26 additions & 26 deletions utils/k8s_lib_injection/k8s_cluster_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ class K8sClusterProvider:

def __init__(self, *, is_local_managed: bool = False) -> None:
self.is_local_managed = is_local_managed
self._cluster_info = None
self._cluster_info: K8sClusterInfo | None = None

def configure(self):
def configure(self) -> None:
self.configure_cluster()
self.configure_networking()
# self.configure_cluster_api_connection()

def configure_cluster(self):
def configure_cluster(self) -> None:
"""Configure the cluster properties"""
raise NotImplementedError

def get_cluster_info(self):
def get_cluster_info(self) -> K8sClusterInfo:
"""It should return a K8sClusterInfo object with the cluster information"""
if self._cluster_info is None:
raise ValueError("Cluster not configured")
return self._cluster_info

def configure_networking(self):
def configure_networking(self) -> None:
"""Configure the networking properties for the cluster"""

if self._cluster_info is None:
Expand All @@ -91,15 +91,15 @@ def configure_networking(self):
self._cluster_info.internal_weblog_port = 18080
self._cluster_info.cluster_host_name = "localhost"

def configure_cluster_api_connection(self):
def configure_cluster_api_connection(self) -> None:
"""Configure the k8s cluster api connection"""
try:
config.load_kube_config()
except Exception as e:
logger.error(f"Error loading kube config: {e}")
raise e
raise

def ensure_cluster(self):
def ensure_cluster(self) -> None:
"""All local managed clusters should be created here"""
if self.is_local_managed:
raise NotImplementedError
Expand All @@ -108,13 +108,13 @@ def ensure_cluster(self):
)
self.configure_cluster_api_connection()

def destroy_cluster(self):
def destroy_cluster(self) -> None:
# TODO RMM review sleep mode failures on get cluter logs
if self.is_local_managed:
raise NotImplementedError
logger.info("Using an external K8s cluster: Remember to clean up the resources!!")

def create_spak_service_account(self):
def create_spak_service_account(self) -> None:
"""Create service account for launching spark application in k8s"""
execute_command(f"kubectl create serviceaccount spark --namespace=default")
execute_command(
Expand Down Expand Up @@ -187,22 +187,22 @@ def __init__(
self.docker_in_docker = False
self.cluster_template = cluster_template

def get_agent_port(self):
def get_agent_port(self) -> int | None:
if self.docker_in_docker:
return self.internal_agent_port
return self.agent_port

def get_weblog_port(self):
def get_weblog_port(self) -> int | None:
if self.docker_in_docker:
return self.internal_weblog_port
return self.weblog_port

def core_v1_api(self):
def core_v1_api(self) -> client.CoreV1Api:
"""Provides de CoreV1Api object (from kubernetes python api) to interact with the k8s cluster"""
# return client.CoreV1Api(api_client=config.new_client_from_config(context=self.context_name))
return client.CoreV1Api()

def apps_api(self):
def apps_api(self) -> client.AppsV1Api:
"""Provides de AppsV1Api object (from kubernetes python api) to interact with the k8s cluster"""
# return client.AppsV1Api(api_client=config.new_client_from_config(context=self.context_name))
return client.AppsV1Api()
Expand All @@ -214,10 +214,10 @@ class K8sMiniKubeClusterProvider(K8sClusterProvider):
def __init__(self):
super().__init__(is_local_managed=True)

def configure_cluster(self):
def configure_cluster(self) -> None:
self._cluster_info = K8sClusterInfo(cluster_name="minikube", context_name="minikube")

def ensure_cluster(self):
def ensure_cluster(self) -> None:
logger.info("Ensuring MiniKube cluster")
execute_command("minikube start --driver docker --ports 18080:18080 --ports 8126:8126")
execute_command("minikube status")
Expand All @@ -233,7 +233,7 @@ def ensure_cluster(self):
if PrivateRegistryConfig.is_configured():
self._create_secret_to_access_to_internal_registry()

def destroy_cluster(self):
def destroy_cluster(self) -> None:
logger.info("Destroying MiniKube cluster")
execute_command("minikube delete")

Expand All @@ -248,13 +248,13 @@ class K8sEKSRemoteClusterProvider(K8sClusterProvider):
def __init__(self):
super().__init__(is_local_managed=False)

def configure_cluster(self):
def configure_cluster(self) -> None:
self._cluster_info = K8sClusterInfo(
cluster_name="montero2Sandbox.us-east-1.eksctl.io",
context_name="roberto.montero@datadoghq.com@montero2Sandbox.us-east-1.eksctl.io",
)

def configure_networking(self):
def configure_networking(self) -> None:
"""Configure the networking properties for the cluster"""

if self._cluster_info is None:
Expand All @@ -267,7 +267,7 @@ def configure_networking(self):
self._cluster_info.internal_weblog_port = 18080
self._cluster_info.cluster_host_name = None

def configure_cluster_api_connection(self):
def configure_cluster_api_connection(self) -> None:
"""Configure the k8s cluster api connection"""
try:
# Update context name
Expand Down Expand Up @@ -319,7 +319,7 @@ def configure_cluster_api_connection(self):

except Exception as e:
logger.error(f"Error loading kube config: {e}")
raise e
raise

def execute_piped_command(self, command: str) -> str:
# awk_comm = 'kubectl config get-contexts | awk \'{if ($1 ~ "@lib-injection-testing-eks-sandbox") print $1}\''
Expand All @@ -340,14 +340,14 @@ class K8sKindClusterProvider(K8sClusterProvider):
def __init__(self):
super().__init__(is_local_managed=True)

def configure_cluster(self):
def configure_cluster(self) -> None:
self._cluster_info = K8sClusterInfo(
cluster_name="lib-injection-testing",
context_name="kind-lib-injection-testing",
cluster_template="utils/k8s_lib_injection/resources/kind-config-template.yaml",
)

def configure_networking(self):
def configure_networking(self) -> None:
"""Configure the networking properties for the cluster"""

if self._cluster_info is None:
Expand All @@ -360,7 +360,7 @@ def configure_networking(self):
self._cluster_info.internal_weblog_port = 18080
self._cluster_info.cluster_host_name = "localhost"

def ensure_cluster(self):
def ensure_cluster(self) -> None:
logger.info("Ensuring kind cluster")
kind_command = f"kind create cluster --image=kindest/node:v1.25.3@sha256:f52781bc0d7a19fb6c405c2af83abfeb311f130707a0e219175677e366cc45d1 --name {self.get_cluster_info().cluster_name} --config {self.get_cluster_info().cluster_template} --wait 1m"

Expand All @@ -380,15 +380,15 @@ def ensure_cluster(self):
# We need to configure the api after create the cluster
self.configure_cluster_api_connection()

def destroy_cluster(self):
def destroy_cluster(self) -> None:
logger.info("Destroying kind cluster")
try:
execute_command(f"kind delete cluster --name {self.get_cluster_info().cluster_name}")
execute_command(f"docker rm -f {self.get_cluster_info().cluster_name}-control-plane")
except Exception as e:
logger.error(f"Error destroying the cluster: {e}")

def _setup_kind_in_gitlab(self):
def _setup_kind_in_gitlab(self) -> None:
# The build runs in a docker container:
# - Docker commands are forwarded to the host.
# - The kind container is a sibling to the build container
Expand Down
5 changes: 4 additions & 1 deletion utils/k8s_lib_injection/k8s_command_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ def helm_install_chart(
k8s_cluster_info: "K8sClusterInfo",
name: str,
chart: str,
set_dict: dict[str, str] = {},
set_dict: dict[str, str] | None = None,
value_file: str | None = None,
*,
timeout: int | None = 90,
namespace: str = "datadog",
chart_version: str | None = None,
) -> None:
if set_dict is None:
set_dict = {}

# Copy and replace cluster name in the value file
custom_value_file = None
if value_file:
Expand Down
8 changes: 4 additions & 4 deletions utils/k8s_lib_injection/k8s_datadog_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, output_folder: str) -> None:
def configure(
self,
k8s_cluster_info: K8sClusterInfo,
dd_cluster_feature: dict[str, str] = {},
dd_cluster_feature: dict[str, str] | None = None,
*,
dd_cluster_uds: bool | None = None,
dd_cluster_img: str | None = None,
Expand All @@ -37,7 +37,7 @@ def configure(
helm_chart_operator_version: str | None = None,
) -> None:
self.k8s_cluster_info = k8s_cluster_info
self.dd_cluster_feature = dd_cluster_feature
self.dd_cluster_feature = dd_cluster_feature or {}
self.dd_cluster_uds = dd_cluster_uds
self.dd_cluster_img = dd_cluster_img
self.api_key = api_key
Expand Down Expand Up @@ -252,7 +252,7 @@ def wait_for_test_agent(self, namespace: str) -> None:
daemonset_created = False
daemonset_status = None
# Wait for the daemonset to be created
for i in range(20):
for _ in range(20):
daemonset_status = self.k8s_cluster_info.apps_api().read_namespaced_daemon_set_status(
name="datadog", namespace=namespace
)
Expand Down Expand Up @@ -290,7 +290,7 @@ def _wait_for_cluster_agent_ready(self, namespace: str, label_selector: str = "a
cluster_agent_status = None
datadog_cluster_name = None

for i in range(20):
for _ in range(20):
try:
if datadog_cluster_name is None:
pods = self.k8s_cluster_info.core_v1_api().list_namespaced_pod(
Expand Down
Loading