diff --git a/configure/docker/application.properties b/configure/docker/application.properties index 89441658e1..064095234d 100644 --- a/configure/docker/application.properties +++ b/configure/docker/application.properties @@ -37,33 +37,10 @@ spring.redis.database=0 # spring.redis.password= ################################ Executor configure ################################# -# If this directory is not set -# the system will get the project root directory to build the data subdirectory - -# ------ Apache Seatunnel for Spark ------ # -datacap.executor.data= -datacap.executor.way=LOCAL -datacap.executor.mode=CLIENT -datacap.executor.engine=SPARK -datacap.executor.startScript=start-seatunnel-spark-connector-v2.sh -datacap.executor.seatunnel.home=/opt/lib/seatunnel - -# ------ Apache Seatunnel for Flink ------ # -# datacap.executor.data= -# datacap.executor.way=LOCAL -# datacap.executor.mode=CLIENT -# datacap.executor.engine=FLINK -# datacap.executor.startScript=start-seatunnel-flink-13-connector-v2.sh -# datacap.executor.seatunnel.home=/opt/lib/seatunnel - -# ------ Apache Seatunnel for seatunnel ------ # -# datacap.executor.data= -## Only support LOCAL -# datacap.executor.way=LOCAL -# datacap.executor.mode=CLIENT -# datacap.executor.engine=SEATUNNEL -# datacap.executor.startScript=seatunnel.sh -# datacap.executor.seatunnel.home=/opt/lib/seatunnel +# Executor configuration (formerly datacap.executor.*) is now stored in the database table +# datacap_configure (category=EXECUTOR, name=<executor name>). Defaults come from each +# ExecutorPlugin's configures() SPI declaration and are auto-seeded on first startup. +# Edit via the admin "Runtime Configuration" page at /system/configure. 
################################ Upload configure ################################# datacap.config.data= @@ -110,14 +87,9 @@ datacap.pipeline.maxQueue=200 datacap.pipeline.reset=STOPPED ################################# DataSet configure ################################# -datacap.dataset.type=ClickHouse -datacap.dataset.host=app-clickhouse -datacap.dataset.port=8123 -datacap.dataset.username=default -datacap.dataset.password=da39a3ee5e6b4b0d3255bfef95601890afd80709 -datacap.dataset.database=datacap -datacap.dataset.tablePrefix=datacap_ -datacap.dataset.tableDefaultEngine=MergeTree +# Dataset target storage configuration (formerly datacap.dataset.*) is now stored in +# datacap_configure (category=DATASET, name=Default). Defaults come from DatasetTargetSchema +# and are auto-seeded on first startup. Edit via /system/configure. ################################# Plugin configure ################################# datacap.parser.sql.defaultEngine=Trino diff --git a/configure/etc/conf/application.properties b/configure/etc/conf/application.properties index 942fdafc57..20bd090d2c 100644 --- a/configure/etc/conf/application.properties +++ b/configure/etc/conf/application.properties @@ -37,33 +37,10 @@ spring.redis.database=0 # spring.redis.password= ################################ Executor configure ################################# -# If this directory is not set -# the system will get the project root directory to build the data subdirectory - -# ------ Apache Seatunnel for Spark ------ # -datacap.executor.data= -datacap.executor.way=LOCAL -datacap.executor.mode=CLIENT -datacap.executor.engine=SPARK -datacap.executor.startScript=start-seatunnel-spark-connector-v2.sh -datacap.executor.seatunnel.home=/opt/lib/seatunnel - -# ------ Apache Seatunnel for Flink ------ # -# datacap.executor.data= -# datacap.executor.way=LOCAL -# datacap.executor.mode=CLIENT -# datacap.executor.engine=FLINK -# datacap.executor.startScript=start-seatunnel-flink-13-connector-v2.sh -# 
datacap.executor.seatunnel.home=/opt/lib/seatunnel - -# ------ Apache Seatunnel for seatunnel ------ # -# datacap.executor.data= -## Only support LOCAL -# datacap.executor.way=LOCAL -# datacap.executor.mode=CLIENT -# datacap.executor.engine=SEATUNNEL -# datacap.executor.startScript=seatunnel.sh -# datacap.executor.seatunnel.home=/opt/lib/seatunnel +# Executor configuration (formerly datacap.executor.*) is now stored in the database table +# datacap_configure (category=EXECUTOR, name=<executor name>). Defaults come from each +# ExecutorPlugin's configures() SPI declaration and are auto-seeded on first startup. +# Edit via the admin "Runtime Configuration" page at /system/configure. ################################ Upload configure ################################# datacap.config.data= @@ -110,14 +87,9 @@ datacap.pipeline.maxQueue=200 datacap.pipeline.reset=STOPPED ################################# DataSet configure ################################# -datacap.dataset.type=ClickHouse -datacap.dataset.host=localhost -datacap.dataset.port=8123 -datacap.dataset.username= -datacap.dataset.password= -datacap.dataset.database=datacap -datacap.dataset.tablePrefix=datacap_ -datacap.dataset.tableDefaultEngine=MergeTree +# Dataset target storage configuration (formerly datacap.dataset.*) is now stored in +# datacap_configure (category=DATASET, name=Default). Defaults come from DatasetTargetSchema +# and are auto-seeded on first startup. Edit via /system/configure. 
################################# Plugin configure ################################# datacap.parser.sql.defaultEngine=Trino diff --git a/configure/etc/conf/executor/category.yaml b/configure/etc/conf/executor/category.yaml index 8c99f9e6cc..48984be673 100644 --- a/configure/etc/conf/executor/category.yaml +++ b/configure/etc/conf/executor/category.yaml @@ -1,12 +1,12 @@ - label: Input value: source supportExecutors: - - SeatunnelExecutor + - Seatunnel - label: Transform value: transform supportExecutors: - - SeatunnelExecutor + - Seatunnel - label: Output value: sink supportExecutors: - - SeatunnelExecutor + - Seatunnel diff --git a/configure/etc/conf/i18n/messages_en.properties b/configure/etc/conf/i18n/messages_en.properties index e3f90d7e72..b5ad6a39d9 100644 --- a/configure/etc/conf/i18n/messages_en.properties +++ b/configure/etc/conf/i18n/messages_en.properties @@ -13,6 +13,8 @@ state.common.running=Running state.common.success=Run Successful state.common.failure=Run Failed state.common.stop=Stopped +state.common.stopping=Stopping +state.common.interrupted=Interrupted state.common.timeout=Run Timed Out state.common.queue=Queued ## Query i18n @@ -594,6 +596,17 @@ dataset.common.stateOfStarted=Started dataset.common.stateOfMetadata=Metadata State dataset.common.stateOfMetadataStarted=Metadata Started dataset.common.stateOfCreateTable=Create Table State +dataset.state.metadataStart=Metadata Start +dataset.state.metadataFailed=Metadata Failed +dataset.state.metadataSuccess=Metadata Success +dataset.state.tableStart=Table Start +dataset.state.tableFailed=Table Failed +dataset.state.tableSuccess=Table Success +dataset.state.dataStart=Data Start +dataset.state.dataFailed=Data Failed +dataset.state.dataSuccess=Data Success +dataset.state.completeFailed=Complete Failed +dataset.state.completeSuccess=Complete Success dataset.common.syncData=Sync Data dataset.common.visual=Visualization dataset.common.visualType=Visualization Type @@ -670,6 +683,24 @@ 
dataset.common.lifeCycleDay=Day dataset.common.lifeCycleHour=Hour dataset.common.notSpecifiedTitle=Not Specified dataset.common.history=Sync History +dataset.history.totalCount=Total +dataset.history.processedCount=Processed +dataset.history.progress=Progress +dataset.history.viewLog=View Log +dataset.history.logger=Sync Log +dataset.history.autoRefresh=Auto Refresh +dataset.history.refreshInterval=refresh every {seconds}s +dataset.history.stop=Stop +dataset.history.stopRequested=Stop requested. The task will exit at the next row checkpoint. +dataset.history.viewConfigure=View Configure +dataset.history.configureTitle=Effective Executor Configuration +dataset.sync.overrideTitle=Per-run Override (tunable fields) +configure.runtime.title=Runtime Configuration +configure.runtime.categoryExecutor=Executor +configure.runtime.categoryDataset=Dataset Target +configure.runtime.adminOnly=admin-only +configure.runtime.empty=(no entries yet, will be seeded on next startup) +configure.runtime.selectHint=Select a configuration entry on the left to edit. 
dataset.common.clearData=Clear Data dataset.common.error=View Errors dataset.common.info=View Details diff --git a/configure/etc/conf/i18n/messages_zh-cn.properties b/configure/etc/conf/i18n/messages_zh-cn.properties index cc18988816..9af6246eda 100644 --- a/configure/etc/conf/i18n/messages_zh-cn.properties +++ b/configure/etc/conf/i18n/messages_zh-cn.properties @@ -13,6 +13,8 @@ state.common.running=\u8FD0\u884C\u4E2D state.common.success=\u8FD0\u884C\u6210\u529F state.common.failure=\u8FD0\u884C\u5931\u8D25 state.common.stop=\u5DF2\u505C\u6B62 +state.common.stopping=\u505C\u6B62\u4E2D +state.common.interrupted=\u5F02\u5E38\u4E2D\u65AD state.common.timeout=\u8FD0\u884C\u8D85\u65F6 state.common.queue=\u6392\u961F\u4E2D ## Query i18n @@ -594,6 +596,17 @@ dataset.common.stateOfStarted=\u5DF2\u542F\u52A8 dataset.common.stateOfMetadata=\u5143\u6570\u636E\u72B6\u6001 dataset.common.stateOfMetadataStarted=\u5143\u6570\u636E\u5DF2\u542F\u52A8 dataset.common.stateOfCreateTable=\u521B\u5EFA\u8868\u72B6\u6001 +dataset.state.metadataStart=\u5143\u6570\u636E\u5F00\u59CB +dataset.state.metadataFailed=\u5143\u6570\u636E\u5931\u8D25 +dataset.state.metadataSuccess=\u5143\u6570\u636E\u6210\u529F +dataset.state.tableStart=\u5EFA\u8868\u5F00\u59CB +dataset.state.tableFailed=\u5EFA\u8868\u5931\u8D25 +dataset.state.tableSuccess=\u5EFA\u8868\u6210\u529F +dataset.state.dataStart=\u6570\u636E\u540C\u6B65\u5F00\u59CB +dataset.state.dataFailed=\u6570\u636E\u540C\u6B65\u5931\u8D25 +dataset.state.dataSuccess=\u6570\u636E\u540C\u6B65\u6210\u529F +dataset.state.completeFailed=\u4EFB\u52A1\u5931\u8D25 +dataset.state.completeSuccess=\u4EFB\u52A1\u6210\u529F dataset.common.syncData=\u540C\u6B65\u6570\u636E dataset.common.visual=\u53EF\u89C6\u5316 dataset.common.visualType=\u53EF\u89C6\u5316\u7C7B\u578B @@ -670,6 +683,24 @@ dataset.common.lifeCycleDay=\u5929 dataset.common.lifeCycleHour=\u5C0F\u65F6 dataset.common.notSpecifiedTitle=\u672A\u6307\u5B9A dataset.common.history=\u540C\u6B65\u5386\u53F2 
+dataset.history.totalCount=\u603B\u6570 +dataset.history.processedCount=\u5DF2\u5B8C\u6210 +dataset.history.progress=\u8FDB\u5EA6 +dataset.history.viewLog=\u67E5\u770B\u65E5\u5FD7 +dataset.history.logger=\u540C\u6B65\u65E5\u5FD7 +dataset.history.autoRefresh=\u81EA\u52A8\u5237\u65B0 +dataset.history.refreshInterval=\u6BCF {seconds} \u79D2\u5237\u65B0 +dataset.history.stop=\u505C\u6B62 +dataset.history.stopRequested=\u5DF2\u8BF7\u6C42\u505C\u6B62\uFF0C\u4EFB\u52A1\u5C06\u5728\u4E0B\u4E00\u884C\u68C0\u67E5\u70B9\u9000\u51FA +dataset.history.viewConfigure=\u67E5\u770B\u914D\u7F6E +dataset.history.configureTitle=\u672C\u6B21\u540C\u6B65\u5B9E\u9645\u751F\u6548\u7684\u6267\u884C\u5668\u914D\u7F6E +dataset.sync.overrideTitle=\u672C\u6B21\u8FD0\u884C\u8986\u76D6\uFF08\u4EC5\u53EF\u8C03\u5B57\u6BB5\uFF09 +configure.runtime.title=\u8FD0\u884C\u65F6\u914D\u7F6E +configure.runtime.categoryExecutor=\u6267\u884C\u5668 +configure.runtime.categoryDataset=\u6570\u636E\u96C6\u76EE\u6807\u5B58\u50A8 +configure.runtime.adminOnly=\u4EC5\u7BA1\u7406\u5458 +configure.runtime.empty=\uFF08\u6682\u65E0\u6761\u76EE\uFF0C\u4E0B\u6B21\u542F\u52A8\u4F1A\u81EA\u52A8 seed\uFF09 +configure.runtime.selectHint=\u8BF7\u5728\u5DE6\u4FA7\u9009\u62E9\u4E00\u4E2A\u914D\u7F6E\u9879\u8FDB\u884C\u7F16\u8F91 dataset.common.clearData=\u6E05\u9664\u6570\u636E dataset.common.error=\u67E5\u770B\u9519\u8BEF dataset.common.info=\u67E5\u770B\u8BE6\u60C5 diff --git a/configure/schema/2026.0.0/schema.sql b/configure/schema/2026.0.0/schema.sql new file mode 100644 index 0000000000..9a0a3dbe28 --- /dev/null +++ b/configure/schema/2026.0.0/schema.sql @@ -0,0 +1,79 @@ +USE +`datacap`; + +-- Executor 命名统一:去掉冗余的 "Executor" 后缀,与 SPI ExecutorService.name() / ExecutorPlugin.getName() 对齐。 +-- 例如 'SeatunnelExecutor' -> 'Seatunnel','LocalExecutor' -> 'Local'。 +-- Unify executor identifier with SPI's name(): strip the redundant "Executor" suffix. 
+ +UPDATE `datacap_dataset` +SET `executor` = 'Local' +WHERE `executor` = 'LocalExecutor'; + +UPDATE `datacap_dataset` +SET `executor` = 'Seatunnel' +WHERE `executor` = 'SeatunnelExecutor'; + +ALTER TABLE `datacap_dataset` + ALTER COLUMN `executor` SET DEFAULT 'Local'; + +UPDATE `datacap_workflow` +SET `executor` = 'Local' +WHERE `executor` = 'LocalExecutor'; + +UPDATE `datacap_workflow` +SET `executor` = 'Seatunnel' +WHERE `executor` = 'SeatunnelExecutor'; + +-- 同步历史新增 总数 / 已完成 / 进度 三列 +-- Sync history adds total count / processed count / progress columns +ALTER TABLE `datacap_dataset_history` + ADD COLUMN `total_count` BIGINT DEFAULT NULL COMMENT 'Total rows from source query, NULL if pre-count is disabled', + ADD COLUMN `processed_count` BIGINT DEFAULT NULL COMMENT 'Rows successfully written to target', + ADD COLUMN `progress` DECIMAL(5, 2) DEFAULT NULL COMMENT 'processed_count / total_count * 100, NULL when total unknown'; + +-- 同步历史新增 任务名 / 工作目录,用于定位 executor 写出的独立任务日志文件 +-- Sync history adds task name / work home so the UI can locate the executor's task log file +ALTER TABLE `datacap_dataset_history` + ADD COLUMN `task_name` VARCHAR(64) DEFAULT NULL COMMENT 'Executor task name; also the log file basename', + ADD COLUMN `work_home` VARCHAR(512) DEFAULT NULL COMMENT 'Executor task workHome; logs live at {work_home}/{task_name}.log'; + +-- 同步历史新增 effective executor 配置 JSON:本次同步实际生效的所有配置(DB 默认 + 用户临时覆盖合并后) +-- Sync history adds the effective executor configuration JSON used by this run +ALTER TABLE `datacap_dataset_history` + ADD COLUMN `executor_configure` TEXT DEFAULT NULL COMMENT 'Effective executor configuration JSON used for this sync'; + +-- 通用配置表:承载 EXECUTOR / DATASET 等不同范畴的运行时配置 +-- Generic runtime configuration table backing datacap_configure_service. +-- One row per (category, name); configure column is JSON-serialized Map. 
+CREATE TABLE IF NOT EXISTS `datacap_configure` +( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `name` VARCHAR(128) NOT NULL COMMENT 'e.g. Local / Seatunnel / Default', + `code` VARCHAR(64) DEFAULT NULL, + `active` TINYINT(1) DEFAULT 1, + `category` VARCHAR(32) NOT NULL COMMENT 'EXECUTOR / DATASET', + `configure` TEXT DEFAULT NULL COMMENT 'JSON-serialized configuration map', + `description` VARCHAR(512) DEFAULT NULL, + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_configure_category_name` (`category`, `name`) +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'Runtime configuration storage (datacap_configure)'; + +-- 新菜单:管理员 → 系统 → 运行时配置(/system/configure) +-- Admin menu entry for the runtime configuration page. Idempotent insert by `code`. +INSERT INTO `datacap_menu` (`name`, `code`, `description`, `url`, `group_name`, `sorted`, `type`, `parent`, `active`, `i18n_key`, `icon`) +SELECT '管理员 - 系统 - 运行时配置', 'RUNTIME_CONFIGURE', + '管理员:管理员权限用户可以访问\n位置:顶部管理一级子菜单', + '/system/configure', NULL, 7, 'VIEW', 8, 1, 'configure.runtime.title', 'Settings2' +WHERE NOT EXISTS (SELECT 1 FROM `datacap_menu` WHERE `code` = 'RUNTIME_CONFIGURE'); + +-- 绑定该菜单到 admin 角色(role_id=1)。menu_id 字段为 mediumtext,CAST 后插入。 +INSERT INTO `datacap_role_menu_relation` (`role_id`, `menu_id`) +SELECT '1', CAST(`id` AS CHAR) +FROM `datacap_menu` +WHERE `code` = 'RUNTIME_CONFIGURE' + AND NOT EXISTS ( + SELECT 1 FROM `datacap_role_menu_relation` r + WHERE r.role_id = '1' AND r.menu_id = CAST(`datacap_menu`.id AS CHAR) +); diff --git a/configure/schema/datacap.sql b/configure/schema/datacap.sql index 9f979e224b..e076248b6b 100644 --- a/configure/schema/datacap.sql +++ b/configure/schema/datacap.sql @@ -164,7 +164,7 @@ CREATE TABLE `datacap_dataset` ( `code` varchar(100) DEFAULT (uuid()), `column_mode` varchar(100) DEFAULT 'DIMENSION', `scheduler` varchar(100) DEFAULT 'LocalScheduler', - 
`executor` varchar(100) DEFAULT 'LocalExecutor', + `executor` varchar(100) DEFAULT 'Local', `total_rows` bigint DEFAULT '0', `total_size` varchar(100) DEFAULT NULL, `life_cycle` bigint DEFAULT '0', diff --git a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/Plugin.java b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/Plugin.java index 4a2e85c692..8a5893e2e8 100644 --- a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/Plugin.java +++ b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/Plugin.java @@ -219,6 +219,18 @@ public String getName() return StringUtils.remove(this.getClass().getSimpleName(), "Plugin"); } + /** + * 声明该插件可配置的字段(名称 / 类型 / 默认值 / 是否允许普通用户临时覆盖)。 + * 默认空:旧插件零改动。Executor / Scheduler / FS / Notify 等子类按需重写。 + * + * Declare configurable fields exposed by this plugin. Default empty for full + * backward compatibility — subclasses (Executor / Scheduler / FS / Notify / ...) override as needed. + */ + public java.util.List configures() + { + return java.util.Collections.emptyList(); + } + public String getVersion() { if (cachedVersion != null) { diff --git a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginConfigureField.java b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginConfigureField.java new file mode 100644 index 0000000000..47f93d04f1 --- /dev/null +++ b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginConfigureField.java @@ -0,0 +1,34 @@ +package io.edurt.datacap.plugin.configure; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 描述一个插件可配置字段。所有插件类型(Executor / Scheduler / FS / …)共用。 + * - name: 字段 key,序列化到 JSON 配置中的属性名 + * - type: 字段类型,决定 UI 渲染(文本框 / 数字 / 开关 / 密码) + * - defaultValue: 默认值的字符串表示;调用方按 type 解析 + * - description: 字段说明,UI tooltip 用 + * - tunable: true = 普通用户在触发任务时可临时覆盖;false = 仅管理员可在系统配置改 + * + * Generic configurable-field descriptor used by any plugin category. 
+ * - tunable=false fields are administrator-only. + * - tunable=true fields can also be overridden by end users at task invocation time. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PluginConfigureField +{ + private String name; + private PluginFieldType type; + private String defaultValue; + private String description; + private boolean tunable; + + public PluginConfigureField(String name, PluginFieldType type, String defaultValue, String description) + { + this(name, type, defaultValue, description, false); + } +} diff --git a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginFieldType.java b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginFieldType.java new file mode 100644 index 0000000000..bc097692a3 --- /dev/null +++ b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/configure/PluginFieldType.java @@ -0,0 +1,17 @@ +package io.edurt.datacap.plugin.configure; + +/** + * 插件可配置字段的类型,决定 UI 渲染方式与默认值的解析方式。 + * 故意保持最小集,所有插件类型(Executor / Scheduler / FS / Notify …)通用。 + * + * Lightweight field type shared by all plugin categories. + * Kept minimal on purpose so any plugin layer can declare configurable fields without + * pulling in business-side enums. 
+ */ +public enum PluginFieldType +{ + STRING, + NUMBER, + BOOLEAN, + PASSWORD +} diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java index 42a2a60085..5356eadd79 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java @@ -50,9 +50,16 @@ public CommonResponse> getColumnsByCode(@PathVariable } @PutMapping(value = "syncData/{code}") - public CommonResponse syncData(@PathVariable String code) + public CommonResponse syncData(@PathVariable String code, + @RequestBody(required = false) java.util.Map overrides) { - return service.syncData(code); + return service.syncData(code, overrides); + } + + @GetMapping(value = "syncFields/{code}") + public CommonResponse> syncFields(@PathVariable String code) + { + return service.getSyncFields(code); } @PutMapping(value = "clearData/{code}") @@ -75,6 +82,18 @@ public CommonResponse> history(@PathVariable St return this.service.getHistory(code, filter); } + @GetMapping(value = "history/log/{id}") + public CommonResponse> historyLog(@PathVariable Long id) + { + return this.service.getHistoryLog(id); + } + + @PutMapping(value = "history/stop/{id}") + public CommonResponse historyStop(@PathVariable Long id) + { + return this.service.stopHistory(id); + } + @GetMapping(value = "getActuators") public CommonResponse> getActuators() { diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/RuntimeConfigureController.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/RuntimeConfigureController.java new file mode 100644 index 0000000000..2a616ed3da --- /dev/null +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/RuntimeConfigureController.java @@ -0,0 +1,130 @@ +package 
io.edurt.datacap.server.controller; + +import io.edurt.datacap.common.response.CommonResponse; +import io.edurt.datacap.plugin.Plugin; +import io.edurt.datacap.plugin.PluginManager; +import io.edurt.datacap.plugin.PluginType; +import io.edurt.datacap.plugin.configure.PluginConfigureField; +import io.edurt.datacap.service.configure.DatasetTargetSchema; +import io.edurt.datacap.service.entity.ConfigureEntity; +import io.edurt.datacap.service.initializer.DataSetConfigure; +import io.edurt.datacap.service.service.RuntimeConfigureService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Admin-only CRUD for runtime configuration rows (datacap_configure). 
+ * - GET /list/{category} list all rows of a category + * - GET /detail/{category}/{name} fetch effective values + schema for one row + * - PUT /save/{category}/{name} save the row (admin only by route guard / role) + */ +@Slf4j +@RestController +@RequestMapping(value = "/api/v1/configure/runtime") +public class RuntimeConfigureController +{ + private final PluginManager pluginManager; + private final RuntimeConfigureService service; + private final DataSetConfigure dataSetConfigure; + + public RuntimeConfigureController(PluginManager pluginManager, RuntimeConfigureService service, DataSetConfigure dataSetConfigure) + { + this.pluginManager = pluginManager; + this.service = service; + this.dataSetConfigure = dataSetConfigure; + } + + @GetMapping(value = "list/{category}") + public CommonResponse> list(@PathVariable String category) + { + return CommonResponse.success(service.list(category)); + } + + @GetMapping(value = "detail/{category}/{name}") + public CommonResponse detail(@PathVariable String category, @PathVariable String name) + { + List schema = resolveSchema(category, name); + if (schema == null) { + return CommonResponse.failure("No schema for " + category + "/" + name); + } + Map effective = service.getEffective(category, name, schema); + return CommonResponse.success(new DetailView(category, name, schema, effective)); + } + + @PutMapping(value = "save/{category}/{name}") + public CommonResponse save( + @PathVariable String category, + @PathVariable String name, + @RequestBody Map configMap) + { + // 校验:只保留 schema 中存在的 key(防止脏字段写入) + List schema = resolveSchema(category, name); + Map sanitized = new LinkedHashMap<>(); + if (schema != null) { + for (PluginConfigureField field : schema) { + if (configMap != null && configMap.containsKey(field.getName())) { + sanitized.put(field.getName(), configMap.get(field.getName())); + } + } + } + CommonResponse response = service.save(category, name, sanitized, null); + // 写完热刷新 DataSetConfigure(dataset target 类配置) + 
if (Boolean.TRUE.equals(response.getStatus()) + && RuntimeConfigureService.CATEGORY_DATASET.equals(category) + && DatasetTargetSchema.NAME.equals(name)) { + try { + dataSetConfigure.reload(); + } + catch (Exception ex) { + log.warn("Reload dataset target configure failed: {}", ex.getMessage()); + } + } + return response; + } + + private List resolveSchema(String category, String name) + { + if (RuntimeConfigureService.CATEGORY_EXECUTOR.equals(category)) { + return pluginManager.getPluginInfos().stream() + .filter(m -> m.getType() == PluginType.EXECUTOR) + .map(m -> m.getInstance()) + .filter(i -> i instanceof Plugin) + .map(i -> (Plugin) i) + .filter(p -> name.equals(p.getName())) + .findFirst() + .map(Plugin::configures) + .orElse(null); + } + if (RuntimeConfigureService.CATEGORY_DATASET.equals(category) && DatasetTargetSchema.NAME.equals(name)) { + return DatasetTargetSchema.fields(); + } + return Collections.emptyList(); + } + + public static class DetailView + { + public String category; + public String name; + public List schema; + public Map values; + + public DetailView(String category, String name, List schema, Map values) + { + this.category = category; + this.name = name; + this.schema = schema == null ? 
new ArrayList<>() : schema; + this.values = values; + } + } +} diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/DatasetHistoryRecoveryRunner.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/DatasetHistoryRecoveryRunner.java new file mode 100644 index 0000000000..51e29a0195 --- /dev/null +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/DatasetHistoryRecoveryRunner.java @@ -0,0 +1,70 @@ +package io.edurt.datacap.server.runner; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.executor.common.RunState; +import io.edurt.datacap.service.entity.DatasetHistoryEntity; +import io.edurt.datacap.service.repository.DatasetHistoryRepository; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * 启动时把上次服务异常退出留下的"未完结"同步历史标记为 INTERRUPTED。 + * 仅处理 RUNNING / CREATED / STOPPING 这些非终态。 + * 在 SchedulerRunner 之前执行(@Order 较小),避免被即将启动的定时任务"运行中"覆盖。 + * + * On startup, mark sync-history rows left in transient states by a prior crash as INTERRUPTED. + */ +@Slf4j +@Service +@Order(10) +@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) +public class DatasetHistoryRecoveryRunner + implements CommandLineRunner +{ + private static final List TRANSIENT_STATES = Arrays.asList( + RunState.RUNNING, + RunState.CREATED, + RunState.STOPPING + ); + + private final DatasetHistoryRepository historyRepository; + + public DatasetHistoryRecoveryRunner(DatasetHistoryRepository historyRepository) + { + this.historyRepository = historyRepository; + } + + @Override + public void run(String... 
args) + { + try { + List stale = historyRepository.findAllByStateIn(TRANSIENT_STATES); + if (stale.isEmpty()) { + log.info("Dataset history recovery: no stale records."); + return; + } + String now = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); + String message = String.format("Marked as INTERRUPTED by server restart at %s (previous state was transient).", now); + Date updateTime = new Date(); + for (DatasetHistoryEntity history : stale) { + RunState previous = history.getState(); + history.setState(RunState.INTERRUPTED); + history.setMessage(message); + history.setUpdateTime(updateTime); + log.info("Recovering sync history [ {} ] previousState={} -> INTERRUPTED", history.getId(), previous); + } + historyRepository.saveAll(stale); + log.info("Dataset history recovery: {} stale record(s) marked INTERRUPTED.", stale.size()); + } + catch (Exception ex) { + log.error("Dataset history recovery failed", ex); + } + } +} diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/RuntimeConfigureSeedRunner.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/RuntimeConfigureSeedRunner.java new file mode 100644 index 0000000000..71e9cd0c01 --- /dev/null +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/RuntimeConfigureSeedRunner.java @@ -0,0 +1,71 @@ +package io.edurt.datacap.server.runner; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.plugin.Plugin; +import io.edurt.datacap.plugin.PluginManager; +import io.edurt.datacap.plugin.PluginType; +import io.edurt.datacap.plugin.configure.PluginConfigureField; +import io.edurt.datacap.service.service.RuntimeConfigureService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * 启动时把每个 ExecutorPlugin 声明的可配置字段(Plugin.configures())按默认值落到 
datacap_configure。 + * 已存在的行不动;只为缺失的 (EXECUTOR, name) 做 seed。 + * + * On startup, iterate ExecutorPlugin instances and seed default configuration rows into + * datacap_configure when missing. Existing rows are left untouched. + * + * 在 SchedulerRunner / DatasetHistoryRecoveryRunner 之前执行(@Order=5), + * 保证后续业务逻辑读到的是已 seed 过的配置。 + */ +@Slf4j +@Service +@Order(5) +@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) +public class RuntimeConfigureSeedRunner + implements CommandLineRunner +{ + private final PluginManager pluginManager; + private final RuntimeConfigureService runtimeConfigureService; + + public RuntimeConfigureSeedRunner(PluginManager pluginManager, RuntimeConfigureService runtimeConfigureService) + { + this.pluginManager = pluginManager; + this.runtimeConfigureService = runtimeConfigureService; + } + + @Override + public void run(String... args) + { + try { + pluginManager.getPluginInfos().stream() + .filter(meta -> meta.getType() == PluginType.EXECUTOR) + .forEach(meta -> { + Object instance = meta.getInstance(); + if (!(instance instanceof Plugin)) { + return; + } + Plugin plugin = (Plugin) instance; + List fields = plugin.configures(); + if (fields == null || fields.isEmpty()) { + return; + } + String name = plugin.getName(); + runtimeConfigureService.seedIfAbsent( + RuntimeConfigureService.CATEGORY_EXECUTOR, + name, + fields, + "Configuration for executor " + name + ); + }); + } + catch (Exception ex) { + log.error("Seed executor runtime configure failed", ex); + } + } +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/configure/DatasetTargetSchema.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/configure/DatasetTargetSchema.java new file mode 100644 index 0000000000..9777d0f2bd --- /dev/null +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/configure/DatasetTargetSchema.java @@ -0,0 +1,43 @@ +package io.edurt.datacap.service.configure; + +import 
io.edurt.datacap.plugin.configure.PluginConfigureField; +import io.edurt.datacap.plugin.configure.PluginFieldType; + +import java.util.Arrays; +import java.util.List; + +/** + * 数据集目标存储(落库 ClickHouse 等)的可配置字段 schema。 + * 字段与原 application.properties 中 datacap.dataset.* 一一对应。 + * + * Field schema for the dataset target storage (formerly bound via {@code @ConfigurationProperties(prefix = "datacap.dataset")}). + * Used by RuntimeConfigureService to seed / merge / serve effective values. + */ +public final class DatasetTargetSchema +{ + private DatasetTargetSchema() {} + + public static final String NAME = "Default"; + + public static List fields() + { + return Arrays.asList( + new PluginConfigureField("type", PluginFieldType.STRING, "ClickHouse", + "Target storage plugin type", false), + new PluginConfigureField("host", PluginFieldType.STRING, "app-clickhouse", + "Target host", false), + new PluginConfigureField("port", PluginFieldType.STRING, "8123", + "Target port", false), + new PluginConfigureField("username", PluginFieldType.STRING, "default", + "Target username", false), + new PluginConfigureField("password", PluginFieldType.PASSWORD, "", + "Target password", false), + new PluginConfigureField("database", PluginFieldType.STRING, "datacap", + "Target database", false), + new PluginConfigureField("tableDefaultEngine", PluginFieldType.STRING, "MergeTree", + "Default table engine when materializing", false), + new PluginConfigureField("tablePrefix", PluginFieldType.STRING, "datacap_", + "Table name prefix", false) + ); + } +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/ConfigureEntity.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/ConfigureEntity.java new file mode 100644 index 0000000000..d2a7feda5a --- /dev/null +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/ConfigureEntity.java @@ -0,0 +1,53 @@ +package io.edurt.datacap.service.entity; + +import 
com.fasterxml.jackson.annotation.JsonView; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.common.view.EntityView; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.springframework.data.jpa.domain.support.AuditingEntityListener; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EntityListeners; +import javax.persistence.Table; +import javax.persistence.UniqueConstraint; + +/** + * 通用配置表,承载 EXECUTOR / DATASET 等不同范畴的运行时配置。 + * 字段 schema 由各插件通过 Plugin.configures() 声明;value 为序列化后的 JSON。 + * + * Generic configuration row keyed by (category, name). + * - category: e.g. "EXECUTOR", "DATASET" + * - name: e.g. "Local", "Seatunnel", or a dataset target identifier + * - configure: JSON-serialized map of effective field values + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +@Entity +@Table(name = "datacap_configure", uniqueConstraints = { + @UniqueConstraint(name = "uk_configure_category_name", columnNames = {"category", "name"}) +}) +@EntityListeners(AuditingEntityListener.class) +@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"}) +public class ConfigureEntity + extends BaseEntity +{ + @Column(name = "category", length = 32, nullable = false) + @JsonView(value = {EntityView.UserView.class, EntityView.AdminView.class}) + private String category; + + @Column(name = "configure", columnDefinition = "TEXT") + @JsonView(value = {EntityView.UserView.class, EntityView.AdminView.class}) + private String configure; + + @Column(name = "description", length = 512) + @JsonView(value = {EntityView.UserView.class, EntityView.AdminView.class}) + private String description; +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/DatasetHistoryEntity.java 
b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/DatasetHistoryEntity.java index 9c28afb249..51c9382315 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/DatasetHistoryEntity.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/DatasetHistoryEntity.java @@ -20,6 +20,8 @@ import javax.persistence.ManyToOne; import javax.persistence.Table; +import java.math.BigDecimal; + @Data @SuperBuilder @NoArgsConstructor @@ -44,6 +46,25 @@ public class DatasetHistoryEntity @Column(name = "count") private int count; + @Column(name = "total_count") + private Long totalCount; + + @Column(name = "processed_count") + private Long processedCount; + + @Column(name = "progress") + private BigDecimal progress; + + @Column(name = "task_name") + private String taskName; + + @Column(name = "work_home") + private String workHome; + + /** 本次同步实际生效的 executor 配置 JSON(DB 默认 + 本次临时覆盖后的合并结果) */ + @Column(name = "executor_configure", columnDefinition = "TEXT") + private String executorConfigure; + @Column(name = "query_mode") @Enumerated(EnumType.STRING) private QueryMode mode; diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DataSetConfigure.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DataSetConfigure.java index 67f04e1c2d..b9868f89bf 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DataSetConfigure.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DataSetConfigure.java @@ -1,12 +1,26 @@ package io.edurt.datacap.service.initializer; +import io.edurt.datacap.service.configure.DatasetTargetSchema; +import io.edurt.datacap.service.service.RuntimeConfigureService; import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; + +import 
java.util.Map; + +/** + * 数据集目标存储的运行时配置。 + * 数据源已从 application.properties (datacap.dataset.*) 迁移到 datacap_configure 表, + * 由 RuntimeConfigureService 提供 effective 值;首次启动若 DB 行缺失会自动 seed schema 默认值。 + * + * Backed by datacap_configure (category=DATASET, name=Default) via RuntimeConfigureService. + * The previous {@code @ConfigurationProperties(prefix = "datacap.dataset")} binding has been removed. + */ +@Slf4j @Data @Component -@ConfigurationProperties(prefix = "datacap.dataset") public class DataSetConfigure { private String type; @@ -17,4 +31,43 @@ public class DataSetConfigure private String database; private String tableDefaultEngine; private String tablePrefix; + + private final RuntimeConfigureService runtimeConfigureService; + + public DataSetConfigure(RuntimeConfigureService runtimeConfigureService) + { + this.runtimeConfigureService = runtimeConfigureService; + } + + @PostConstruct + public void load() + { + // 首次启动 seed:DB 行不存在则按 schema 默认值落一条,方便管理员后续从 UI 改 + runtimeConfigureService.seedIfAbsent( + RuntimeConfigureService.CATEGORY_DATASET, + DatasetTargetSchema.NAME, + DatasetTargetSchema.fields(), + "Dataset target storage configuration" + ); + Map cfg = runtimeConfigureService.getEffective( + RuntimeConfigureService.CATEGORY_DATASET, + DatasetTargetSchema.NAME, + DatasetTargetSchema.fields() + ); + this.type = cfg.get("type"); + this.host = cfg.get("host"); + this.port = cfg.get("port"); + this.username = cfg.get("username"); + this.password = cfg.get("password"); + this.database = cfg.get("database"); + this.tableDefaultEngine = cfg.get("tableDefaultEngine"); + this.tablePrefix = cfg.get("tablePrefix"); + log.info("DataSetConfigure loaded from runtime configure: type={} host={} database={}", type, host, database); + } + + /** 管理员在系统配置页改完后由 controller 主动调一次,避免重启 */ + public void reload() + { + load(); + } } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/job/DatasetJob.java 
b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/job/DatasetJob.java index c63e2e01ff..883f83aaba 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/job/DatasetJob.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/job/DatasetJob.java @@ -25,7 +25,12 @@ public void setService(DataSetService service) @Override protected void executeInternal(JobExecutionContext context) { - String code = context.getJobDetail().getJobDataMap().get("code").toString(); + Object idValue = context.getJobDetail().getJobDataMap().get("id"); + if (idValue == null) { + log.warn("Job [ {} ] skipped: missing 'id' in JobDataMap", context.getJobDetail().getKey()); + return; + } + String code = idValue.toString(); log.info("Job [ {} ] run time [ {} ]", code, context.getFireTime().getTime()); this.service.syncData(code); } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/ConfigureRepository.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/ConfigureRepository.java new file mode 100644 index 0000000000..228f712323 --- /dev/null +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/ConfigureRepository.java @@ -0,0 +1,19 @@ +package io.edurt.datacap.service.repository; + +import io.edurt.datacap.service.entity.ConfigureEntity; +import org.springframework.data.repository.PagingAndSortingRepository; + +import java.util.List; +import java.util.Optional; + +public interface ConfigureRepository + extends PagingAndSortingRepository +{ + /** 按范畴 + 名称定位唯一一条配置(如 EXECUTOR / Local) */ + Optional findByCategoryAndName(String category, String name); + + /** 列出某范畴下的全部配置(系统配置页面用) */ + List findAllByCategoryOrderByName(String category); + + boolean existsByCategoryAndName(String category, String name); +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/DatasetHistoryRepository.java 
b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/DatasetHistoryRepository.java index 4e289ea672..08dde9aeaa 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/DatasetHistoryRepository.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/DatasetHistoryRepository.java @@ -1,13 +1,23 @@ package io.edurt.datacap.service.repository; +import io.edurt.datacap.executor.common.RunState; import io.edurt.datacap.service.entity.DataSetEntity; import io.edurt.datacap.service.entity.DatasetHistoryEntity; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.repository.PagingAndSortingRepository; +import java.util.Collection; +import java.util.List; + public interface DatasetHistoryRepository extends PagingAndSortingRepository { Page findAllByDatasetOrderByCreateTimeDesc(DataSetEntity dataSet, Pageable pageable); + + /** + * 启动恢复用:查出"未完结"的同步历史 + * Used by startup recovery to find sync history rows that did not reach a terminal state. + */ + List findAllByStateIn(Collection states); } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java index dd1917f880..c144da47ac 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java @@ -22,6 +22,19 @@ public interface DataSetService CommonResponse syncData(String code); + /** + * 同步并允许调用方临时覆盖一组 tunable 字段;非 tunable 的字段会被忽略。 + * Trigger sync with optional per-invocation overrides; only tunable fields are honored. 
+ */ + CommonResponse syncData(String code, java.util.Map overrides); + + /** + * 返回当前 executor 的可调字段(tunable=true),defaultValue 已替换为该数据集对应 executor 的 effective 值, + * 供同步对话框预填表单使用。 + * Return the executor's tunable fields with defaultValue pre-populated from current effective config. + */ + CommonResponse> getSyncFields(String code); + CommonResponse clearData(String code); CommonResponse adhoc(String code, Adhoc configure); @@ -31,4 +44,16 @@ public interface DataSetService CommonResponse getInfo(String code); CommonResponse> getHistory(String code, FilterBody filter); + + /** + * 读取指定同步历史的运行日志(独立任务日志文件,由 executor 写入 workHome) + * Read the executor's task log for the given sync history record + */ + CommonResponse> getHistoryLog(Long id); + + /** + * 停止正在运行的同步任务。仅对 state=RUNNING 且当前进程持有该 taskName 的任务生效 + * Stop a running sync task. Effective only when state=RUNNING and the task is held by this process + */ + CommonResponse stopHistory(Long id); } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/RuntimeConfigureService.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/RuntimeConfigureService.java new file mode 100644 index 0000000000..2becf89824 --- /dev/null +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/RuntimeConfigureService.java @@ -0,0 +1,45 @@ +package io.edurt.datacap.service.service; + +import io.edurt.datacap.common.response.CommonResponse; +import io.edurt.datacap.plugin.configure.PluginConfigureField; +import io.edurt.datacap.service.entity.ConfigureEntity; + +import java.util.List; +import java.util.Map; + +/** + * 运行时可写配置(datacap_configure 表)读写服务。 + * 与 ConfigureService(读 executor YAML 元数据,UI 分类用)语义不同: + * 这里管的是 Executor / Dataset Target 真正生效的运行参数。 + * + * Read / write runtime configuration backed by datacap_configure. + * Field schema comes from Plugin.configures(); the service merges DB row over schema defaults + * so callers always get a complete value map. 
+ */ +public interface RuntimeConfigureService +{ + String CATEGORY_EXECUTOR = "EXECUTOR"; + String CATEGORY_DATASET = "DATASET"; + + /** + * 合并 DB 行与 schema 默认值,返回该 (category, name) 的完整 effective 配置。 + * - DB 行存在的字段使用 DB 值;缺失字段回落 schema.defaultValue + * - 字符串形式返回;调用方根据 schema 类型按需 parse + */ + Map getEffective(String category, String name, List fields); + + /** 不参考 schema 默认值,仅返回 DB 行(可能空 map),用于 UI 展示"用户改过哪些项" */ + Map getRaw(String category, String name); + + /** 列出某范畴下的全部配置项,系统配置页用 */ + List list(String category); + + /** 管理员写:不存在则新建,存在则更新 configure JSON */ + CommonResponse save(String category, String name, Map configMap, String description); + + /** + * 首次启动 seed:当 (category, name) 不存在时,按 schema 默认值生成一条;存在则不动。 + * 用于把 plugin 默认值"落地"到 DB,让管理员之后可以从 UI 改。 + */ + void seedIfAbsent(String category, String name, List fields, String description); +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java index dd5be2df34..7f624167c2 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java @@ -60,6 +60,7 @@ import io.edurt.datacap.service.repository.SourceRepository; import io.edurt.datacap.service.security.UserDetailsService; import io.edurt.datacap.service.service.DataSetService; +import io.edurt.datacap.service.service.RuntimeConfigureService; import io.edurt.datacap.spi.PluginService; import io.edurt.datacap.spi.PluginType; import io.edurt.datacap.spi.generator.definition.TableDefinition; @@ -102,6 +103,17 @@ public class DataSetServiceImpl implements DataSetService { + // 同步期间每 SIZE_REFRESH_EVERY 次进度回调才刷一次 totalSize(查 ClickHouse system.parts 较重) + private static final int SIZE_REFRESH_EVERY = 10; + + // 已请求停止但还没真正停下来的 history id 集合:进度回调据此把 state 维持为 STOPPING 
而不是 RUNNING + // Set of history ids whose sync has been asked to stop but hasn't reached its cancellation checkpoint yet. + private static final java.util.Set stoppingHistoryIds = java.util.concurrent.ConcurrentHashMap.newKeySet(); + + // 本次同步的 tunable 字段临时覆盖。由 syncData(code, overrides) 在异步线程入口写入,由内部 sync 体读取后清理。 + // Per-invocation tunable overrides; set by the public syncData entry, read inside the async body. + private static final ThreadLocal> currentSyncOverrides = new ThreadLocal<>(); + public final DataSetColumnRepository columnRepository; private final DataSetRepository repository; private final DatasetHistoryRepository historyRepository; @@ -112,8 +124,9 @@ public class DataSetServiceImpl private final SourceRepository sourceRepository; private final NotificationRepository notificationRepository; private final NotificationEventPublisher eventPublisher; + private final RuntimeConfigureService runtimeConfigureService; - public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, PluginManager pluginManager, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment, SourceRepository sourceRepository, NotificationRepository notificationRepository, NotificationEventPublisher eventPublisher) + public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, PluginManager pluginManager, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment, SourceRepository sourceRepository, NotificationRepository notificationRepository, NotificationEventPublisher eventPublisher, RuntimeConfigureService runtimeConfigureService) { this.repository = repository; this.columnRepository = columnRepository; @@ -125,6 +138,7 @@ public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository this.sourceRepository = sourceRepository; 
this.notificationRepository = notificationRepository; this.eventPublisher = eventPublisher; + this.runtimeConfigureService = runtimeConfigureService; } @Transactional @@ -165,6 +179,19 @@ public CommonResponse> getColumnsByCode(String code) .orElseGet(() -> CommonResponse.failure(String.format("DataSet [ %s ] not found", code))); } + private static int parseIntOrDefault(String raw, int fallback) + { + if (raw == null || raw.isEmpty()) { + return fallback; + } + try { + return Integer.parseInt(raw.trim()); + } + catch (NumberFormatException ex) { + return fallback; + } + } + private static void setAuthenticationContext(UserEntity user) { List authorities = user.getRoles().stream() @@ -321,6 +348,86 @@ public CommonResponse> getHistory(String code, .orElse(CommonResponse.failure(String.format("DataSet [ %s ] not found", code))); } + @Override + public CommonResponse> getHistoryLog(Long id) + { + return historyRepository.findById(id) + .map(history -> { + String workHome = history.getWorkHome(); + String taskName = history.getTaskName(); + if (StringUtils.isBlank(workHome) || StringUtils.isBlank(taskName)) { + return CommonResponse.>failure("Sync history has no log location (older record or non-local executor)"); + } + // 防穿越:拼好后规整为绝对路径,并校验仍在 dataHome 下 + java.io.File logFile = new java.io.File(workHome, taskName + ".log"); + String dataHome = initializerConfigure.getDataHome(); + try { + String canonicalLog = logFile.getCanonicalPath(); + String canonicalHome = new java.io.File(dataHome).getCanonicalPath(); + if (!canonicalLog.startsWith(canonicalHome)) { + return CommonResponse.>failure("Illegal log path"); + } + } + catch (java.io.IOException ex) { + return CommonResponse.>failure("Resolve log path failed: " + ex.getMessage()); + } + if (!logFile.exists()) { + return CommonResponse.>success(Lists.newArrayList()); + } + try (java.io.FileInputStream stream = new java.io.FileInputStream(logFile)) { + List lines = org.apache.commons.io.IOUtils.readLines(stream, 
java.nio.charset.StandardCharsets.UTF_8); + return CommonResponse.>success(lines); + } + catch (java.io.IOException ex) { + log.error("Read sync log [ {} ] failed", logFile.getAbsolutePath(), ex); + return CommonResponse.>failure("Read log file failed: " + ex.getMessage()); + } + }) + .orElseGet(() -> CommonResponse.>failure(String.format("Sync history [ %d ] not found", id))); + } + + @Override + public CommonResponse stopHistory(Long id) + { + log.info("Stop history [ {} ] requested", id); + return historyRepository.findById(id) + .map(history -> { + log.info("Stop history [ {} ] state={} taskName={}", id, history.getState(), history.getTaskName()); + if (history.getState() != RunState.RUNNING && history.getState() != RunState.CREATED) { + return CommonResponse.failure(String.format("Sync history [ %d ] is not running (current state: %s)", id, history.getState())); + } + String taskName = history.getTaskName(); + if (StringUtils.isBlank(taskName)) { + return CommonResponse.failure("Sync history has no taskName (older record), cannot stop"); + } + DataSetEntity dataset = history.getDataset(); + if (dataset == null || StringUtils.isBlank(dataset.getExecutor())) { + return CommonResponse.failure("Sync history has no associated dataset executor"); + } + log.info("Stop history [ {} ] resolving executor plugin [ {} ]", id, dataset.getExecutor()); + Optional executorPlugin = pluginManager.getPlugin(dataset.getExecutor()); + if (executorPlugin.isEmpty()) { + return CommonResponse.failure(String.format("Executor [ %s ] not found", dataset.getExecutor())); + } + ExecutorService executorService = executorPlugin.get().getService(ExecutorService.class); + // stop 只需要 taskName 定位活跃任务,input/output 用空 configure 占位 + ExecutorConfigure stopPlaceholder = new ExecutorConfigure(null); + ExecutorRequest stopRequest = new ExecutorRequest(taskName, "", stopPlaceholder, stopPlaceholder); + log.info("Stop history [ {} ] calling executor.stop(taskName={})", id, taskName); + 
ExecutorResponse stopResponse = executorService.stop(stopRequest); + log.info("Stop history [ {} ] executor returned successful={} message={}", id, stopResponse.getSuccessful(), stopResponse.getMessage()); + if (Boolean.TRUE.equals(stopResponse.getSuccessful())) { + // 让 UI 立刻看到 STOPPING 中间态;进度回调会继续把它保持住直到真正停下 + stoppingHistoryIds.add(id); + history.setState(RunState.STOPPING); + historyRepository.save(history); + return CommonResponse.success(Boolean.TRUE); + } + return CommonResponse.failure(stopResponse.getMessage() == null ? "Stop failed" : stopResponse.getMessage()); + }) + .orElseGet(() -> CommonResponse.failure(String.format("Sync history [ %d ] not found", id))); + } + @Override @Transactional @SendNotification(type = NotificationType.DELETED) @@ -631,30 +738,77 @@ private PluginService getOutputPlugin() @Override public CommonResponse syncData(String code) + { + return syncData(code, null); + } + + @Override + public CommonResponse syncData(String code, java.util.Map overrides) { return repository.findByCode(code) .map(entity -> { UserEntity currentUser = UserDetailsService.getUser(); java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor(); + java.util.Map safeOverrides = overrides == null + ? 
java.util.Collections.emptyMap() + : new java.util.HashMap<>(overrides); service.submit(() -> { // 在异步线程中设置用户上下文 setAuthenticationContext(currentUser); - DataSetEntity result = syncData(entity, service); - eventPublisher.publishNotificationEvent( - result, - null, - "", - EntityType.DATASET, - NotificationType.SYNCDATA, - new String[] {} - ); + currentSyncOverrides.set(safeOverrides); + try { + DataSetEntity result = syncData(entity, service); + eventPublisher.publishNotificationEvent( + result, + null, + "", + EntityType.DATASET, + NotificationType.SYNCDATA, + new String[] {} + ); + } + finally { + currentSyncOverrides.remove(); + } }); return CommonResponse.success(entity); }) .orElseGet(() -> CommonResponse.failure(String.format("DataSet [ %s ] not found", code))); } + @Override + public CommonResponse> getSyncFields(String code) + { + return repository.findByCode(code) + .>>map(entity -> { + return pluginManager.getPlugin(entity.getExecutor()) + .map(plugin -> { + java.util.List schema = plugin.configures(); + java.util.Map effective = runtimeConfigureService.getEffective( + RuntimeConfigureService.CATEGORY_EXECUTOR, + plugin.getName(), + schema + ); + java.util.List tunable = schema.stream() + .filter(io.edurt.datacap.plugin.configure.PluginConfigureField::isTunable) + .map(f -> new io.edurt.datacap.plugin.configure.PluginConfigureField( + f.getName(), + f.getType(), + effective.getOrDefault(f.getName(), f.getDefaultValue()), + f.getDescription(), + true + )) + .collect(java.util.stream.Collectors.toList()); + return CommonResponse.success(tunable); + }) + .orElseGet(() -> CommonResponse.>failure( + String.format("Executor [ %s ] not found", entity.getExecutor()))); + }) + .orElseGet(() -> CommonResponse.>failure( + String.format("DataSet [ %s ] not found", code))); + } + /** * Flushes the synchronous data for the given DataSetEntity. 
* @@ -1038,37 +1192,173 @@ private DataSetEntity syncData(DataSetEntity entity, java.util.concurrent.Execut ); String taskName = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS"); + String executorKey = executor.getName().toLowerCase(); String workHome = FolderUtils.getWorkHome( initializerConfigure.getDataHome(), entity.getUser().getUsername(), - String.join(File.separator, "dataset", entity.getExecutor().toLowerCase(), taskName) + String.join(File.separator, "dataset", "executor", executorKey, taskName) ); - + // 把 taskName / workHome 落到 history,供 UI 定位日志文件 + history.setTaskName(taskName); + history.setWorkHome(workHome); + + // 从 datacap_configure 取该 executor 的 effective 配置(DB 行覆盖 SPI schema 默认值) + java.util.Map executorCfg = runtimeConfigureService.getEffective( + RuntimeConfigureService.CATEGORY_EXECUTOR, + executor.getName(), + executor.configures() + ); + // 本次同步如有用户临时覆盖:只接受 tunable=true 的字段,其余忽略 + java.util.Map overrides = currentSyncOverrides.get(); + if (overrides != null && !overrides.isEmpty()) { + java.util.Set tunableKeys = executor.configures().stream() + .filter(io.edurt.datacap.plugin.configure.PluginConfigureField::isTunable) + .map(io.edurt.datacap.plugin.configure.PluginConfigureField::getName) + .collect(java.util.stream.Collectors.toSet()); + overrides.forEach((k, v) -> { + if (tunableKeys.contains(k) && v != null) { + executorCfg.put(k, v); + } + }); + } + // 把本次同步真正生效的配置 JSON 落到 history,便于事后审计 / UI 展示 + try { + history.setExecutorConfigure(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(executorCfg)); + } + catch (Exception ex) { + log.warn("Serialize effective executor configure failed: {}", ex.getMessage()); + } + historyRepository.save(history); + int fetchSize = parseIntOrDefault(executorCfg.get("fetchSize"), 1000); + int batchSize = parseIntOrDefault(executorCfg.get("batchSize"), 1000); + boolean preCount = Boolean.parseBoolean(executorCfg.getOrDefault("preCount", "false")); ExecutorRequest 
request = new ExecutorRequest( taskName, entity.getUser().getUsername(), input, output, - environment.getProperty(String.format("datacap.executor.%s.home", entity.getExecutor().toLowerCase())), + executorCfg.get("home"), workHome, this.pluginManager, 600, - RunWay.valueOf(requireNonNull(environment.getProperty("datacap.executor.way"))), - RunMode.valueOf(requireNonNull(environment.getProperty("datacap.executor.mode"))), - environment.getProperty("datacap.executor.startScript"), - RunEngine.valueOf(requireNonNull(environment.getProperty("datacap.executor.engine"))), + RunWay.valueOf(executorCfg.getOrDefault("way", "LOCAL")), + RunMode.valueOf(executorCfg.getOrDefault("mode", "CLIENT")), + executorCfg.get("startScript"), + RunEngine.valueOf(executorCfg.getOrDefault("engine", "SPARK")), + null, + fetchSize, + batchSize, + preCount, null ); + // 进度回调:每个 batch 写完后更新 datacap_dataset_history 以及数据集自身的 totalRows / totalSize + // totalRows 直接用已写入计数;totalSize 必须查 ClickHouse system.parts,开销大且不可控, + // 必须放到独立线程异步执行,否则会阻塞 MySQL 流式 cursor 触发 net_write_timeout + final DatasetHistoryEntity progressHistory = history; + final DataSetEntity progressEntity = entity; + final java.util.concurrent.atomic.AtomicInteger progressTick = new java.util.concurrent.atomic.AtomicInteger(); + final java.util.concurrent.atomic.AtomicBoolean sizeRefreshInFlight = new java.util.concurrent.atomic.AtomicBoolean(false); + final java.util.concurrent.ExecutorService sizeRefreshExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "dataset-size-refresh-" + taskName); + t.setDaemon(true); + return t; + }); + request.setProgressListener((processed, total) -> { + progressHistory.setProcessedCount(processed); + if (total >= 0) { + progressHistory.setTotalCount(total); + if (total > 0) { + java.math.BigDecimal percent = java.math.BigDecimal + .valueOf(processed * 100.0 / total) + .setScale(2, java.math.RoundingMode.HALF_UP); + progressHistory.setProgress(percent); + } + } + // 
收到停止请求后,进度回调不能把 state 写回 RUNNING;保持 STOPPING 直到真正退出 + if (stoppingHistoryIds.contains(progressHistory.getId())) { + progressHistory.setState(RunState.STOPPING); + } + try { + historyRepository.save(progressHistory); + } + catch (Exception ex) { + log.warn("Update sync history progress failed: {}", ex.getMessage()); + } + // 同步期间用已写入行数滚动刷新 dataset.totalRows,方便列表实时看到进度 + try { + progressEntity.setTotalRows(String.valueOf(processed)); + repository.save(progressEntity); + } + catch (Exception ex) { + log.warn("Update dataset totalRows failed: {}", ex.getMessage()); + } + // 节流 + 异步刷新 totalSize:每 SIZE_REFRESH_EVERY 次 progress 提交一次到独立线程; + // 上一次还没回来就跳过,绝不阻塞主流(MySQL 真流式 cursor 对客户端读速度敏感) + if (progressTick.incrementAndGet() % SIZE_REFRESH_EVERY == 0 + && sizeRefreshInFlight.compareAndSet(false, true)) { + sizeRefreshExecutor.submit(() -> { + try { + this.flushTableMetadata( + progressEntity, + outputPlugin, + database, + requireNonNull(output.getOriginConfigure()), + outPlugin + ); + } + catch (Exception ex) { + log.warn("Refresh dataset totalSize failed: {}", ex.getMessage()); + } + finally { + sizeRefreshInFlight.set(false); + } + }); + } + }); + history.setState(RunState.RUNNING); historyRepository.save(history); - ExecutorResponse response = executorService.start(request); + ExecutorResponse response; + try { + response = executorService.start(request); + } + finally { + // 任务一旦真正退出,从 STOPPING 集合移除(不管是正常 / 失败 / 被停止) + stoppingHistoryIds.remove(progressHistory.getId()); + // 关闭异步 size 刷新线程,等正在跑的最多 30 秒 + sizeRefreshExecutor.shutdown(); + try { + if (!sizeRefreshExecutor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS)) { + sizeRefreshExecutor.shutdownNow(); + } + } + catch (InterruptedException ie) { + sizeRefreshExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } history.setUpdateTime(new Date()); history.setElapsed((history.getUpdateTime().getTime() - history.getCreateTime().getTime()) / 1000); history.setMode(QueryMode.SYNC); 
history.setCount(response.getCount()); history.setState(response.getState()); + if (response.getMessage() != null) { + history.setMessage(response.getMessage()); + } + if (response.getSuccessful()) { + history.setProcessedCount((long) response.getCount()); + if (history.getTotalCount() == null || history.getTotalCount() < 0L) { + history.setTotalCount((long) response.getCount()); + } + history.setProgress(java.math.BigDecimal.valueOf(100.0).setScale(2, java.math.RoundingMode.HALF_UP)); + } + historyRepository.save(history); + // STOPPED 是用户主动停止,已经把最终状态写入 history,无需当作异常抛出 / 也不刷 table metadata + if (response.getState() == RunState.STOPPED) { + return; + } Preconditions.checkArgument(response.getSuccessful(), response.getMessage()); this.flushTableMetadata( @@ -1081,7 +1371,7 @@ private DataSetEntity syncData(DataSetEntity entity, java.util.concurrent.Execut }); }, () -> { - log.warn("Executor [ {} ] not found", entity.getExecutor()); + log.error("Executor [ {} ] not found", entity.getExecutor()); history.setMessage(String.format("Executor [ %s ] not found", entity.getExecutor())); history.setState(RunState.FAILURE); historyRepository.save(history); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/RuntimeConfigureServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/RuntimeConfigureServiceImpl.java new file mode 100644 index 0000000000..a6975fc40f --- /dev/null +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/RuntimeConfigureServiceImpl.java @@ -0,0 +1,125 @@ +package io.edurt.datacap.service.service.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.common.response.CommonResponse; +import io.edurt.datacap.plugin.configure.PluginConfigureField; +import io.edurt.datacap.service.entity.ConfigureEntity; +import 
io.edurt.datacap.service.repository.ConfigureRepository; +import io.edurt.datacap.service.service.RuntimeConfigureService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Slf4j +@Service +@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) +public class RuntimeConfigureServiceImpl + implements RuntimeConfigureService +{ + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE = new TypeReference>() {}; + + private final ConfigureRepository repository; + + public RuntimeConfigureServiceImpl(ConfigureRepository repository) + { + this.repository = repository; + } + + @Override + public Map getEffective(String category, String name, List fields) + { + Map result = new LinkedHashMap<>(); + // 先 fill 默认值,保证调用方拿到的 map 字段齐全 + if (fields != null) { + for (PluginConfigureField field : fields) { + if (field.getDefaultValue() != null) { + result.put(field.getName(), field.getDefaultValue()); + } + } + } + // 再 overlay DB 行 + result.putAll(getRaw(category, name)); + return result; + } + + @Override + public Map getRaw(String category, String name) + { + return repository.findByCategoryAndName(category, name) + .map(entity -> parseJson(entity.getConfigure())) + .orElseGet(Collections::emptyMap); + } + + @Override + public List list(String category) + { + return repository.findAllByCategoryOrderByName(category); + } + + @Override + public CommonResponse save(String category, String name, Map configMap, String description) + { + try { + Optional existing = repository.findByCategoryAndName(category, name); + ConfigureEntity entity = existing.orElseGet(ConfigureEntity::new); + entity.setCategory(category); + entity.setName(name); + entity.setConfigure(MAPPER.writeValueAsString(configMap == null ? 
Collections.emptyMap() : configMap)); + if (description != null) { + entity.setDescription(description); + } + entity.setUpdateTime(new Date()); + ConfigureEntity saved = repository.save(entity); + return CommonResponse.success(saved); + } + catch (Exception ex) { + log.error("Save runtime configure failed: category={} name={}", category, name, ex); + return CommonResponse.failure("Save configure failed: " + ex.getMessage()); + } + } + + @Override + public void seedIfAbsent(String category, String name, List fields, String description) + { + if (repository.existsByCategoryAndName(category, name)) { + return; + } + Map defaults = new LinkedHashMap<>(); + if (fields != null) { + for (PluginConfigureField field : fields) { + if (field.getDefaultValue() != null) { + defaults.put(field.getName(), field.getDefaultValue()); + } + } + } + CommonResponse response = save(category, name, defaults, description); + if (Boolean.TRUE.equals(response.getStatus())) { + log.info("Seeded runtime configure: category={} name={} fields={}", category, name, defaults.size()); + } + } + + private Map parseJson(String json) + { + if (json == null || json.isEmpty()) { + return new HashMap<>(); + } + try { + Map parsed = MAPPER.readValue(json, MAP_TYPE); + return parsed == null ? 
new HashMap<>() : parsed; + } + catch (Exception ex) { + log.warn("Parse runtime configure JSON failed: {}", ex.getMessage()); + return new HashMap<>(); + } + } +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/WorkflowServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/WorkflowServiceImpl.java index f5143c6f67..69f7f138ff 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/WorkflowServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/WorkflowServiceImpl.java @@ -22,6 +22,7 @@ import io.edurt.datacap.service.repository.BaseRepository; import io.edurt.datacap.service.repository.WorkflowRepository; import io.edurt.datacap.service.security.UserDetailsService; +import io.edurt.datacap.service.service.RuntimeConfigureService; import io.edurt.datacap.service.service.WorkflowService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.MapUtils; @@ -42,7 +43,6 @@ import java.util.concurrent.Executors; import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; @Slf4j @Service @@ -54,13 +54,15 @@ public class WorkflowServiceImpl private final PluginManager pluginManager; private final Environment environment; private final WorkflowRepository repository; + private final RuntimeConfigureService runtimeConfigureService; - public WorkflowServiceImpl(InitializerConfigure initializer, PluginManager pluginManager, Environment environment, WorkflowRepository repository) + public WorkflowServiceImpl(InitializerConfigure initializer, PluginManager pluginManager, Environment environment, WorkflowRepository repository, RuntimeConfigureService runtimeConfigureService) { this.initializer = initializer; this.pluginManager = pluginManager; this.environment = environment; this.repository = repository; + this.runtimeConfigureService = runtimeConfigureService; } 
@Override @@ -147,10 +149,13 @@ public CommonResponse saveOrUpdate(BaseRepository executorCfg = runtimeConfigureService.getEffective( + RuntimeConfigureService.CATEGORY_EXECUTOR, + plugin.getName(), + plugin.configures() ); + String executorHome = executorCfg.get("home"); log.debug("Executor home directory: {}", executorHome); ExecutorRequest request = new ExecutorRequest( @@ -160,10 +165,10 @@ public CommonResponse saveOrUpdate(BaseRepository columns, + int batchSize) + { + if (type().equals(PluginType.JDBC)) { + return JdbcStreamAdapter.openBatchWriter(this, configure, database, table, columns, batchSize); + } + else { + throw new UnsupportedOperationException("Batch write is not supported for plugin " + this.name()); + } + } + /** * 获取数据库支持的引擎 * Get database supported engines diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/BatchWriter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/BatchWriter.java new file mode 100644 index 0000000000..5878d4470c --- /dev/null +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/BatchWriter.java @@ -0,0 +1,14 @@ +package io.edurt.datacap.spi.adapter; + +import java.util.List; + +public interface BatchWriter + extends AutoCloseable +{ + void addRow(List row); + + long writtenCount(); + + @Override + void close(); +} diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcStreamAdapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcStreamAdapter.java new file mode 100644 index 0000000000..52666fca9b --- /dev/null +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcStreamAdapter.java @@ -0,0 +1,308 @@ +package io.edurt.datacap.spi.adapter; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.spi.PluginService; +import io.edurt.datacap.spi.connection.JdbcConnection; +import io.edurt.datacap.spi.model.Configure; +import io.edurt.datacap.spi.model.Response; +import 
lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@Slf4j +@SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION", "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"}) +public final class JdbcStreamAdapter +{ + private static final int DEFAULT_FETCH_SIZE = 1000; + private static final int DEFAULT_BATCH_SIZE = 1000; + + private JdbcStreamAdapter() {} + + public static void executeStream( + PluginService plugin, + Configure configure, + String sql, + int fetchSize, + RowCallback callback) + { + int effectiveFetchSize = fetchSize > 0 ? fetchSize : DEFAULT_FETCH_SIZE; + JdbcConnection jdbcConnection = openConnection(plugin, configure); + try { + Connection connection = (Connection) jdbcConnection.getConnection(); + if (connection == null) { + throw new IllegalStateException("Open jdbc connection failed: " + + Optional.ofNullable(jdbcConnection.getResponse().getMessage()).orElse("unknown")); + } + + boolean restoreAutoCommit = false; + boolean originalAutoCommit = true; + try { + originalAutoCommit = connection.getAutoCommit(); + if (originalAutoCommit) { + connection.setAutoCommit(false); + restoreAutoCommit = true; + } + } + catch (SQLException ignore) { + // Some drivers (e.g. read-only HTTP based) do not support autoCommit toggling; just continue. 
+ } + + try (Statement statement = connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY)) { + applyFetchSize(statement, effectiveFetchSize, configure); + // 暴露给上层,方便取消逻辑直接调用 Statement.cancel() + callback.onStatement(statement); + try (ResultSet rs = statement.executeQuery(sql)) { + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + List headers = new ArrayList<>(columnCount); + List types = new ArrayList<>(columnCount); + for (int i = 1; i <= columnCount; i++) { + String label = metaData.getColumnLabel(i); + headers.add(label != null && !label.isEmpty() ? label : metaData.getColumnName(i)); + types.add(metaData.getColumnTypeName(i)); + } + callback.onSchema(headers, types); + while (rs.next()) { + List row = new ArrayList<>(columnCount); + for (int i = 1; i <= columnCount; i++) { + row.add(rs.getObject(i)); + } + callback.onRow(row); + } + } + } + finally { + if (restoreAutoCommit) { + try { + connection.setAutoCommit(originalAutoCommit); + } + catch (SQLException ignore) { + } + } + } + } + catch (SQLException ex) { + throw new IllegalStateException("Stream read failed: " + ex.getMessage(), ex); + } + finally { + jdbcConnection.destroy(); + } + } + + public static BatchWriter openBatchWriter( + PluginService plugin, + Configure configure, + String database, + String table, + List columns, + int batchSize) + { + if (columns == null || columns.isEmpty()) { + throw new IllegalArgumentException("Batch writer requires at least one column"); + } + int effectiveBatchSize = batchSize > 0 ? 
batchSize : DEFAULT_BATCH_SIZE; + JdbcConnection jdbcConnection = openConnection(plugin, configure); + Connection connection = (Connection) jdbcConnection.getConnection(); + if (connection == null) { + jdbcConnection.destroy(); + throw new IllegalStateException("Open jdbc connection failed: " + + Optional.ofNullable(jdbcConnection.getResponse().getMessage()).orElse("unknown")); + } + return new JdbcBatchWriter(jdbcConnection, connection, database, table, columns, effectiveBatchSize); + } + + private static JdbcConnection openConnection(PluginService plugin, Configure configure) + { + Response response = new Response(); + configure.setDriver(plugin.driver()); + configure.setType(plugin.connectType()); + configure.setUrl(Optional.of(plugin.url(configure))); + return new JdbcConnection(configure, response); + } + + private static void applyFetchSize(Statement statement, int fetchSize, Configure configure) + { + try { + String type = configure.getType(); + // MySQL only streams when fetchSize is Integer.MIN_VALUE with TYPE_FORWARD_ONLY + CONCUR_READ_ONLY. + // The plugin's connectType is "datacap"; the original driver type is in the plugin itself, but the URL + // encodes the actual database. Detect by URL prefix. + String url = configure.getUrl().orElse(""); + if (url.startsWith("jdbc:mysql") || url.startsWith("jdbc:mariadb")) { + statement.setFetchSize(Integer.MIN_VALUE); + } + else { + statement.setFetchSize(fetchSize); + } + if (type != null && type.toLowerCase().contains("postgres")) { + // PostgreSQL also needs autoCommit=false for cursor-based fetch (handled by caller). 
+ statement.setFetchSize(fetchSize); + } + } + catch (SQLException ex) { + log.warn("Set fetch size failed, falling back to driver default: {}", ex.getMessage()); + } + } + + private static final class JdbcBatchWriter + implements BatchWriter + { + private final JdbcConnection jdbcConnection; + private final Connection connection; + private final PreparedStatement statement; + private final int columnCount; + private final int batchSize; + private int pending; + private long written; + private boolean originalAutoCommit = true; + private boolean restoreAutoCommit; + + JdbcBatchWriter( + JdbcConnection jdbcConnection, + Connection connection, + String database, + String table, + List columns, + int batchSize) + { + this.jdbcConnection = jdbcConnection; + this.connection = connection; + this.columnCount = columns.size(); + this.batchSize = batchSize; + + try { + this.originalAutoCommit = connection.getAutoCommit(); + if (this.originalAutoCommit) { + connection.setAutoCommit(false); + this.restoreAutoCommit = true; + } + } + catch (SQLException ignore) { + } + + String sql = buildInsertTemplate(database, table, columns); + try { + this.statement = connection.prepareStatement(sql); + } + catch (SQLException ex) { + safeClose(); + throw new IllegalStateException("Prepare insert failed: " + ex.getMessage(), ex); + } + } + + @Override + public void addRow(List row) + { + if (row == null || row.size() != columnCount) { + throw new IllegalArgumentException( + "Row size " + (row == null ? 
-1 : row.size()) + " does not match column count " + columnCount); + } + try { + for (int i = 0; i < columnCount; i++) { + Object value = row.get(i); + if (value == null) { + statement.setObject(i + 1, null); + } + else { + statement.setObject(i + 1, value); + } + } + statement.addBatch(); + pending++; + if (pending >= batchSize) { + flush(); + } + } + catch (SQLException ex) { + throw new IllegalStateException("Add batch row failed: " + ex.getMessage(), ex); + } + } + + @Override + public long writtenCount() + { + return written; + } + + @Override + public void close() + { + try { + if (pending > 0) { + flush(); + } + } + catch (SQLException ex) { + throw new IllegalStateException("Flush final batch failed: " + ex.getMessage(), ex); + } + finally { + safeClose(); + } + } + + private void flush() + throws SQLException + { + statement.executeBatch(); + connection.commit(); + written += pending; + pending = 0; + statement.clearBatch(); + } + + private void safeClose() + { + try { + if (statement != null) { + statement.close(); + } + } + catch (SQLException ex) { + log.warn("Close prepared statement failed: {}", ex.getMessage()); + } + try { + if (restoreAutoCommit) { + connection.setAutoCommit(originalAutoCommit); + } + } + catch (SQLException ignore) { + } + jdbcConnection.destroy(); + } + } + + private static String buildInsertTemplate(String database, String table, List columns) + { + StringBuilder sb = new StringBuilder(); + sb.append("INSERT INTO "); + if (database != null && !database.isEmpty()) { + sb.append('`').append(database).append("`."); + } + sb.append('`').append(table).append("` ("); + for (int i = 0; i < columns.size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append('`').append(columns.get(i)).append('`'); + } + sb.append(") VALUES ("); + for (int i = 0; i < columns.size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append('?'); + } + sb.append(')'); + return sb.toString(); + } +} diff --git 
a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/RowCallback.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/RowCallback.java new file mode 100644 index 0000000000..16c495b80e --- /dev/null +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/RowCallback.java @@ -0,0 +1,27 @@ +package io.edurt.datacap.spi.adapter; + +import java.sql.Statement; +import java.util.List; + +public interface RowCallback +{ + /** + * 在第一行之前回调一次,传入列元数据。默认空实现。 + * Called once before any row, carrying the column metadata of the source result. + */ + default void onSchema(List headers, List types) {} + + /** + * 在执行 query 之前回调一次,把底层 Statement 暴露给上层, + * 让取消逻辑可以直接调用 {@link Statement#cancel()} 中断阻塞中的 fetch。 + * Called once right before the underlying query executes, exposing the JDBC Statement so the caller + * can issue {@link Statement#cancel()} to interrupt a blocked fetch. + */ + default void onStatement(Statement statement) {} + + /** + * 每一行回调一次。 + * Called per row. Values are aligned to {@code headers} from {@link #onSchema}. 
+ */ + void onRow(List row); +} diff --git a/core/datacap-ui/src/router/default.ts b/core/datacap-ui/src/router/default.ts index 860e253c05..5a920c3611 100644 --- a/core/datacap-ui/src/router/default.ts +++ b/core/datacap-ui/src/router/default.ts @@ -113,6 +113,15 @@ const createSystemRouter = (router: any) => { isRoot: false }, component: () => import('@/views/pages/system/menu/MenuHome.vue') + }, + { + name: 'configure', + path: 'configure', + meta: { + title: 'configure.runtime.title', + isRoot: false + }, + component: () => import('@/views/pages/system/configure/ConfigureHome.vue') } ] } diff --git a/core/datacap-ui/src/services/dataset.ts b/core/datacap-ui/src/services/dataset.ts index e06eece3f6..6f0a03dff9 100644 --- a/core/datacap-ui/src/services/dataset.ts +++ b/core/datacap-ui/src/services/dataset.ts @@ -59,15 +59,48 @@ export class DatasetService return new HttpUtils().post(`${ DEFAULT_PATH }/history/${ code }`, configure) } + /** + * Read the executor's task log of a given sync history record. + * + * @param {number} id - the dataset history id + * @return {Promise} response containing the log lines (List) + */ + getHistoryLog(id: number): Promise + { + return new HttpUtils().get(`${ DEFAULT_PATH }/history/log/${ id }`) + } + + /** + * Stop a running sync task identified by the given history record id. + * + * @param {number} id - the dataset history id (must be RUNNING) + * @return {Promise} success when the cancel flag has been set + */ + stopHistory(id: number): Promise + { + return new HttpUtils().put(`${ DEFAULT_PATH }/history/stop/${ id }`) + } + /** * Sync data with the server using the provided id. + * Optional `overrides` is a map of tunable executor fields the user wants to change for this run only. 
* * @param {string} code - The id of the data to sync + * @param {Record} [overrides] - per-invocation overrides for tunable fields * @return {Promise} A promise that resolves with the response from the server */ - syncData(code: string): Promise + syncData(code: string, overrides?: Record): Promise + { + return new HttpUtils().put(`${ DEFAULT_PATH }/syncData/${ code }`, overrides || {}) + } + + /** + * Fetch the tunable executor fields for this dataset (with current effective values prefilled). + * Used by the sync dialog to render an editable form. + */ + getSyncFields(code: string): Promise { - return new HttpUtils().put(`${ DEFAULT_PATH }/syncData/${ code }`) + return new HttpUtils().get(`${ DEFAULT_PATH }/syncFields/${ code }`) } /** diff --git a/core/datacap-ui/src/services/runtimeConfigure.ts b/core/datacap-ui/src/services/runtimeConfigure.ts new file mode 100644 index 0000000000..2405e878ca --- /dev/null +++ b/core/datacap-ui/src/services/runtimeConfigure.ts @@ -0,0 +1,24 @@ +import { ResponseModel } from '@/model/response' +import { HttpUtils } from '@/utils/http' + +const DEFAULT_PATH = '/api/v1/configure/runtime' + +export class RuntimeConfigureService +{ + list(category: string): Promise + { + return new HttpUtils().get(`${ DEFAULT_PATH }/list/${ category }`) + } + + detail(category: string, name: string): Promise + { + return new HttpUtils().get(`${ DEFAULT_PATH }/detail/${ category }/${ name }`) + } + + save(category: string, name: string, configure: Record): Promise + { + return new HttpUtils().put(`${ DEFAULT_PATH }/save/${ category }/${ name }`, configure) + } +} + +export default new RuntimeConfigureService() diff --git a/core/datacap-ui/src/utils/common.ts b/core/datacap-ui/src/utils/common.ts index 0bacd15da8..16837ab7f8 100644 --- a/core/datacap-ui/src/utils/common.ts +++ b/core/datacap-ui/src/utils/common.ts @@ -29,8 +29,12 @@ const getColor = (origin: string): string => { return 'hsl(142.1 76.2% 36.3%)' case 'FAILURE': return 'hsl(346.8 77.2% 
49.8%)' + case 'STOPPING': + return 'hsl(38 92% 50%)' case 'STOPPED': return '#17233d' + case 'INTERRUPTED': + return 'hsl(280 60% 50%)' case 'TIMEOUT': return 'hsl(47.9 95.8% 53.1%)' default: @@ -79,8 +83,12 @@ export function useUtil() return t('state.common.success') case 'FAILURE': return t('state.common.failure') + case 'STOPPING': + return t('state.common.stopping') case 'STOPPED': return t('state.common.stop') + case 'INTERRUPTED': + return t('state.common.interrupted') case 'TIMEOUT': return t('state.common.timeout') case 'QUEUE': @@ -90,7 +98,42 @@ export function useUtil() } } + /** + * Translate a DataSetState enum code (e.g. "TABLE_SUCCESS") to a localized label. + * Falls back to the raw code when no translation key matches. + */ + const getDatasetStateText = (origin: string | null | undefined): string => { + if (!origin) return '' + switch (origin) { + case 'METADATA_START': + return t('dataset.state.metadataStart') + case 'METADATA_FAILED': + return t('dataset.state.metadataFailed') + case 'METADATA_SUCCESS': + return t('dataset.state.metadataSuccess') + case 'TABLE_START': + return t('dataset.state.tableStart') + case 'TABLE_FAILED': + return t('dataset.state.tableFailed') + case 'TABLE_SUCCESS': + return t('dataset.state.tableSuccess') + case 'DATA_START': + return t('dataset.state.dataStart') + case 'DATA_FAILED': + return t('dataset.state.dataFailed') + case 'DATA_SUCCESS': + return t('dataset.state.dataSuccess') + case 'COMPLETE_FAILED': + return t('dataset.state.completeFailed') + case 'COMPLETE_SUCCESS': + return t('dataset.state.completeSuccess') + default: + return origin + } + } + return { - getText + getText, + getDatasetStateText } } diff --git a/core/datacap-ui/src/views/components/editor/flow/FlowEditor.vue b/core/datacap-ui/src/views/components/editor/flow/FlowEditor.vue index 2130d858ac..4f354bab99 100644 --- a/core/datacap-ui/src/views/components/editor/flow/FlowEditor.vue +++ 
b/core/datacap-ui/src/views/components/editor/flow/FlowEditor.vue @@ -171,7 +171,7 @@ export default defineComponent({ return { configureVisible: false, contextData: null, - configure: { executor: 'SeatunnelExecutor', from: null, to: null, flow: null } + configure: { executor: 'Seatunnel', from: null, to: null, flow: null } } }, methods: { diff --git a/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistory.vue b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistory.vue index d35ada2670..81f3a550ab 100644 --- a/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistory.vue +++ b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistory.vue @@ -8,7 +8,7 @@ + + + + + + + +
{{ formatJson(configureInfo) }}
+ +
+ { + if (response.status) { + this.$Message.success({ + content: this.$t('dataset.history.stopRequested'), + showIcon: true + }) + this.handleInitialize() + } + else { + this.$Message.error({ + content: response.message, + showIcon: true + }) + } + }) + .finally(() => this.stoppingId = null) } } }) diff --git a/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistoryLogger.vue b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistoryLogger.vue new file mode 100644 index 0000000000..f6dbb98567 --- /dev/null +++ b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHistoryLogger.vue @@ -0,0 +1,155 @@ + + + diff --git a/core/datacap-ui/src/views/pages/admin/dataset/DatasetHome.vue b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHome.vue index d8c4ada51f..f31aa65391 100644 --- a/core/datacap-ui/src/views/pages/admin/dataset/DatasetHome.vue +++ b/core/datacap-ui/src/views/pages/admin/dataset/DatasetHome.vue @@ -160,6 +160,7 @@ import MarkdownPreview from '@/views/components/markdown/MarkdownView.vue' import DatasetRebuild from '@/views/pages/admin/dataset/DatasetRebuild.vue' import DatasetClear from '@/views/pages/admin/dataset/DatasetClear.vue' import DatasetDelete from '@/views/pages/admin/dataset/DatasetDelete.vue' +import { useUtil } from '@/utils/common' export default defineComponent({ name: 'DatasetHome', @@ -168,10 +169,12 @@ export default defineComponent({ { const filter: FilterModel = new FilterModel() const { headers } = useDatasetHeaders() + const { getDatasetStateText } = useUtil() return { filter, - headers + headers, + getDatasetStateText } }, data() @@ -282,7 +285,8 @@ export default defineComponent({ getState(state: Array | null): string | null { if (state && state.length > 0) { - return state[state.length - 1] + const last = state[state.length - 1] + return last ? 
this.getDatasetStateText(String(last)) : null } return null }, diff --git a/core/datacap-ui/src/views/pages/admin/dataset/DatasetInfo.vue b/core/datacap-ui/src/views/pages/admin/dataset/DatasetInfo.vue index fcd5edc6c3..abd07101db 100644 --- a/core/datacap-ui/src/views/pages/admin/dataset/DatasetInfo.vue +++ b/core/datacap-ui/src/views/pages/admin/dataset/DatasetInfo.vue @@ -338,7 +338,7 @@ export default defineComponent({ source: { code: null }, expression: null as string | null, scheduler: 'Default', - executor: 'LocalExecutor', + executor: 'Local', lifeCycle: null as number | null, lifeCycleColumn: null as string | null, lifeCycleType: null as string | null diff --git a/core/datacap-ui/src/views/pages/admin/dataset/DatasetSync.vue b/core/datacap-ui/src/views/pages/admin/dataset/DatasetSync.vue index 5933fc9266..18d2a21d45 100644 --- a/core/datacap-ui/src/views/pages/admin/dataset/DatasetSync.vue +++ b/core/datacap-ui/src/views/pages/admin/dataset/DatasetSync.vue @@ -2,6 +2,22 @@ +
{{ $t('common.loading') }}
+ +
+
{{ $t('dataset.sync.overrideTitle') }}
+
+ +
{{ field.description }}
+ + + + + +
+
+