From d5312d22b441064bff2483d41034080493859e6e Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Mon, 29 Sep 2025 17:03:18 +0200 Subject: [PATCH 01/10] [WIP] autotune: generate step response from flight data --- autotune/data_selection_window.py | 42 +++++++ autotune/pid_analyse.py | 182 ++++++++++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 autotune/pid_analyse.py diff --git a/autotune/data_selection_window.py b/autotune/data_selection_window.py index e42e11a..bdd6a0f 100644 --- a/autotune/data_selection_window.py +++ b/autotune/data_selection_window.py @@ -3,6 +3,7 @@ from data_extractor import DataExtractor from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.widgets import SpanSelector +from pid_analyse import plot_closed_loop_step_response from PyQt5.QtWidgets import ( QComboBox, QDialog, @@ -54,6 +55,18 @@ def __init__(self, filename): "output": "vehicle_angular_velocity/xyz[2].0", "input_legacy": "actuator_controls_1/control[2].0", }, + "Rollrate(closed-loop)": { + "input": "vehicle_rates_setpoint/roll.0", + "output": "vehicle_angular_velocity/xyz[0].0", + }, + "Pitchrate(closed-loop)": { + "input": "vehicle_rates_setpoint/pitch.0", + "output": "vehicle_angular_velocity/xyz[1].0", + }, + "Yawrate(closed-loop)": { + "input": "vehicle_rates_setpoint/yaw.0", + "output": "vehicle_angular_velocity/xyz[2].0", + }, } self.presets = {} @@ -106,6 +119,10 @@ def __init__(self, filename): btn_ok.clicked.connect(self.loadSelection) layout_v.addWidget(btn_ok) + pid_btn_ok = QPushButton("Analyse Current Tuning") + pid_btn_ok.clicked.connect(self.plotPIDAnalysis) + layout_v.addWidget(pid_btn_ok) + self.setLayout(layout_v) if filename: @@ -291,6 +308,31 @@ def onselect(self, xmin, xmax): self.plotCoherence() + def plotPIDAnalysis(self): + + if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: + return + + # Use getInputOutputData with selected range + if ( + self.t_start is not None + and self.t_stop is not None + and self.t_stop > self.t_start + ): + t_sel, u_sel, y_sel, _ = self.data_extractor.getInputOutputData( + self.topics[self.index_u], + self.topics[self.index_y], + self.t_start, + self.t_stop, + ) + else: + # If no range selected, just use full duration + t_sel, u_sel, y_sel, _ = self.data_extractor.getInputOutputData( + self.topics[self.index_u], self.topics[self.index_y] + ) + + plot_closed_loop_step_response(u_sel, y_sel, t_sel) + def plotCoherence(self): if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: return diff --git a/autotune/pid_analyse.py b/autotune/pid_analyse.py new file mode 100644 index 0000000..e9db1cd --- /dev/null +++ b/autotune/pid_analyse.py @@ -0,0 +1,182 @@ +import matplotlib.pyplot as plt +import numpy as np +from numpy.lib.stride_tricks import sliding_window_view +from scipy.ndimage import gaussian_filter1d + + +def plot_closed_loop_step_response( + u: np.ndarray, + y: np.ndarray, + t: np.ndarray, + cutfreq: float = 25.0, + window_duration: float = 1.0, +) -> dict: + """ + Estimate and plot the closed-loop step response using Wiener deconvolution, + including uncertainty bounds and key metrics. + + Parameters + ---------- + u : ndarray + Input setpoint signal. + y : ndarray + Measured output signal. + t : ndarray + Time vector corresponding to u and y. + cutfreq : float + Cutoff frequency for Wiener deconvolution (Hz). + window_duration : float + Duration of each analysis window in seconds. + + Returns + ------- + dict + Metrics: rise_time, settling_time, overshoot, steady_state_error (mean ± std) + """ + fs = estimate_sampling_frequency(t) + frame_samples = int(window_duration * fs) + shift = frame_samples // 16 + response_window_samples = int(0.5 * fs) + time_response = t[:response_window_samples] - t[0] + + # Extract overlapping windows + u_windows = sliding_window_view(u, frame_samples)[::shift] + y_windows = sliding_window_view(y, frame_samples)[::shift] + + # Apply Hanning window + window_func = np.hanning(frame_samples) + u_windows = u_windows * window_func + y_windows = y_windows * window_func + + # Wiener deconvolution + deconvolved = wiener_deconvolution(u_windows, y_windows, cutfreq, fs) + step_responses = deconvolved[:, :response_window_samples].cumsum(axis=1) + + # Plot with uncertainty and compute metrics + metrics = plot_step_responses_with_metrics(time_response, step_responses) + return metrics + + +def estimate_sampling_frequency(t: np.ndarray) -> float: + dt = np.diff(t).mean() + if dt == 0: + raise ValueError("Time vector has zero differences.") + return 1 / dt + + +def wiener_deconvolution( + input_: np.ndarray, output: np.ndarray, cutfreq: float, fs: float +) -> np.ndarray: + pad_len = 1024 - (input_.shape[1] % 1024) + input_padded = np.pad(input_, ((0, 0), (0, pad_len)), mode="constant") + output_padded = np.pad(output, ((0, 0), (0, pad_len)), mode="constant") + + H = np.fft.fft(input_padded, axis=-1) + G = np.fft.fft(output_padded, axis=-1) + + sn = create_frequency_mask(H.shape[1], cutfreq, fs) + H_conj = np.conj(H) + denom = (H * H_conj) + (1.0 / sn) + deconvolved = np.real(np.fft.ifft(G * H_conj / denom, axis=-1)) + + return deconvolved + + +def create_frequency_mask(n_samples: int, cutfreq: float, fs: float) -> np.ndarray: + freqs = np.abs(np.fft.fftfreq(n_samples, 1 / fs)) + mask = np.clip(freqs, cutfreq - 1e-9, cutfreq) + mask = normalize(mask) + len_lpf = np.sum(1 - mask) + mask = normalize(gaussian_filter1d(mask, len_lpf / 6.0)) + return 10.0 * (-mask + 1.0 + 1e-9) + + +def normalize(arr: np.ndarray) -> np.ndarray: + arr -= arr.min() + max_val = arr.max() + if max_val > 1e-10: + arr /= max_val + return arr + + +def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray) -> dict: + mean_resp = responses.mean(axis=0) + std_resp = responses.std(axis=0) + + plt.figure(figsize=(8, 5)) + + # Plot individual responses lightly + for resp in responses: + plt.plot(time, resp, alpha=0.2, color="gray") + + # Mean response + plt.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") + + # Uncertainty bounds (±1 std) + plt.fill_between( + time, + mean_resp - std_resp, + mean_resp + std_resp, + color="blue", + alpha=0.2, + label="±1 std", + ) + + # Horizontal reference line + plt.axhline(1, color="red", linestyle="--", label="Input Input") + + plt.xlabel("Time [s]") + plt.ylabel("Step Response") + plt.title("Estimated Step Response with Uncertainty") + plt.legend() + plt.tight_layout() + plt.show(block=False) + + # Compute metrics + metrics = compute_step_response_metrics(time, responses) + return metrics + + +def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> dict: + + rise_times, settling_times, overshoots, steady_state_errors = [], [], [], [] + + for resp in responses: + final_val = resp[-1] + + # Rise time (10% -> 90%) + try: + t10 = time[np.where(resp >= 0.1 * final_val)[0][0]] + t90 = time[np.where(resp >= 0.9 * final_val)[0][0]] + rise_times.append(t90 - t10) + except IndexError: + rise_times.append(np.nan) + + # Settling time (within ±2% of final value) + within_bounds = np.where(np.abs(resp - final_val) <= 0.02 * final_val)[0] + settling_times.append( + time[within_bounds[-1]] if len(within_bounds) > 0 else np.nan + ) + + # Overshoot (%) + overshoot = (np.max(resp) - final_val) / final_val * 100 + overshoots.append(overshoot) + + # Steady-state error + steady_state_errors.append(final_val - 1) + + metrics = { + "rise_time": (np.nanmean(rise_times), np.nanstd(rise_times)), + "settling_time": (np.nanmean(settling_times), np.nanstd(settling_times)), + "overshoot (%)": (np.nanmean(overshoots), np.nanstd(overshoots)), + "steady_state_error": ( + np.nanmean(steady_state_errors), + np.nanstd(steady_state_errors), + ), + } + + print("Step Response Metrics (mean ± std):") + for k, v in metrics.items(): + print(f"{k}: {v[0]:.3f} ± {v[1]:.3f}") + + return metrics From 7ae45bb69869cdf07ae86ffc8872a52a9d25add6 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Thu, 2 Oct 2025 16:44:20 +0200 Subject: [PATCH 02/10] autotune: fix settling time metric --- autotune/pid_analyse.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/autotune/pid_analyse.py b/autotune/pid_analyse.py index e9db1cd..f7e89c3 100644 --- a/autotune/pid_analyse.py +++ b/autotune/pid_analyse.py @@ -152,11 +152,19 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di except IndexError: rise_times.append(np.nan) - # Settling time (within ±2% of final value) - within_bounds = np.where(np.abs(resp - final_val) <= 0.02 * final_val)[0] - settling_times.append( - time[within_bounds[-1]] if len(within_bounds) > 0 else np.nan - ) + # Settling time (within ±10% of final value) + tolerance = 0.1 * final_val + above_lower = np.where(resp < final_val + tolerance)[0] + below_upper = np.where(resp > final_val - tolerance)[0] + within_bounds = np.intersect1d(above_lower, below_upper) + + # Find first index after which response stays within bounds for the rest of the signal + for idx in within_bounds: + if np.all(resp[idx:] <= final_val + tolerance) and np.all(resp[idx:] >= final_val - tolerance): + settling_times.append(time[idx]) + break + else: + settling_times.append(np.nan) # Overshoot (%) overshoot = (np.max(resp) - final_val) / final_val * 100 From fbcd5adbbeac7d7359fda33eb6cfe1d81c1ef0c6 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Thu, 2 Oct 2025 16:44:34 +0200 Subject: [PATCH 03/10] autotune: plot reference step input in PID analyser --- autotune/pid_analyse.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/autotune/pid_analyse.py b/autotune/pid_analyse.py index f7e89c3..160f8be 100644 --- a/autotune/pid_analyse.py +++ b/autotune/pid_analyse.py @@ -122,8 +122,11 @@ def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray) -> label="±1 std", ) - # Horizontal reference line - plt.axhline(1, color="red", linestyle="--", label="Input Input") + # Reference step input + step_time = np.concatenate([[-0.01, 0], time]) + step_values = np.concatenate([[0, 1], np.ones_like(time)]) + + plt.step(step_time, step_values, where='post', color='red', label="Step Input") plt.xlabel("Time [s]") plt.ylabel("Step Response") From af9b17e046636f35f5369b72df992695a6c77942 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Fri, 3 Oct 2025 10:59:29 +0200 Subject: [PATCH 04/10] autotune: (wip) start wrapping step response in UI --- autotune/data_selection_window.py | 27 +++++++++++--- autotune/pid_analyse.py | 60 +++++++++++++++++++------------ 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/autotune/data_selection_window.py b/autotune/data_selection_window.py index bdd6a0f..4e97a23 100644 --- a/autotune/data_selection_window.py +++ b/autotune/data_selection_window.py @@ -309,11 +309,9 @@ def onselect(self, xmin, xmax): self.plotCoherence() def plotPIDAnalysis(self): - if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: return - # Use getInputOutputData with selected range if ( self.t_start is not None and self.t_stop is not None @@ -326,12 +324,33 @@ def plotPIDAnalysis(self): self.t_stop, ) else: - # If no range selected, just use full duration t_sel, u_sel, y_sel, _ = self.data_extractor.getInputOutputData( self.topics[self.index_u], self.topics[self.index_y] ) - plot_closed_loop_step_response(u_sel, y_sel, t_sel) + # Create or reuse Step Response dialog + if not hasattr(self, "step_dialog") or self.step_dialog is None: + self.step_dialog = QDialog(self) + self.step_dialog.setWindowTitle("Step Response") + layout = QVBoxLayout(self.step_dialog) + + # Matplotlib figure + canvas + self.step_fig, self.step_ax = plt.subplots(figsize=(8, 5), constrained_layout=True) + self.step_canvas = FigureCanvas(self.step_fig) + layout.addWidget(self.step_canvas) + + self.step_dialog.setLayout(layout) + + else: + # Clear previous plot if dialog already exists + self.step_ax.clear() + + plot_closed_loop_step_response(u_sel, y_sel, t_sel, ax=self.step_ax) + self.step_canvas.draw() + + self.step_dialog.show() + self.step_dialog.raise_() + def plotCoherence(self): if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: diff --git a/autotune/pid_analyse.py b/autotune/pid_analyse.py index 160f8be..da89b8b 100644 --- a/autotune/pid_analyse.py +++ b/autotune/pid_analyse.py @@ -10,6 +10,7 @@ def plot_closed_loop_step_response( t: np.ndarray, cutfreq: float = 25.0, window_duration: float = 1.0, + ax = None ) -> dict: """ Estimate and plot the closed-loop step response using Wiener deconvolution, @@ -33,7 +34,12 @@ def plot_closed_loop_step_response( dict Metrics: rise_time, settling_time, overshoot, steady_state_error (mean ± std) """ - fs = estimate_sampling_frequency(t) + + dt = np.diff(t).mean() + if dt == 0: + return + fs = 1 / dt + frame_samples = int(window_duration * fs) shift = frame_samples // 16 response_window_samples = int(0.5 * fs) @@ -53,16 +59,10 @@ def plot_closed_loop_step_response( step_responses = deconvolved[:, :response_window_samples].cumsum(axis=1) # Plot with uncertainty and compute metrics - metrics = plot_step_responses_with_metrics(time_response, step_responses) + metrics = plot_step_responses_with_metrics(time_response, step_responses, ax=ax) return metrics -def estimate_sampling_frequency(t: np.ndarray) -> float: - dt = np.diff(t).mean() - if dt == 0: - raise ValueError("Time vector has zero differences.") - return 1 / dt - def wiener_deconvolution( input_: np.ndarray, output: np.ndarray, cutfreq: float, fs: float @@ -99,21 +99,22 @@ def normalize(arr: np.ndarray) -> np.ndarray: return arr -def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray) -> dict: +def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray, ax=None) -> dict: mean_resp = responses.mean(axis=0) std_resp = responses.std(axis=0) - plt.figure(figsize=(8, 5)) + if ax is None: + fig, ax = plt.subplots(figsize=(8, 5)) # Plot individual responses lightly for resp in responses: - plt.plot(time, resp, alpha=0.2, color="gray") + ax.plot(time, resp, alpha=0.2, color="gray") # Mean response - plt.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") + ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") # Uncertainty bounds (±1 std) - plt.fill_between( + ax.fill_between( time, mean_resp - std_resp, mean_resp + std_resp, @@ -123,23 +124,36 @@ def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray) -> ) # Reference step input - step_time = np.concatenate([[-0.01, 0], time]) - step_values = np.concatenate([[0, 1], np.ones_like(time)]) + ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input") - plt.step(step_time, step_values, where='post', color='red', label="Step Input") - - plt.xlabel("Time [s]") - plt.ylabel("Step Response") - plt.title("Estimated Step Response with Uncertainty") - plt.legend() - plt.tight_layout() - plt.show(block=False) + ax.set_xlabel("Time [s]") + ax.set_ylabel("Step Response") + ax.set_title("Estimated Step Response with Uncertainty") + ax.legend() + ax.grid(True) # Compute metrics metrics = compute_step_response_metrics(time, responses) + + # --- Display metrics inside plot --- + metrics_text = "\n".join( + f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() + ) + ax.text( + 0.98, + 0.02, + metrics_text, + ha="right", + va="bottom", + transform=ax.transAxes, + fontsize=9, + color="gray", + ) + return metrics + def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> dict: rise_times, settling_times, overshoots, steady_state_errors = [], [], [], [] From 736a8d28eea1d4c07afe69b405c3a5899622b996 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Fri, 3 Oct 2025 14:28:57 +0200 Subject: [PATCH 05/10] autotune: [wip] continue wrapping in UI --- autotune/data_selection_window.py | 31 ++-- .../{pid_analyse.py => pid_analyse_window.py} | 170 +++++++----------- 2 files changed, 79 insertions(+), 122 deletions(-) rename autotune/{pid_analyse.py => pid_analyse_window.py} (52%) diff --git a/autotune/data_selection_window.py b/autotune/data_selection_window.py index 4e97a23..b1ebc4b 100644 --- a/autotune/data_selection_window.py +++ b/autotune/data_selection_window.py @@ -3,7 +3,7 @@ from data_extractor import DataExtractor from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.widgets import SpanSelector -from pid_analyse import plot_closed_loop_step_response +from pid_analyse_window import PIDAnalyseWindow from PyQt5.QtWidgets import ( QComboBox, QDialog, @@ -312,6 +312,7 @@ def plotPIDAnalysis(self): if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: return + # Get input/output data if ( self.t_start is not None and self.t_stop is not None @@ -328,28 +329,16 @@ def plotPIDAnalysis(self): self.topics[self.index_u], self.topics[self.index_y] ) - # Create or reuse Step Response dialog - if not hasattr(self, "step_dialog") or self.step_dialog is None: - self.step_dialog = QDialog(self) - self.step_dialog.setWindowTitle("Step Response") - layout = QVBoxLayout(self.step_dialog) + # Create analysis window if not open yet + if not hasattr(self, "pid_window") or self.pid_window is None: + self.pid_window = PIDAnalyseWindow(self) - # Matplotlib figure + canvas - self.step_fig, self.step_ax = plt.subplots(figsize=(8, 5), constrained_layout=True) - self.step_canvas = FigureCanvas(self.step_fig) - layout.addWidget(self.step_canvas) + # Show + self.pid_window.show() + self.pid_window.raise_() - self.step_dialog.setLayout(layout) - - else: - # Clear previous plot if dialog already exists - self.step_ax.clear() - - plot_closed_loop_step_response(u_sel, y_sel, t_sel, ax=self.step_ax) - self.step_canvas.draw() - - self.step_dialog.show() - self.step_dialog.raise_() + # Update plot + self.pid_window.generate_step_response(u_sel, y_sel, t_sel) def plotCoherence(self): diff --git a/autotune/pid_analyse.py b/autotune/pid_analyse_window.py similarity index 52% rename from autotune/pid_analyse.py rename to autotune/pid_analyse_window.py index da89b8b..5ff9a05 100644 --- a/autotune/pid_analyse.py +++ b/autotune/pid_analyse_window.py @@ -1,7 +1,34 @@ +# pid_analyse_window.py import matplotlib.pyplot as plt import numpy as np from numpy.lib.stride_tricks import sliding_window_view from scipy.ndimage import gaussian_filter1d +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from PyQt5.QtWidgets import QDialog, QVBoxLayout + + +class PIDAnalyseWindow(QDialog): + """Dialog window to show estimated step response""" + + def __init__(self, parent=None): + super().__init__(parent) + self.setWindowTitle("Estimated Step Response") + + # Figure and canvas + self.figure, self.ax = plt.subplots(figsize=(8, 5), constrained_layout=True) + self.canvas = FigureCanvas(self.figure) + + # Layout + layout = QVBoxLayout() + layout.addWidget(self.canvas) + self.setLayout(layout) + + def generate_step_response(self, u: np.ndarray, y: np.ndarray, t: np.ndarray) -> dict: + """Compute and plot step response, updating the canvas.""" + self.ax.clear() + metrics = plot_closed_loop_step_response(u, y, t, ax=self.ax) + self.canvas.draw() + return metrics def plot_closed_loop_step_response( @@ -10,34 +37,11 @@ def plot_closed_loop_step_response( t: np.ndarray, cutfreq: float = 25.0, window_duration: float = 1.0, - ax = None + ax=None ) -> dict: - """ - Estimate and plot the closed-loop step response using Wiener deconvolution, - including uncertainty bounds and key metrics. - - Parameters - ---------- - u : ndarray - Input setpoint signal. - y : ndarray - Measured output signal. - t : ndarray - Time vector corresponding to u and y. - cutfreq : float - Cutoff frequency for Wiener deconvolution (Hz). - window_duration : float - Duration of each analysis window in seconds. - - Returns - ------- - dict - Metrics: rise_time, settling_time, overshoot, steady_state_error (mean ± std) - """ - dt = np.diff(t).mean() - if dt == 0: - return + if np.isclose(dt, 0): + return {} fs = 1 / dt frame_samples = int(window_duration * fs) @@ -63,40 +67,38 @@ def plot_closed_loop_step_response( return metrics +def wiener_deconvolution(input_: np.ndarray, output: np.ndarray, cutoff_freq: float, fs: float, epsilon: float = 1e-3) -> np.ndarray: + """ + Perform Wiener deconvolution on input/output signals + """ + # Pad to next power-of-2 FFT + n_samples = input_.shape[1] + n_fft = 2 ** int(np.ceil(np.log2(n_samples))) + input_padded = np.pad(input_, ((0, 0), (0, n_fft - n_samples)), mode="constant") + output_padded = np.pad(output, ((0, 0), (0, n_fft - n_samples)), mode="constant") -def wiener_deconvolution( - input_: np.ndarray, output: np.ndarray, cutfreq: float, fs: float -) -> np.ndarray: - pad_len = 1024 - (input_.shape[1] % 1024) - input_padded = np.pad(input_, ((0, 0), (0, pad_len)), mode="constant") - output_padded = np.pad(output, ((0, 0), (0, pad_len)), mode="constant") - + # FFT H = np.fft.fft(input_padded, axis=-1) G = np.fft.fft(output_padded, axis=-1) - sn = create_frequency_mask(H.shape[1], cutfreq, fs) + # Frequency-domain Wiener filter + snr = create_frequency_mask(n_fft, cutoff_freq, fs) H_conj = np.conj(H) - denom = (H * H_conj) + (1.0 / sn) - deconvolved = np.real(np.fft.ifft(G * H_conj / denom, axis=-1)) - - return deconvolved + deconv_freq = (H_conj * G) / (H * H_conj + epsilon / snr[None, :]) + # IFFT to get impulse response + deconvolved = np.real(np.fft.ifft(deconv_freq, axis=-1)) + return deconvolved[:, :n_samples] -def create_frequency_mask(n_samples: int, cutfreq: float, fs: float) -> np.ndarray: - freqs = np.abs(np.fft.fftfreq(n_samples, 1 / fs)) - mask = np.clip(freqs, cutfreq - 1e-9, cutfreq) - mask = normalize(mask) - len_lpf = np.sum(1 - mask) - mask = normalize(gaussian_filter1d(mask, len_lpf / 6.0)) - return 10.0 * (-mask + 1.0 + 1e-9) - -def normalize(arr: np.ndarray) -> np.ndarray: - arr -= arr.min() - max_val = arr.max() - if max_val > 1e-10: - arr /= max_val - return arr +def create_frequency_mask(n_samples: int, cutoff_freq: float, fs: float, sigma_factor: float = 6.0) -> np.ndarray: + """ + Create a smooth low-pass mask + """ + freqs = np.fft.fftfreq(n_samples, 1 / fs) + mask = np.exp(-0.5 * (freqs / cutoff_freq) ** 2) # Gaussian low-pass + mask = gaussian_filter1d(mask, sigma=n_samples / sigma_factor) + return np.clip(mask, 1e-3, 1.0) # avoid zeros def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray, ax=None) -> dict: @@ -106,56 +108,30 @@ def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray, ax if ax is None: fig, ax = plt.subplots(figsize=(8, 5)) - # Plot individual responses lightly for resp in responses: ax.plot(time, resp, alpha=0.2, color="gray") - - # Mean response ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") - - # Uncertainty bounds (±1 std) - ax.fill_between( - time, - mean_resp - std_resp, - mean_resp + std_resp, - color="blue", - alpha=0.2, - label="±1 std", - ) - - # Reference step input + ax.fill_between(time, mean_resp - std_resp, mean_resp + std_resp, color="blue", alpha=0.2, label="±1 std") ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input") - ax.set_xlabel("Time [s]") ax.set_ylabel("Step Response") ax.set_title("Estimated Step Response with Uncertainty") ax.legend() ax.grid(True) - # Compute metrics metrics = compute_step_response_metrics(time, responses) - # --- Display metrics inside plot --- + # Add metrics as annotation metrics_text = "\n".join( - f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() - ) - ax.text( - 0.98, - 0.02, - metrics_text, - ha="right", - va="bottom", - transform=ax.transAxes, - fontsize=9, - color="gray", + f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() if not np.isnan(v[0]) ) + ax.text(0.98, 0.02, metrics_text, ha="right", va="bottom", + transform=ax.transAxes, fontsize=9, color="gray") return metrics - def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> dict: - rise_times, settling_times, overshoots, steady_state_errors = [], [], [], [] for resp in responses: @@ -171,19 +147,18 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di # Settling time (within ±10% of final value) tolerance = 0.1 * final_val - above_lower = np.where(resp < final_val + tolerance)[0] - below_upper = np.where(resp > final_val - tolerance)[0] - within_bounds = np.intersect1d(above_lower, below_upper) - - # Find first index after which response stays within bounds for the rest of the signal - for idx in within_bounds: - if np.all(resp[idx:] <= final_val + tolerance) and np.all(resp[idx:] >= final_val - tolerance): - settling_times.append(time[idx]) - break + within_bounds = np.where(np.abs(resp - final_val) <= tolerance)[0] + if len(within_bounds) > 0: + for idx in within_bounds: + if np.all(np.abs(resp[idx:] - final_val) <= tolerance): + settling_times.append(time[idx]) + break + else: + settling_times.append(np.nan) else: settling_times.append(np.nan) - # Overshoot (%) + # Overshoot overshoot = (np.max(resp) - final_val) / final_val * 100 overshoots.append(overshoot) @@ -194,14 +169,7 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di "rise_time": (np.nanmean(rise_times), np.nanstd(rise_times)), "settling_time": (np.nanmean(settling_times), np.nanstd(settling_times)), "overshoot (%)": (np.nanmean(overshoots), np.nanstd(overshoots)), - "steady_state_error": ( - np.nanmean(steady_state_errors), - np.nanstd(steady_state_errors), - ), + "steady_state_error": (np.nanmean(steady_state_errors), np.nanstd(steady_state_errors)), } - print("Step Response Metrics (mean ± std):") - for k, v in metrics.items(): - print(f"{k}: {v[0]:.3f} ± {v[1]:.3f}") - return metrics From f9fcb5c7329964c00ad03637f6141756fda930d6 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Fri, 3 Oct 2025 17:23:15 +0200 Subject: [PATCH 06/10] autotune: refactor pid_analyse_window --- autotune/pid_analyse_window.py | 171 +++++++++++++++++---------------- 1 file changed, 88 insertions(+), 83 deletions(-) diff --git a/autotune/pid_analyse_window.py b/autotune/pid_analyse_window.py index 5ff9a05..3ae5ff1 100644 --- a/autotune/pid_analyse_window.py +++ b/autotune/pid_analyse_window.py @@ -1,4 +1,3 @@ -# pid_analyse_window.py import matplotlib.pyplot as plt import numpy as np from numpy.lib.stride_tricks import sliding_window_view @@ -26,22 +25,24 @@ def __init__(self, parent=None): def generate_step_response(self, u: np.ndarray, y: np.ndarray, t: np.ndarray) -> dict: """Compute and plot step response, updating the canvas.""" self.ax.clear() - metrics = plot_closed_loop_step_response(u, y, t, ax=self.ax) + + time, responses = compute_step_responses(u, y, t) + if responses is None: + return {} + + metrics = compute_step_response_metrics(time, responses) + plot_step_responses(time, responses, metrics, ax=self.ax) + self.canvas.draw() return metrics -def plot_closed_loop_step_response( - u: np.ndarray, - y: np.ndarray, - t: np.ndarray, - cutfreq: float = 25.0, - window_duration: float = 1.0, - ax=None -) -> dict: +def compute_step_responses(u: np.ndarray, y: np.ndarray, t: np.ndarray, + cutfreq: float = 25.0, window_duration: float = 1.0): + """Compute step responses from input/output signals.""" dt = np.diff(t).mean() if np.isclose(dt, 0): - return {} + return None, None fs = 1 / dt frame_samples = int(window_duration * fs) @@ -49,7 +50,7 @@ def plot_closed_loop_step_response( response_window_samples = int(0.5 * fs) time_response = t[:response_window_samples] - t[0] - # Extract overlapping windows + # Sliding windows u_windows = sliding_window_view(u, frame_samples)[::shift] y_windows = sliding_window_view(y, frame_samples)[::shift] @@ -58,80 +59,16 @@ def plot_closed_loop_step_response( u_windows = u_windows * window_func y_windows = y_windows * window_func - # Wiener deconvolution + + # Deconvolution deconvolved = wiener_deconvolution(u_windows, y_windows, cutfreq, fs) step_responses = deconvolved[:, :response_window_samples].cumsum(axis=1) - # Plot with uncertainty and compute metrics - metrics = plot_step_responses_with_metrics(time_response, step_responses, ax=ax) - return metrics - - -def wiener_deconvolution(input_: np.ndarray, output: np.ndarray, cutoff_freq: float, fs: float, epsilon: float = 1e-3) -> np.ndarray: - """ - Perform Wiener deconvolution on input/output signals - """ - # Pad to next power-of-2 FFT - n_samples = input_.shape[1] - n_fft = 2 ** int(np.ceil(np.log2(n_samples))) - input_padded = np.pad(input_, ((0, 0), (0, n_fft - n_samples)), mode="constant") - output_padded = np.pad(output, ((0, 0), (0, n_fft - n_samples)), mode="constant") - - # FFT - H = np.fft.fft(input_padded, axis=-1) - G = np.fft.fft(output_padded, axis=-1) - - # Frequency-domain Wiener filter - snr = create_frequency_mask(n_fft, cutoff_freq, fs) - H_conj = np.conj(H) - deconv_freq = (H_conj * G) / (H * H_conj + epsilon / snr[None, :]) - - # IFFT to get impulse response - deconvolved = np.real(np.fft.ifft(deconv_freq, axis=-1)) - return deconvolved[:, :n_samples] - - -def create_frequency_mask(n_samples: int, cutoff_freq: float, fs: float, sigma_factor: float = 6.0) -> np.ndarray: - """ - Create a smooth low-pass mask - """ - freqs = np.fft.fftfreq(n_samples, 1 / fs) - mask = np.exp(-0.5 * (freqs / cutoff_freq) ** 2) # Gaussian low-pass - mask = gaussian_filter1d(mask, sigma=n_samples / sigma_factor) - return np.clip(mask, 1e-3, 1.0) # avoid zeros - - -def plot_step_responses_with_metrics(time: np.ndarray, responses: np.ndarray, ax=None) -> dict: - mean_resp = responses.mean(axis=0) - std_resp = responses.std(axis=0) - - if ax is None: - fig, ax = plt.subplots(figsize=(8, 5)) - - for resp in responses: - ax.plot(time, resp, alpha=0.2, color="gray") - ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") - ax.fill_between(time, mean_resp - std_resp, mean_resp + std_resp, color="blue", alpha=0.2, label="±1 std") - ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input") - ax.set_xlabel("Time [s]") - ax.set_ylabel("Step Response") - ax.set_title("Estimated Step Response with Uncertainty") - ax.legend() - ax.grid(True) - - metrics = compute_step_response_metrics(time, responses) - - # Add metrics as annotation - metrics_text = "\n".join( - f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() if not np.isnan(v[0]) - ) - ax.text(0.98, 0.02, metrics_text, ha="right", va="bottom", - transform=ax.transAxes, fontsize=9, color="gray") - - return metrics + return time_response, step_responses def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> dict: + """Compute rise time, settling time, overshoot, steady-state error.""" rise_times, settling_times, overshoots, steady_state_errors = [], [], [], [] for resp in responses: @@ -158,11 +95,13 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di else: settling_times.append(np.nan) - # Overshoot - overshoot = (np.max(resp) - final_val) / final_val * 100 + # Overshoot (relative to final value) + peak_val = np.max(resp[: int(len(resp) * 0.8)]) # ignore drift at end + overshoot = (peak_val - final_val) / final_val * 100 + overshoot = max(0.0, overshoot) # no negative overshoot here overshoots.append(overshoot) - # Steady-state error + # Steady-state error (final value vs ideal 1.0) steady_state_errors.append(final_val - 1) metrics = { @@ -173,3 +112,69 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di } return metrics + + +def plot_step_responses(time: np.ndarray, responses: np.ndarray, metrics: dict = None, ax=None): + """Plot step responses and optionally show metrics.""" + mean_resp = responses.mean(axis=0) + std_resp = responses.std(axis=0) + + if ax is None: + fig, ax = plt.subplots(figsize=(8, 5)) + + # Plot all responses lightly + ax.plot(time, responses.T, alpha=0.2, color="gray") + ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") + ax.fill_between(time, mean_resp - std_resp, mean_resp + std_resp, + color="blue", alpha=0.2, label="±1 std") + + # Reference step input + ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input") + + ax.set_xlabel("Time [s]") + ax.set_ylabel("Step Response") + ax.set_title("Estimated Step Response") + ax.legend() + ax.grid(True) + + if metrics: + metrics_text = "\n".join( + f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() if not np.isnan(v[0]) + ) + ax.text(0.98, 0.02, metrics_text, ha="right", va="bottom", + transform=ax.transAxes, fontsize=9, color="gray") + + return ax + + +def wiener_deconvolution(input_: np.ndarray, output: np.ndarray, + cutoff_freq: float, fs: float, epsilon: float = 1e-3) -> np.ndarray: + """Perform Wiener deconvolution on input/output signals.""" + n_samples = input_.shape[1] + n_fft = 2 ** int(np.ceil(np.log2(n_samples))) + + # Pad for FFT + input_padded = np.pad(input_, ((0, 0), (0, n_fft - n_samples)), mode="constant") + output_padded = np.pad(output, ((0, 0), (0, n_fft - n_samples)), mode="constant") + + # FFT + H = np.fft.fft(input_padded, axis=-1) + G = np.fft.fft(output_padded, axis=-1) + + # Wiener filter + snr = create_frequency_mask(n_fft, cutoff_freq, fs) + H_conj = np.conj(H) + deconv_freq = (H_conj * G) / (H * H_conj + epsilon / snr[None, :]) + + # IFFT to get impulse response + deconvolved = np.real(np.fft.ifft(deconv_freq, axis=-1)) + return deconvolved[:, :n_samples] + + +def create_frequency_mask(n_samples: int, cutoff_freq: float, fs: float, + sigma_factor: float = 6.0) -> np.ndarray: + """Create a smooth low-pass Gaussian frequency mask.""" + freqs = np.fft.fftfreq(n_samples, 1 / fs) + mask = np.exp(-0.5 * (freqs / cutoff_freq) ** 2) + mask = gaussian_filter1d(mask, sigma=n_samples / sigma_factor) + return np.clip(mask, 1e-3, 1.0) From 89de467be23a92ff5d0d0090315869ee07215281 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Mon, 6 Oct 2025 10:33:41 +0200 Subject: [PATCH 07/10] remove empty line --- autotune/data_selection_window.py | 1 - 1 file changed, 1 deletion(-) diff --git a/autotune/data_selection_window.py b/autotune/data_selection_window.py index b1ebc4b..6dec9d8 100644 --- a/autotune/data_selection_window.py +++ b/autotune/data_selection_window.py @@ -340,7 +340,6 @@ def plotPIDAnalysis(self): # Update plot self.pid_window.generate_step_response(u_sel, y_sel, t_sel) - def plotCoherence(self): if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0: return From bb626d9d40db5f9da0927b32df2e2db54dc38fbf Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Mon, 6 Oct 2025 10:34:42 +0200 Subject: [PATCH 08/10] autotune: remove outliers from step responses and formatting --- autotune/pid_analyse_window.py | 94 +++++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/autotune/pid_analyse_window.py b/autotune/pid_analyse_window.py index 3ae5ff1..427fc57 100644 --- a/autotune/pid_analyse_window.py +++ b/autotune/pid_analyse_window.py @@ -1,4 +1,5 @@ import matplotlib.pyplot as plt +from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar import numpy as np from numpy.lib.stride_tricks import sliding_window_view from scipy.ndimage import gaussian_filter1d @@ -7,7 +8,7 @@ class PIDAnalyseWindow(QDialog): - """Dialog window to show estimated step response""" + """Dialog window to show estimated step response.""" def __init__(self, parent=None): super().__init__(parent) @@ -19,11 +20,14 @@ def __init__(self, parent=None): # Layout layout = QVBoxLayout() + layout.addWidget(NavigationToolbar(self.canvas, self)) layout.addWidget(self.canvas) self.setLayout(layout) - def generate_step_response(self, u: np.ndarray, y: np.ndarray, t: np.ndarray) -> dict: - """Compute and plot step response, updating the canvas.""" + def generate_step_response( + self, u: np.ndarray, y: np.ndarray, t: np.ndarray + ) -> dict: + """Compute and plot step response.""" self.ax.clear() time, responses = compute_step_responses(u, y, t) @@ -37,8 +41,13 @@ def generate_step_response(self, u: np.ndarray, y: np.ndarray, t: np.ndarray) -> return metrics -def compute_step_responses(u: np.ndarray, y: np.ndarray, t: np.ndarray, - cutfreq: float = 25.0, window_duration: float = 1.0): +def compute_step_responses( + u: np.ndarray, + y: np.ndarray, + t: np.ndarray, + cutfreq: float = 25.0, + window_duration: float = 1.0, +): """Compute step responses from input/output signals.""" dt = np.diff(t).mean() if np.isclose(dt, 0): @@ -59,11 +68,17 @@ def compute_step_responses(u: np.ndarray, y: np.ndarray, t: np.ndarray, u_windows = u_windows * window_func y_windows = y_windows * window_func - # Deconvolution deconvolved = wiener_deconvolution(u_windows, y_windows, cutfreq, fs) step_responses = deconvolved[:, :response_window_samples].cumsum(axis=1) + # Remove outlier windows + peaks = step_responses.max(axis=1) + median_peak = np.median(peaks) + mad = np.median(np.abs(peaks - median_peak)) + mask = np.abs(peaks - median_peak) < 3 * mad + step_responses = step_responses[mask] + return time_response, step_responses @@ -87,7 +102,10 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di within_bounds = np.where(np.abs(resp - final_val) <= tolerance)[0] if len(within_bounds) > 0: for idx in within_bounds: - if np.all(np.abs(resp[idx:] - final_val) <= tolerance): + # check that response stays within bounds till the end + if np.all( + np.abs(resp[idx:] - final_val) <= tolerance + ): settling_times.append(time[idx]) break else: @@ -95,27 +113,31 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di else: settling_times.append(np.nan) - # Overshoot (relative to final value) + # Overshoot peak_val = np.max(resp[: int(len(resp) * 0.8)]) # ignore drift at end overshoot = (peak_val - final_val) / final_val * 100 - overshoot = max(0.0, overshoot) # no negative overshoot here overshoots.append(overshoot) # Steady-state error (final value vs ideal 1.0) steady_state_errors.append(final_val - 1) metrics = { - "rise_time": (np.nanmean(rise_times), np.nanstd(rise_times)), - "settling_time": (np.nanmean(settling_times), np.nanstd(settling_times)), + "rise_time (s)": (np.nanmean(rise_times), np.nanstd(rise_times)), + "settling_time (s)": (np.nanmean(settling_times), np.nanstd(settling_times)), "overshoot (%)": (np.nanmean(overshoots), np.nanstd(overshoots)), - "steady_state_error": (np.nanmean(steady_state_errors), np.nanstd(steady_state_errors)), + "steady_state_error": ( + np.nanmean(steady_state_errors), + np.nanstd(steady_state_errors), + ), } return metrics -def plot_step_responses(time: np.ndarray, responses: np.ndarray, metrics: dict = None, ax=None): - """Plot step responses and optionally show metrics.""" +def plot_step_responses( + time: np.ndarray, responses: np.ndarray, metrics: dict = None, ax=None +): + """Plot step responses and show metrics.""" mean_resp = responses.mean(axis=0) std_resp = responses.std(axis=0) @@ -125,8 +147,14 @@ def plot_step_responses(time: np.ndarray, responses: np.ndarray, metrics: dict = # Plot all responses lightly ax.plot(time, responses.T, alpha=0.2, color="gray") ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response") - ax.fill_between(time, mean_resp - std_resp, mean_resp + std_resp, - color="blue", alpha=0.2, label="±1 std") + ax.fill_between( + time, + mean_resp - std_resp, + mean_resp + std_resp, + color="blue", + alpha=0.2, + label="±1 std", + ) # Reference step input ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input") @@ -134,22 +162,37 @@ def plot_step_responses(time: np.ndarray, responses: np.ndarray, metrics: dict = ax.set_xlabel("Time [s]") ax.set_ylabel("Step Response") ax.set_title("Estimated Step Response") - ax.legend() + ax.legend(loc="upper right") ax.grid(True) if metrics: metrics_text = "\n".join( - f"{k}: {v[0]:.2f} ± {v[1]:.2f}" for k, v in metrics.items() if not np.isnan(v[0]) + f"{k}: {v[0]:.2f} ± {v[1]:.2f}" + for k, v in metrics.items() + if not np.isnan(v[0]) + ) + ax.text( + 0.98, + 0.02, + metrics_text, + ha="right", + va="bottom", + transform=ax.transAxes, + fontsize=9, + color="gray", ) - ax.text(0.98, 0.02, metrics_text, ha="right", va="bottom", - transform=ax.transAxes, fontsize=9, color="gray") return ax -def wiener_deconvolution(input_: np.ndarray, output: np.ndarray, - cutoff_freq: float, fs: float, epsilon: float = 1e-3) -> np.ndarray: - """Perform Wiener deconvolution on input/output signals.""" +def wiener_deconvolution( + input_: np.ndarray, + output: np.ndarray, + cutoff_freq: float, + fs: float, + epsilon: float = 1e-3, +) -> np.ndarray: + """Wiener deconvolution on input/output signals.""" n_samples = input_.shape[1] n_fft = 2 ** int(np.ceil(np.log2(n_samples))) @@ -171,8 +214,9 @@ def wiener_deconvolution(input_: np.ndarray, output: np.ndarray, return deconvolved[:, :n_samples] -def create_frequency_mask(n_samples: int, cutoff_freq: float, fs: float, - sigma_factor: float = 6.0) -> np.ndarray: +def create_frequency_mask( + n_samples: int, cutoff_freq: float, fs: float, sigma_factor: float = 6.0 +) -> np.ndarray: """Create a smooth low-pass Gaussian frequency mask.""" freqs = np.fft.fftfreq(n_samples, 1 / fs) mask = np.exp(-0.5 * (freqs / cutoff_freq) ** 2) From a49d12a67f96686f82257b880adf773059eedf19 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Mon, 6 Oct 2025 10:47:53 +0200 Subject: [PATCH 09/10] formatting --- autotune/pid_analyse_window.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/autotune/pid_analyse_window.py b/autotune/pid_analyse_window.py index 427fc57..8b98556 100644 --- a/autotune/pid_analyse_window.py +++ b/autotune/pid_analyse_window.py @@ -103,9 +103,7 @@ def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> di if len(within_bounds) > 0: for idx in within_bounds: # check that response stays within bounds till the end - if np.all( - np.abs(resp[idx:] - final_val) <= tolerance - ): + if np.all(np.abs(resp[idx:] - final_val) <= tolerance): settling_times.append(time[idx]) break else: From 50f4264e18072960afc3e54fe5c974963f874145 Mon Sep 17 00:00:00 2001 From: mahima-yoga Date: Mon, 6 Oct 2025 10:49:32 +0200 Subject: [PATCH 10/10] formatting --- autotune/pid_analyse_window.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autotune/pid_analyse_window.py b/autotune/pid_analyse_window.py index 8b98556..7c2bfeb 100644 --- a/autotune/pid_analyse_window.py +++ b/autotune/pid_analyse_window.py @@ -1,10 +1,10 @@ import matplotlib.pyplot as plt -from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar import numpy as np -from numpy.lib.stride_tricks import sliding_window_view -from scipy.ndimage import gaussian_filter1d from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar +from numpy.lib.stride_tricks import sliding_window_view from PyQt5.QtWidgets import QDialog, QVBoxLayout +from scipy.ndimage import gaussian_filter1d class PIDAnalyseWindow(QDialog):