diff --git a/autotune/autotune.py b/autotune/autotune.py index a21f8b4..99f10a4 100644 --- a/autotune/autotune.py +++ b/autotune/autotune.py @@ -46,7 +46,7 @@ from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar from pid_design import computePidGmvc -from PyQt5.QtCore import Qt +from PyQt5.QtCore import Qt, QThread, pyqtSignal from PyQt5.QtWidgets import ( QApplication, QCheckBox, @@ -77,6 +77,97 @@ from system_identification import SystemIdentification +def computeNRMSE(y, y_est): + # Normalized Root Mean Square Error (NRMSE) expressed as a percentage. + norm_ref = np.linalg.norm(y - np.mean(y)) + if norm_ref < 1e-10: + return -np.inf + return 100.0 * (1.0 - np.linalg.norm(y - y_est) / norm_ref) + + +def compute_fit(u, y, t, dt, n_poles, n_zeros, delay, f_hp, f_lp, method="RLS"): + try: + sys_id = SystemIdentification(n_poles, n_zeros, delay, dt) + sys_id.f_hp = f_hp + sys_id.f_lp = f_lp + est = sys_id.fit(u.reshape(-1, 1), y.reshape(-1, 1), method=method) + Gz = ctrl.TransferFunction( + est.G_.num_list[0][0], est.G_.den_list[0][0][: n_poles + 1], dt + ) + u_detrended = detrend(u) + u_delayed = np.concatenate( + ([0] * delay, u_detrended[: len(u_detrended) - delay]) + ) + _, y_est = ctrl.forced_response(Gz, T=t, U=u_delayed) + y_detrended = detrend(y[: len(y_est)]) + y_est_detrended = detrend(y_est) + return computeNRMSE(y_detrended, y_est_detrended) + except Exception: + return -np.inf + + +class ParamSearchWorker(QThread): + finished = pyqtSignal(dict, float) + progress = pyqtSignal(int, int) + + def __init__(self, u, y, t, dt, f_hp, f_lp, method): + super().__init__() + self._cancel = False + self.u = u + self.y = y + self.t = t + self.dt = dt + self.f_hp = f_hp + self.f_lp = f_lp + self.method = method + + def run(self): + n_poles_range = range(2, 8) + n_zeros_range = range(2, 8) + delay_range = range(0, 6) + + combos = [ + (n_poles, n_zeros, delay) + for n_poles in n_poles_range + for n_zeros in n_zeros_range + for delay in delay_range + if n_poles >= n_zeros + ] + total = len(combos) + best_fit = -np.inf + best_params = {} + + for i, (n_poles, n_zeros, delay) in enumerate(combos): + fit = compute_fit( + self.u, + self.y, + self.t, + self.dt, + n_poles, + n_zeros, + delay, + self.f_hp, + self.f_lp, + self.method, + ) + new_order = n_poles + best_order = best_params.get("n_poles", 0) + # Prefer lower-order models: a higher-order model must improve fit + # by more than 1% to be accepted, avoiding overfitting. + if (fit > best_fit + 1.0) or (fit > best_fit and new_order == best_order): + best_fit = fit + best_params = { + "n_poles": n_poles, + "n_zeros": n_zeros, + "delay": delay, + } + self.progress.emit(i + 1, total) + if self._cancel: + break + + self.finished.emit(best_params, best_fit) + + def isNumber(value): try: float(value) @@ -93,6 +184,15 @@ def __init__(self, parent=None): self.input_ref = None self.closed_loop_ref = None self.closed_loop_ax = None + self.measured_step_info = None + self.step_info_patches = [] + self.step_info_spinbox = {} + self.step_info_measured_lbl = {} + self.step_info = { + "rise_time": 0.12, + "overshoot": 10.0, + "settling_time": 0.4, + } self.bode_plot_ref = [] self.pz_plot_refs = [] self.file_name = None @@ -111,6 +211,8 @@ def __init__(self, parent=None): self.sys_id_n_zeros = 2 self.sys_id_n_poles = 2 + self.kDisturbanceTime = 1.0 + # this is the Canvas Widget that displays the `figure` # it takes the `figure` instance as a parameter to __init__ self.canvas = FigureCanvas(self.figure) @@ -129,50 +231,13 @@ def __init__(self, parent=None): left_menu.addWidget(self.btn_open_log) id_params_group = QFormLayout() - self.line_edit_zeros = QSpinBox() - self.line_edit_zeros.setValue(self.sys_id_n_zeros) - self.line_edit_zeros.setRange(0, 6) - self.line_edit_zeros.valueChanged.connect(self.onZerosChanged) - id_params_group.addRow(QLabel("Zeros"), self.line_edit_zeros) - self.line_edit_poles = QSpinBox() - self.line_edit_poles.setValue(self.sys_id_n_poles) - self.line_edit_poles.setRange(0, 6) - self.line_edit_poles.valueChanged.connect(self.onPolesChanged) - id_params_group.addRow(QLabel("Poles"), self.line_edit_poles) - self.line_edit_delays = QSpinBox() - self.line_edit_delays.setValue(self.sys_id_delays) - self.line_edit_delays.setRange(0, 1000) - self.line_edit_delays.valueChanged.connect(self.onDelaysChanged) - id_params_group.addRow(QLabel("Delays"), self.line_edit_delays) - input_scale_group = QGroupBox("Input scaling") - input_scale_group.setToolTip( - "Scale the input to identify a model at trim airspeed (requires true airspeed data)" - ) - - input_scale_form = QFormLayout() - self.input_scale_combo = QComboBox() - self.input_scale_combo.setEditable(False) - self.input_scale_choices = ["True airspeed^2", "True airspeed", "None"] - self.input_scale_combo.addItems(self.input_scale_choices) - self.input_scale_combo.setEnabled(False) - self.input_scale_combo.currentIndexChanged.connect(self.selectInputScale) - input_scale_form.addRow(self.input_scale_combo) - - self.line_edit_trim = QDoubleSpinBox() - self.trim_airspeed = 20.0 - self.line_edit_trim.setValue(self.trim_airspeed) - self.line_edit_trim.setRange(0.0, 100.0) - self.line_edit_trim.textChanged.connect(self.onTrimChanged) - self.line_edit_trim.setEnabled(False) - input_scale_form.addRow(QLabel("Trim airspeed"), self.line_edit_trim) - input_scale_group.setLayout(input_scale_form) - id_params_group.addRow(input_scale_group) - - self.btn_run_sys_id = QPushButton("Run identification") - self.btn_run_sys_id.clicked.connect(self.onSysIdClicked) - self.btn_run_sys_id.setEnabled(False) - - id_params_group.addRow(self.btn_run_sys_id) + self.createPreprocessingWidget(id_params_group) + self.createFindParamsButton(id_params_group) + self.createModelOrderWidgets(id_params_group) + self.createMethodWidget(id_params_group) + self.createRunSysIdButton(id_params_group) + self.createFitWidget(id_params_group) + self.createStabilityWidget(id_params_group) left_menu.addLayout(id_params_group) @@ -206,17 +271,126 @@ def __init__(self, parent=None): layout_plot.addWidget(self.canvas) layout_v.addLayout(layout_h) layout_v.setStretch(0, 1) - layout_v.addWidget(self.tuning_tabs) + bottom_row = QHBoxLayout() + bottom_row.addWidget(self.tuning_tabs, stretch=1) + bottom_row.addWidget(self.createStepInfoGroup()) + layout_v.addLayout(bottom_row) self.setLayout(layout_v) def reset(self): self.model_ref = None self.input_ref = None self.closed_loop_ref = None + self.measured_step_info = None + self.step_info_patches = [] + self.lbl_fit.setText("—") + self.lbl_fit.setStyleSheet("") + self.lbl_stability.setText("—") + self.lbl_stability.setStyleSheet("") + self.btn_stabilize.setVisible(False) self.bode_plot_ref = [] self.pz_plot_refs = [] self.is_system_identified = False + def createModelOrderWidgets(self, layout): + self.line_edit_zeros = QSpinBox() + self.line_edit_zeros.setValue(self.sys_id_n_zeros) + self.line_edit_zeros.setRange(0, 6) + self.line_edit_zeros.valueChanged.connect(self.onZerosChanged) + layout.addRow(QLabel("Zeros"), self.line_edit_zeros) + self.line_edit_poles = QSpinBox() + self.line_edit_poles.setValue(self.sys_id_n_poles) + self.line_edit_poles.setRange(0, 6) + self.line_edit_poles.valueChanged.connect(self.onPolesChanged) + layout.addRow(QLabel("Poles"), self.line_edit_poles) + self.line_edit_delays = QSpinBox() + self.line_edit_delays.setValue(self.sys_id_delays) + self.line_edit_delays.setRange(0, 1000) + self.line_edit_delays.valueChanged.connect(self.onDelaysChanged) + layout.addRow(QLabel("Delays"), self.line_edit_delays) + + def createMethodWidget(self, layout): + self.id_method_combo = QComboBox() + self.id_method_combo.addItems(["OLS", "RLS"]) + self.id_method_combo.currentIndexChanged.connect( + lambda: self.btn_run_sys_id.setEnabled(True) + ) + layout.addRow(QLabel("Method"), self.id_method_combo) + + def createPreprocessingWidget(self, layout): + preproc_group = QGroupBox("Pre-processing") + preproc_form = QFormLayout() + self.f_hp_spinbox = QDoubleSpinBox() + self.f_hp_spinbox.setRange(0.0, 50.0) + self.f_hp_spinbox.setSingleStep(0.1) + self.f_hp_spinbox.setDecimals(1) + self.f_hp_spinbox.setValue(0.5) + self.f_hp_spinbox.valueChanged.connect( + lambda: self.btn_run_sys_id.setEnabled(True) + ) + preproc_form.addRow(QLabel("HP cutoff (Hz)"), self.f_hp_spinbox) + self.f_lp_spinbox = QDoubleSpinBox() + self.f_lp_spinbox.setRange(1.0, 200.0) + self.f_lp_spinbox.setSingleStep(1.0) + self.f_lp_spinbox.setDecimals(1) + self.f_lp_spinbox.setValue(30.0) + self.f_lp_spinbox.valueChanged.connect( + lambda: self.btn_run_sys_id.setEnabled(True) + ) + preproc_form.addRow(QLabel("LP cutoff (Hz)"), self.f_lp_spinbox) + input_scale_group = QGroupBox("Input scaling") + input_scale_group.setToolTip( + "Scale the input to identify a model at trim airspeed (requires true airspeed data)" + ) + input_scale_form = QFormLayout() + self.input_scale_combo = QComboBox() + self.input_scale_combo.setEditable(False) + self.input_scale_choices = ["True airspeed^2", "True airspeed", "None"] + self.input_scale_combo.addItems(self.input_scale_choices) + self.input_scale_combo.setEnabled(False) + self.input_scale_combo.currentIndexChanged.connect(self.selectInputScale) + input_scale_form.addRow(self.input_scale_combo) + self.line_edit_trim = QDoubleSpinBox() + self.trim_airspeed = 20.0 + self.line_edit_trim.setValue(self.trim_airspeed) + self.line_edit_trim.setRange(0.0, 100.0) + self.line_edit_trim.textChanged.connect(self.onTrimChanged) + self.line_edit_trim.setEnabled(False) + input_scale_form.addRow(QLabel("Trim airspeed"), self.line_edit_trim) + input_scale_group.setLayout(input_scale_form) + preproc_form.addRow(input_scale_group) + preproc_group.setLayout(preproc_form) + layout.addRow(preproc_group) + + def createRunSysIdButton(self, layout): + self.btn_run_sys_id = QPushButton("Run identification") + self.btn_run_sys_id.clicked.connect(self.onSysIdClicked) + self.btn_run_sys_id.setEnabled(False) + layout.addRow(self.btn_run_sys_id) + + def createFindParamsButton(self, layout): + self.btn_find_params = QPushButton("Find parameters") + self.btn_find_params.clicked.connect(self.onFindParamsClicked) + self.btn_find_params.setEnabled(False) + layout.addRow(self.btn_find_params) + + def createFitWidget(self, layout): + self.lbl_fit = QLabel("—") + layout.addRow(QLabel("Fit"), self.lbl_fit) + + def createStabilityWidget(self, layout): + self.lbl_stability = QLabel("—") + self.btn_stabilize = QPushButton("Stabilize") + self.btn_stabilize.setVisible(False) + self.btn_stabilize.setToolTip( + "Reflects unstable poles inside the unit circle (p → 1/p*)" + ) + self.btn_stabilize.clicked.connect(self.stabilizeModel) + stability_row = QHBoxLayout() + stability_row.addWidget(self.lbl_stability) + stability_row.addWidget(self.btn_stabilize) + layout.addRow(QLabel("Stability"), stability_row) + def createTfLayout(self): layout_tf = QVBoxLayout() self.t_coeffs = QTableWidget() @@ -313,6 +487,38 @@ def onSysIdClicked(self): self.runIdentification() self.computeController() + def onFindParamsClicked(self): + if ( + hasattr(self, "_param_search_worker") + and self._param_search_worker.isRunning() + ): + self._param_search_worker._cancel = True + return + self.btn_find_params.setText("Cancel (0%)") + self._param_search_worker = ParamSearchWorker( + self.u.copy(), + self.y.copy(), + self.t.copy(), + self.dt, + self.f_hp_spinbox.value(), + self.f_lp_spinbox.value(), + self.id_method_combo.currentText(), + ) + self._param_search_worker.progress.connect(self.onParamSearchProgress) + self._param_search_worker.finished.connect(self.onParamSearchFinished) + self._param_search_worker.start() + + def onParamSearchProgress(self, current, total): + self.btn_find_params.setText(f"Cancel ({100 * current // total}%)") + + def onParamSearchFinished(self, best_params, best_fit): + self.line_edit_poles.setValue(best_params["n_poles"]) + self.line_edit_zeros.setValue(best_params["n_zeros"]) + self.line_edit_delays.setValue(best_params["delay"]) + self.btn_find_params.setText("Find parameters") + self.btn_find_params.setEnabled(True) + self.onSysIdClicked() + def printImproperTfError(self): msg = QMessageBox() msg.setIcon(QMessageBox.Critical) @@ -328,7 +534,7 @@ def createPidLayout(self): "P": {"min": 0.001, "max": 4.0, "step": 0.001}, "I": {"min": 0.0, "max": 20.0, "step": 0.1}, "D": {"min": 0.0, "max": 0.2, "step": 0.001}, - "FF": {"min": 0.0, "max": 1.0, "step": 0.001}, + "FF": {"min": -1.0, "max": 1.0, "step": 0.001}, } def make_slider_callback(gain): @@ -365,6 +571,7 @@ def make_line_edit_callback(gain): self.gain_slider[gain].setMinimum(slider_props[gain]["min"]) self.gain_slider[gain].setMaximum(slider_props[gain]["max"]) self.gain_slider[gain].setInterval(slider_props[gain]["step"]) + self.gain_slider[gain].setValue(self.gains[gain]) self.gain_slider[gain].valueChanged.connect(make_slider_callback(gain)) layout_pid.addWidget(self.gain_slider[gain], row, 1) @@ -391,6 +598,87 @@ def make_line_edit_callback(gain): return layout_pid + def createStepInfoGroup(self): + specs = { + "rise_time": ("Rise time", 0.12, 0.01, 2.0, 0.01, "s"), + "overshoot": ("Overshoot", 10.0, 0.0, 100.0, 0.5, "%"), + "settling_time": ("Settling time", 0.4, 0.01, 5.0, 0.01, "s"), + } + group = QGroupBox("Step info") + grid = QGridLayout() + grid.addWidget(QLabel("Max"), 0, 1) + grid.addWidget(QLabel("Measured"), 0, 2) + for row, (key, (label, default, lo, hi, step, unit)) in enumerate( + specs.items(), start=1 + ): + sb = QDoubleSpinBox() + sb.setRange(lo, hi) + sb.setSingleStep(step) + sb.setDecimals( + len(str(step).rstrip("0").split(".")[-1]) if "." in str(step) else 0 + ) + sb.setValue(default) + sb.valueChanged.connect(self.onStepInfoChanged) + self.step_info_spinbox[key] = sb + + measured_lbl = QLabel("—") + self.step_info_measured_lbl[key] = measured_lbl + + grid.addWidget(QLabel(label + " (" + unit + ")"), row, 0) + grid.addWidget(sb, row, 1) + grid.addWidget(measured_lbl, row, 2) + + group.setLayout(grid) + return group + + def onStepInfoChanged(self): + for key in self.step_info: + self.step_info[key] = self.step_info_spinbox[key].value() + self.updateStepInfoEnvelope() + + def updateStepInfoEnvelope(self): + if self.closed_loop_ax is None: + return + + for patch in self.step_info_patches: + patch.remove() + self.step_info_patches = [] + + ax = self.closed_loop_ax + step_info = self.step_info + kw = dict(color="#ff4444", linestyle="--", linewidth=1) + + y_limit = 1.0 + step_info["overshoot"] / 100.0 + p = ax.axhline(y_limit, **kw) + self.step_info_patches.append(p) + + p = ax.plot( + [step_info["settling_time"], step_info["settling_time"]], [0, 1.0], **kw + )[0] + self.step_info_patches.append(p) + + measured_map = { + "rise_time": ("RiseTime", lambda v: f"{v:.3f}"), + "overshoot": ("Overshoot", lambda v: f"{v:.1f}"), + "settling_time": ("SettlingTime", lambda v: f"{v:.3f}"), + } + for key, (info_key, fmt) in measured_map.items(): + lbl = self.step_info_measured_lbl[key] + if self.measured_step_info is None: + lbl.setText("—") + lbl.setStyleSheet("") + else: + measured = self.measured_step_info[info_key] + lbl.setText(fmt(measured)) + if np.isnan(measured): + lbl.setStyleSheet("") + elif measured > self.step_info[key]: + lbl.setStyleSheet("color: red") + else: + lbl.setStyleSheet("color: green") + + self.canvas.draw() + def updateGainFromSlider(self, gain: str): if self.gain_slider[gain].hasFocus(): self.gains[gain] = self.gain_slider[gain].value() @@ -487,8 +775,14 @@ def runIdentification(self): m = self.sys_id_n_zeros # order of the numerator (b_0,...,b_m) d = self.sys_id_delays # number of delays id = SystemIdentification(n, m, d, self.dt) + id.f_hp = self.f_hp_spinbox.value() + id.f_lp = self.f_lp_spinbox.value() - est = id.fit(self.u.reshape(-1, 1), self.y.reshape(-1, 1)) + est = id.fit( + self.u.reshape(-1, 1), + self.y.reshape(-1, 1), + method=self.id_method_combo.currentText(), + ) self.num = est.G_.num_list[0][0] self.den = est.G_.den_list[0][0][0 : n + 1] @@ -514,7 +808,41 @@ def replayInputData(self): self.t_est, self.y_est = ctrl.forced_response(self.Gz, T=self.t, U=u_delayed) if len(self.t_est) > len(self.y_est): self.t_est = self.t_est[0 : len(self.y_est - 1)] + + y_detrended = detrend(self.y[: len(self.y_est)]) + y_est_detrended = detrend(self.y_est) + fit = computeNRMSE(y_detrended, y_est_detrended) + self.lbl_fit.setText(f"{fit:.1f}%") + if fit >= 80: + self.lbl_fit.setStyleSheet("color: green") + elif fit >= 60: + self.lbl_fit.setStyleSheet("color: orange") + else: + self.lbl_fit.setStyleSheet("color: red") + self.plotInputOutput() + self.checkStability() + + def checkStability(self): + unstable = np.any(np.abs(self.Gz.poles()) > 1) + if unstable: + self.lbl_stability.setText("⚠ Unstable") + self.lbl_stability.setStyleSheet("color: red") + self.btn_stabilize.setVisible(True) + else: + self.lbl_stability.setText("✓ Stable") + self.lbl_stability.setStyleSheet("color: green") + self.btn_stabilize.setVisible(False) + + def stabilizeModel(self): + poles = self.Gz.poles() + stable_poles = np.where(np.abs(poles) > 1, 1.0 / np.conj(poles), poles) + self.den = np.real(np.poly(stable_poles)) + self.Gz = ctrl.TransferFunction(self.num, self.den, self.dt) + self.updateTfDisplay(self.den[1:], self.num) + self.plotPolesZeros() + self.replayInputData() + self.computeController() def updateTfDisplay(self, a_coeffs, b_coeffs): @@ -732,7 +1060,7 @@ def updateClosedLoop(self): outputs="y", ) d = np.zeros_like(t_out) - d[t_out >= 1.0] = -0.05 # TODO: parameterize + d[t_out >= self.kDisturbanceTime] = -0.05 # TODO: parameterize _, y_d = ctrl.forced_response(disturbance_loop, t_out, d) y_out += y_d @@ -764,6 +1092,15 @@ def updateClosedLoop(self): self.plotBode(open_loop, closed_loop) def plotClosedLoop(self, t, y): + # Compute metrics on pre-disturbance portion only + mask = t < self.kDisturbanceTime + try: + self.measured_step_info = ctrl.step_info( + y[mask], timepts=t[mask], final_output=1.0 + ) + except (IndexError, ValueError): + self.measured_step_info = None + if self.closed_loop_ref is None: ax = self.figure.add_subplot(3, 3, 7) ax.step(t, [1 if i > 0 else 0 for i in t], "k--") @@ -778,7 +1115,7 @@ def plotClosedLoop(self, t, y): self.closed_loop_ref.set_ydata(y) self.closed_loop_ax.set_ylim(np.min(y), np.max([1.5, np.max(y)])) - self.canvas.draw() + self.updateStepInfoEnvelope() def plotBode(self, open_loop, closed_loop): @@ -912,6 +1249,7 @@ def loadLog(self): self.line_edit_trim.setValue(trim_airspeed) self.refreshInputOutputData() + self.btn_find_params.setEnabled(True) self.runIdentification() self.computeController() diff --git a/autotune/system_identification.py b/autotune/system_identification.py index 290be14..781a0bf 100644 --- a/autotune/system_identification.py +++ b/autotune/system_identification.py @@ -41,7 +41,42 @@ import control as ctrl import numpy as np from arx_rls import ArxRls -from scipy.optimize import lsq_linear, minimize +from scipy.optimize import lsq_linear + + +def apply_filters(u, y, f_hp, f_lp, dt): + n_steps = len(u) + + if f_hp > 0.0: + tau_hp = 1 / (2 * np.pi * f_hp) + alpha_hp = tau_hp / (tau_hp + dt) + else: + alpha_hp = 0.0 + + tau_lp = 1 / (2 * np.pi * f_lp) + alpha_lp = tau_lp / (tau_lp + dt) + + u_hp = np.zeros(n_steps) + y_hp = np.zeros(n_steps) + u_hp[0] = u[0] + y_hp[0] = y[0] + u_lp = np.zeros(n_steps) + y_lp = np.zeros(n_steps) + u_lp[0] = u[0] + y_lp[0] = y[0] + + for k in range(1, n_steps): + if alpha_hp > 0.0: + u_hp[k] = alpha_hp * u_hp[k - 1] + alpha_hp * (u[k] - u[k - 1]) + y_hp[k] = alpha_hp * y_hp[k - 1] + alpha_hp * (y[k] - y[k - 1]) + else: + u_hp[k] = u[k] + y_hp[k] = y[k] + + u_lp[k] = alpha_lp * u_lp[k - 1] + (1 - alpha_lp) * u_hp[k] + y_lp[k] = alpha_lp * y_lp[k - 1] + (1 - alpha_lp) * y_hp[k] + + return u_lp, y_lp class SysIdResult(object): @@ -62,86 +97,38 @@ def __init__(self, n=2, m=2, d=1, dt=0.0): tau = 60.0 # forgetting period self.lbda = 1.0 - self.dt / tau - def fit(self, u, y): + def fit(self, u, y, method="RLS"): n_steps = len(u) - # High-pass filter parameters - if self.f_hp > 0.0: - tau_hp = 1 / (2 * np.pi * self.f_hp) - alpha_hp = tau_hp / (tau_hp + self.dt) - else: - alpha_hp = 0.0 - - u_hp = np.zeros(n_steps) - y_hp = np.zeros(n_steps) - u_hp[0] = u[0] - y_hp[0] = y[0] - - # Low-pass filter parameters - tau_lp = 1 / (2 * np.pi * self.f_lp) - alpha_lp = tau_lp / (tau_lp + self.dt) - u_lp = np.zeros(n_steps) - y_lp = np.zeros(n_steps) - u_lp[0] = u[0] - y_lp[0] = y[0] - a_coeffs = np.zeros((self.n, n_steps)) b_coeffs = np.zeros((self.m + 1, n_steps)) - # Apply high and low-pass filters - for k in range(n_steps): - if k > 0: - if alpha_hp > 0.0: - u_hp[k] = alpha_hp * u_hp[k - 1] + alpha_hp * (u[k] - u[k - 1]) - y_hp[k] = alpha_hp * y_hp[k - 1] + alpha_hp * (y[k] - y[k - 1]) - else: - u_hp[k] = u[k] - y_hp[k] = y[k] - - u_lp[k] = alpha_lp * u_lp[k - 1] + (1 - alpha_lp) * u_hp[k] - y_lp[k] = alpha_lp * y_lp[k - 1] + (1 - alpha_lp) * y_hp[k] - - use_rls = True - if use_rls: - # Identification + u_lp, y_lp = apply_filters(u, y, self.f_hp, self.f_lp, self.dt) + + if method == "RLS": rls = ArxRls( self.n, self.m, self.d, lbda=(1 - self.dt / self.forgetting_tc) ) - for k in range(n_steps): - # Update model rls.update(u_lp[k], y_lp[k]) - theta_hat = rls._theta_hat - - # Save for plotting for i in range(self.n): a_coeffs[i, k] = theta_hat[i] for i in range(self.m + 1): b_coeffs[i, k] = theta_hat[i + self.n] - - else: # use LS - # Build matrix of regressors - A = np.zeros((n_steps, self.n + self.m + 1)) - for row in range(n_steps): + elif method == "OLS": + skip = max(self.n, self.d + self.m) + rows = n_steps - skip + A = np.zeros((rows, self.n + self.m + 1)) + B = np.zeros(rows) + for row in range(skip, n_steps): for i in range(self.n): - A[row, i] = -y_lp[row - (i + 1)] + A[row - skip, i] = -y_lp[row - (i + 1)] for i in range(self.m + 1): - A[row, i + self.n] = u_lp[row - (self.d + i)] - - B = [y_lp[i] for i in range(n_steps)] # Measured values - - res = lsq_linear(A, B, lsmr_tol="auto", verbose=1) + A[row - skip, i + self.n] = u_lp[row - (self.d + i)] + B[row - skip] = y_lp[row] + res = lsq_linear(A, B, lsmr_tol="auto") theta_hat = res.x - - # Refine model using output-error optimization - J = lambda x: np.sum( - np.power(abs(np.array(B) - self.simulateModel(x, u_lp, self.dt)), 2.0) - ) # cost function - x0 = np.append(res.x, 0) # initial conditions - res = minimize(J, x0, method="nelder-mead", options={"disp": True}) - theta_hat = res.x - for i in range(self.n): a_coeffs[i, -1] = theta_hat[i] for i in range(self.m + 1): @@ -152,25 +139,6 @@ def fit(self, u, y): estimate = SysIdResult(self.getNum(), self.getDen(), self.dt) return estimate - def simulateModel(self, x, u, dt): - a_coeffs = np.ones(self.n + 1) - b_coeffs = np.zeros(self.m + 1) - - for i in range(self.n): - a_coeffs[i + 1] = x[i] - for i in range(self.m + 1): - b_coeffs[i] = x[i + self.n] - - delays = ctrl.TransferFunction( - [1], np.append([1], np.zeros(self.d)), dt, inputs="r", outputs="rd" - ) - plant = ctrl.TransferFunction(b_coeffs, a_coeffs, dt, inputs="rd", outputs="y") - - system = ctrl.interconnect([delays, plant], inputs="r", outputs="y") - - _, y = ctrl.forced_response(system, U=u) - return y - def getNum(self): num = [ self.theta_hat.item(i) for i in range(self.n, self.n + self.m + 1)