From 63322e79afe74d9dc49b6c0a9a0e728b3627139d Mon Sep 17 00:00:00 2001 From: Pavlo Kulyk Date: Fri, 24 Apr 2026 11:51:22 +0300 Subject: [PATCH 1/6] feat: implement aggregation methods with support for grouping and median calculations in Clickhouse, MongoDB, and MySQL connectors --- adminforth/dataConnectors/clickhouse.ts | 70 ++++++++++++++++- adminforth/dataConnectors/mongo.ts | 99 ++++++++++++++++++++++++- adminforth/dataConnectors/mysql.ts | 78 ++++++++++++++++++- 3 files changed, 243 insertions(+), 4 deletions(-) diff --git a/adminforth/dataConnectors/clickhouse.ts b/adminforth/dataConnectors/clickhouse.ts index 49519cbb9..208b730d5 100644 --- a/adminforth/dataConnectors/clickhouse.ts +++ b/adminforth/dataConnectors/clickhouse.ts @@ -1,4 +1,4 @@ -import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn } from '../types/Back.js'; +import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import AdminForthBaseConnector from './baseConnector.js'; import dayjs from 'dayjs'; import { createClient } from '@clickhouse/client' @@ -444,13 +444,79 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth return { where, params }; } + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: { [alias: string]: IAggregationRule }; + groupBy?: IGroupByRule; + }): Promise> { + + const tableName = `${this.dbName}.${resource.table}`; + + const selectParts: string[] = []; + let groupExpr: string | null = null; + + if (groupBy?.type === 'date_trunc') { + const g = groupBy as IGroupByDateTrunc; + const tz = g.timezone ?? 'UTC'; + + const field = `toTimeZone(${g.field}, '${tz}')`; + + switch (g.truncation) { + case 'day': groupExpr = `toDate(toStartOfDay(${field}))`; break; + case 'month': groupExpr = `toDate(toStartOfMonth(${field}))`; break; + case 'week': groupExpr = `toDate(toStartOfWeek(${field}))`; break; + case 'year': groupExpr = `toDate(toStartOfYear(${field}))`; break; + } + + selectParts.push(`${groupExpr} AS \`group\``); + + } else if (groupBy?.type === 'field') { + const g = groupBy as IGroupByField; + groupExpr = `${g.field}`; + selectParts.push(`${groupExpr} AS \`group\``); + } + + for (const [alias, rule] of Object.entries(aggregations)) { + switch (rule.operation) { + case 'count': selectParts.push(`count() AS \`${alias}\``); break; + case 'sum': selectParts.push(`sum(${rule.field}) AS \`${alias}\``); break; + case 'avg': selectParts.push(`avg(${rule.field}) AS \`${alias}\``); break; + case 'min': selectParts.push(`min(${rule.field}) AS \`${alias}\``); break; + case 'max': selectParts.push(`max(${rule.field}) AS \`${alias}\``); break; + case 'median': selectParts.push(`quantile(0.5)(${rule.field}) AS \`${alias}\``); break; + } + } + + const { where, params } = this.whereClause(resource, filters); + + let query = `SELECT ${selectParts.join(', ')} FROM ${tableName} ${where}`; + + if (groupExpr) { + query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; + } + + const result = await this.client.query({ + query, + format: 'JSONEachRow', + query_params: params, + }); + + const rows = await result.json(); + + return rows.map((r: any) => ({ + group: r.group, + ...r, + })); + } + async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: { resource: AdminForthResource, limit: number, offset: number, sort: { field: string, direction: AdminForthSortDirections }[], filters: IAdminForthAndOrFilter, - }): Promise { + }): Promise> { const columns = resource.dataSourceColumns.map((col) => { // for decimal cast to string if (col.type == AdminForthDataTypes.DECIMAL) { diff --git a/adminforth/dataConnectors/mongo.ts b/adminforth/dataConnectors/mongo.ts index 7f26d1a7a..f951bd732 100644 --- a/adminforth/dataConnectors/mongo.ts +++ b/adminforth/dataConnectors/mongo.ts @@ -1,7 +1,7 @@ import dayjs from 'dayjs'; import { MongoClient } from 'mongodb'; import { Decimal128, Double } from 'bson'; -import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource } from '../types/Back.js'; +import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import AdminForthBaseConnector from './baseConnector.js'; import { afLogger } from '../modules/logger.js'; import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js'; @@ -305,6 +305,103 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS .filter((f) => (f as IAdminForthSingleFilter).insecureRawSQL === undefined) .map((f) => this.getFilterQuery(resource, f))); } + + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: any; + groupBy?: any; + }): Promise> { + + const collection = this.client.db().collection(resource.table); + + const match = filters?.subFilters?.length ? this.getFilterQuery(resource, filters) : {}; + + let groupId: any = null; + + if (groupBy?.type === 'field') { + groupId = `$${groupBy.field}`; + } + + if (groupBy?.type === 'date_trunc') { + const tz = groupBy.timezone ?? 'UTC'; + + groupId = { + $dateTrunc: { + date: `$${groupBy.field}`, + unit: groupBy.truncation, + timezone: tz, + }, + }; + } + + const groupStage: any = { + _id: groupId, + }; + + for (const [alias, rule] of Object.entries(aggregations) as any) { + switch (rule.operation) { + case 'count': groupStage[alias] = { $sum: 1 }; break; + case 'sum': groupStage[alias] = { $sum: { $toDouble: `$${rule.field}` } }; break; + case 'avg': groupStage[alias] = { $avg: { $toDouble: `$${rule.field}` } }; break; + case 'min': groupStage[alias] = { $min: { $toDouble: `$${rule.field}` } }; break; + case 'max': groupStage[alias] = { $max: { $toDouble: `$${rule.field}` } }; break; + case 'median': groupStage[alias] = { $push: { $toDouble: `$${rule.field}` } }; break; + } + } + + const pipeline: any[] = []; + + if (Object.keys(match).length) { + pipeline.push({ $match: match }); + } + + pipeline.push({ $group: groupStage }); + + pipeline.push({ + $project: { + _id: 0, + group: { + $cond: { + if: { $isNumber: "$_id" }, + then: "$_id", + else: { + $cond: { + if: { $toString: "$_id" }, + then: { $dateToString: { format: "%Y-%m-%d", date: "$_id", timezone: groupBy?.timezone ?? 'UTC' } }, + else: "$_id" + } + } + } + }, + ...Object.fromEntries( + Object.keys(groupStage) + .filter(k => k !== '_id') + .map(k => [k, `$${k}`]) + ), + }, + }); + + const calculateMedian = (arr: any[]) => { + if (!Array.isArray(arr) || arr.length === 0) return null; + const sorted = [...arr].sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 === 0 + ? (sorted[mid - 1] + sorted[mid]) / 2 + : sorted[mid]; + }; + + const result = await collection.aggregate(pipeline).toArray(); + + const medianAliases = Object.keys(aggregations).filter(alias => aggregations[alias].operation === 'median'); + + return result.map(row => { + medianAliases.forEach(alias => { + row[alias] = calculateMedian(row[alias]); + }); + return row; + }); + } async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: { diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index 368f1f403..5a5e4b1d3 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -1,5 +1,5 @@ import dayjs from 'dayjs'; -import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig } from '../types/Back.js'; +import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js'; import AdminForthBaseConnector from './baseConnector.js'; import mysql from 'mysql2/promise'; @@ -338,6 +338,82 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS } : { sql: '', values: [] }; } + private calculateMedian(values: number[]): number | null { + if (!values.length) return null; + const sorted = values.sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 === 0 + ? (sorted[mid - 1] + sorted[mid]) / 2 + : sorted[mid]; + } + + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: { [alias: string]: IAggregationRule }; + groupBy?: IGroupByRule; + }): Promise> { + const tableName = resource.table; + const selectParts: string[] = []; + const medianAliases: string[] = []; + let groupExpr: string | null = null; + + if (groupBy?.type === 'field') { + groupExpr = `\`${groupBy.field}\``; + selectParts.push(`${groupExpr} AS \`group\``); + } else if (groupBy?.type === 'date_trunc') { + const g = groupBy as IGroupByDateTrunc; + const tz = g.timezone ?? 'UTC'; + + const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`; + + switch (g.truncation) { + case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break; + case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break; + case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break; + case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break; + } + + selectParts.push(`${groupExpr} AS \`group\``); + } + + for (const [alias, rule] of Object.entries(aggregations)) { + const f = `\`${rule.field}\``; + switch (rule.operation) { + case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break; + case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break; + case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break; + case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break; + case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break; + case 'median': + selectParts.push(`GROUP_CONCAT(${f}) AS \`${alias}\``); + medianAliases.push(alias); + break; + } + } + + const { sql: where, values: filterValues } = this.whereClauseAndValues(filters); + let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`; + if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; + + await this.client.execute("SET SESSION group_concat_max_len = 1000000;"); + + const [rows]: any = await this.client.execute(query, filterValues); + + if (medianAliases.length > 0) { + return rows.map(row => { + medianAliases.forEach(alias => { + const raw = row[alias]; + const nums = raw ? raw.split(',').map(Number).filter(n => !isNaN(n)) : []; + row[alias] = this.calculateMedian(nums); + }); + return row; + }); + } + + return rows; + } + async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise { const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', '); const tableName = resource.table; From 0b8f475f0a7d710bfdd95de1fe279c91e79286c7 Mon Sep 17 00:00:00 2001 From: Pavlo Kulyk Date: Fri, 24 Apr 2026 12:44:02 +0300 Subject: [PATCH 2/6] feat: implement aggregation methods with support for grouping and median calculations in Clickhouse, MongoDB, and MySQL connectors --- adminforth/dataConnectors/clickhouse.ts | 2 +- adminforth/dataConnectors/mongo.ts | 18 +++++++----------- adminforth/dataConnectors/mysql.ts | 6 ++++-- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/adminforth/dataConnectors/clickhouse.ts b/adminforth/dataConnectors/clickhouse.ts index 208b730d5..92cd48781 100644 --- a/adminforth/dataConnectors/clickhouse.ts +++ b/adminforth/dataConnectors/clickhouse.ts @@ -449,7 +449,7 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth filters: IAdminForthAndOrFilter; aggregations: { [alias: string]: IAggregationRule }; groupBy?: IGroupByRule; - }): Promise> { + }): Promise { const tableName = `${this.dbName}.${resource.table}`; diff --git a/adminforth/dataConnectors/mongo.ts b/adminforth/dataConnectors/mongo.ts index f951bd732..6daddcb2e 100644 --- a/adminforth/dataConnectors/mongo.ts +++ b/adminforth/dataConnectors/mongo.ts @@ -361,19 +361,15 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS pipeline.push({ $project: { _id: 0, - group: { - $cond: { - if: { $isNumber: "$_id" }, - then: "$_id", - else: { - $cond: { - if: { $toString: "$_id" }, - then: { $dateToString: { format: "%Y-%m-%d", date: "$_id", timezone: groupBy?.timezone ?? 'UTC' } }, - else: "$_id" - } + group: groupBy?.type === 'date_trunc' + ? { + $cond: { + if: { $eq: [{ $type: "$_id" }, "date"] }, + then: { $dateToString: { format: "%Y-%m-%d", date: "$_id", timezone: groupBy?.timezone ?? 'UTC' } }, + else: "$_id" } } - }, + : "$_id", ...Object.fromEntries( Object.keys(groupStage) .filter(k => k !== '_id') diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index 5a5e4b1d3..ec3daf4f6 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -386,7 +386,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break; case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break; case 'median': - selectParts.push(`GROUP_CONCAT(${f}) AS \`${alias}\``); + selectParts.push(`GROUP_CONCAT(${f} ORDER BY ${f} ASC SEPARATOR ',') AS \`${alias}\``); medianAliases.push(alias); break; } @@ -396,7 +396,9 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`; if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; - await this.client.execute("SET SESSION group_concat_max_len = 1000000;"); + if (medianAliases.length > 0) { + await this.client.execute("SET SESSION group_concat_max_len = 1000000;"); + } const [rows]: any = await this.client.execute(query, filterValues); From 235814e598e4641efffc67b692271e604400dda0 Mon Sep 17 00:00:00 2001 From: Pavlo Kulyk Date: Fri, 24 Apr 2026 13:45:23 +0300 Subject: [PATCH 3/6] feat: update Clickhouse and MongoDB connectors to enhance aggregation return types and improve date truncation handling --- adminforth/dataConnectors/clickhouse.ts | 2 +- adminforth/dataConnectors/mongo.ts | 20 ++++++++------------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/adminforth/dataConnectors/clickhouse.ts b/adminforth/dataConnectors/clickhouse.ts index 92cd48781..5c21cd783 100644 --- a/adminforth/dataConnectors/clickhouse.ts +++ b/adminforth/dataConnectors/clickhouse.ts @@ -449,7 +449,7 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth filters: IAdminForthAndOrFilter; aggregations: { [alias: string]: IAggregationRule }; groupBy?: IGroupByRule; - }): Promise { + }): Promise > { const tableName = `${this.dbName}.${resource.table}`; diff --git a/adminforth/dataConnectors/mongo.ts b/adminforth/dataConnectors/mongo.ts index 6daddcb2e..9cc76918a 100644 --- a/adminforth/dataConnectors/mongo.ts +++ b/adminforth/dataConnectors/mongo.ts @@ -325,14 +325,11 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS if (groupBy?.type === 'date_trunc') { const tz = groupBy.timezone ?? 'UTC'; - - groupId = { - $dateTrunc: { - date: `$${groupBy.field}`, - unit: groupBy.truncation, - timezone: tz, - }, - }; + const dateTruncSpec: any = { date: `$${groupBy.field}`, unit: groupBy.truncation, timezone: tz,}; + if (groupBy.truncation === 'week') { + dateTruncSpec.startOfWeek = 'Mon'; + } + groupId = { $dateTrunc: dateTruncSpec,}; } const groupStage: any = { @@ -361,10 +358,9 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS pipeline.push({ $project: { _id: 0, - group: groupBy?.type === 'date_trunc' - ? { - $cond: { - if: { $eq: [{ $type: "$_id" }, "date"] }, + group: !groupBy ? "$$REMOVE" : groupBy.type === 'date_trunc' ? { + $cond: { + if: { $eq: [{ $type: "$_id" }, "date"] }, then: { $dateToString: { format: "%Y-%m-%d", date: "$_id", timezone: groupBy?.timezone ?? 'UTC' } }, else: "$_id" } From ed5ae7e9c66477baa6c963841f9cc9af3a9d5f94 Mon Sep 17 00:00:00 2001 From: Pavlo Kulyk Date: Fri, 24 Apr 2026 14:20:30 +0300 Subject: [PATCH 4/6] feat: enhance median calculation in MySQL connector by managing session variable for group_concat_max_len --- adminforth/dataConnectors/mysql.ts | 46 ++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index ec3daf4f6..8ddc6c721 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -397,23 +397,39 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; if (medianAliases.length > 0) { - await this.client.execute("SET SESSION group_concat_max_len = 1000000;"); - } - - const [rows]: any = await this.client.execute(query, filterValues); - - if (medianAliases.length > 0) { - return rows.map(row => { - medianAliases.forEach(alias => { - const raw = row[alias]; - const nums = raw ? raw.split(',').map(Number).filter(n => !isNaN(n)) : []; - row[alias] = this.calculateMedian(nums); + let connection; + let originalMaxLen = null; + try { + connection = await this.client.getConnection(); + + const [originalLenRows]: any = await connection.execute("SELECT @@SESSION.group_concat_max_len as original_len"); + originalMaxLen = originalLenRows[0].original_len; + + await connection.execute("SET SESSION group_concat_max_len = 1000000"); + + const [rows]: any = await connection.execute(query, filterValues); + + return rows.map((row: any) => { + medianAliases.forEach(alias => { + const raw = row[alias]; + const nums = raw ? raw.split(',').map(Number).filter((n: number) => !isNaN(n)) : []; + row[alias] = this.calculateMedian(nums); + }); + return row; }); - return row; - }); - } - return rows; + } finally { + if (connection) { + if (originalMaxLen !== null) { + await connection.execute(`SET SESSION group_concat_max_len = ${originalMaxLen}`); + } + connection.release(); + } + } + } else { + const [rows]: any = await this.client.execute(query, filterValues); + return rows; + } } async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise { From a1f7db1d1cf9c0fe7149fe06a9b44d7fc2be57a0 Mon Sep 17 00:00:00 2001 From: Vitalii Kulyk Date: Fri, 24 Apr 2026 14:47:58 +0300 Subject: [PATCH 5/6] feat: refactor median calculation in MySQL connector to use window functions and improve performance --- adminforth/dataConnectors/mysql.ts | 145 +++++++++++++++++------------ 1 file changed, 83 insertions(+), 62 deletions(-) diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index 8ddc6c721..c6783ca51 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -338,15 +338,6 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS } : { sql: '', values: [] }; } - private calculateMedian(values: number[]): number | null { - if (!values.length) return null; - const sorted = values.sort((a, b) => a - b); - const mid = Math.floor(sorted.length / 2); - return sorted.length % 2 === 0 - ? (sorted[mid - 1] + sorted[mid]) / 2 - : sorted[mid]; - } - async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { resource: AdminForthResource; filters: IAdminForthAndOrFilter; @@ -355,7 +346,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS }): Promise> { const tableName = resource.table; const selectParts: string[] = []; - const medianAliases: string[] = []; + const medianFields: { alias: string; field: string }[] = []; let groupExpr: string | null = null; if (groupBy?.type === 'field') { @@ -364,81 +355,111 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS } else if (groupBy?.type === 'date_trunc') { const g = groupBy as IGroupByDateTrunc; const tz = g.timezone ?? 'UTC'; - + if (!/^[A-Za-z0-9/_+\-]+$/.test(tz)) { + throw new Error(`Invalid timezone value: ${tz}`); + } const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`; - switch (g.truncation) { - case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break; + case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break; case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break; - case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break; - case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break; + case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break; + case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break; } - selectParts.push(`${groupExpr} AS \`group\``); } for (const [alias, rule] of Object.entries(aggregations)) { const f = `\`${rule.field}\``; switch (rule.operation) { - case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break; - case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break; - case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break; - case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break; - case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break; - case 'median': - selectParts.push(`GROUP_CONCAT(${f} ORDER BY ${f} ASC SEPARATOR ',') AS \`${alias}\``); - medianAliases.push(alias); - break; + case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break; + case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break; + case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break; + case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break; + case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break; + case 'median': medianFields.push({ alias, field: rule.field }); break; } } const { sql: where, values: filterValues } = this.whereClauseAndValues(filters); - let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`; - if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; - - if (medianAliases.length > 0) { - let connection; - let originalMaxLen = null; - try { - connection = await this.client.getConnection(); - - const [originalLenRows]: any = await connection.execute("SELECT @@SESSION.group_concat_max_len as original_len"); - originalMaxLen = originalLenRows[0].original_len; - - await connection.execute("SET SESSION group_concat_max_len = 1000000"); - - const [rows]: any = await connection.execute(query, filterValues); - - return rows.map((row: any) => { - medianAliases.forEach(alias => { - const raw = row[alias]; - const nums = raw ? raw.split(',').map(Number).filter((n: number) => !isNaN(n)) : []; - row[alias] = this.calculateMedian(nums); - }); - return row; - }); - - } finally { - if (connection) { - if (originalMaxLen !== null) { - await connection.execute(`SET SESSION group_concat_max_len = ${originalMaxLen}`); + + type AggRow = { group?: string } & Record; + + // Run non-median aggregations + let rows: AggRow[] = []; + const hasNonMedian = selectParts.length > (groupExpr ? 1 : 0); + if (hasNonMedian) { + let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`; + if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; + dbLogger.trace(`🪲📜 MySQL AGG Q: ${query} values: ${JSON.stringify(filterValues)}`); + const [result] = await this.client.execute(query, filterValues); + rows = result as AggRow[]; + } + + // Run each median via window functions (MySQL 8+) — no session variables, no memory pressure + for (const { alias, field } of medianFields) { + const f = `\`${field}\``; + const nullGuard = where ? `${where} AND ${f} IS NOT NULL` : `WHERE ${f} IS NOT NULL`; + + let medianQuery: string; + if (groupExpr) { + medianQuery = ` + SELECT ${groupExpr} AS \`group\`, AVG(${f}) AS \`${alias}\` + FROM ( + SELECT ${groupExpr}, ${f}, + ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn, + COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt + FROM \`${tableName}\` ${nullGuard} + ) t + WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0)) + GROUP BY ${groupExpr} + ORDER BY ${groupExpr} ASC + `; + } else { + medianQuery = ` + SELECT AVG(${f}) AS \`${alias}\` + FROM ( + SELECT ${f}, + ROW_NUMBER() OVER (ORDER BY ${f}) AS rn, + COUNT(*) OVER () AS cnt + FROM \`${tableName}\` ${nullGuard} + ) t + WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0)) + `; + } + + dbLogger.trace(`🪲📜 MySQL MEDIAN Q: ${medianQuery} values: ${JSON.stringify(filterValues)}`); + const [medianResult] = await this.client.execute(medianQuery, filterValues); + const medianRows = medianResult as AggRow[]; + + if (groupExpr) { + if (rows.length === 0) { + rows = medianRows.map((r) => ({ group: r.group, [alias]: r[alias] })); + } else { + const byGroup = new Map(medianRows.map((r) => [String(r.group), r[alias]])); + for (const row of rows) { + row[alias] = byGroup.get(String(row.group)) ?? null; } - connection.release(); + } + } else { + const medianVal = medianRows[0]?.[alias] ?? null; + if (rows.length === 0) { + rows = [{ [alias]: medianVal }]; + } else { + rows[0][alias] = medianVal; } } - } else { - const [rows]: any = await this.client.execute(query, filterValues); - return rows; } + + return rows; } async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise { - const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', '); + const columns = resource.dataSourceColumns.map((col: { name: string }) => `${col.name}`).join(', '); const tableName = resource.table; const { sql: where, values: filterValues } = this.whereClauseAndValues(filters); - const orderBy = sort.length ? `ORDER BY ${sort.map((s) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : ''; + const orderBy = sort.length ? `ORDER BY ${sort.map((s: { field: string; direction: AdminForthSortDirections }) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : ''; let selectQuery = `SELECT ${columns} FROM ${tableName}`; if (where) selectQuery += ` ${where}`; if (orderBy) selectQuery += ` ${orderBy}`; @@ -479,7 +500,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS async getMinMaxForColumnsWithOriginalTypes({ resource, columns }) { const tableName = resource.table; const result = {}; - await Promise.all(columns.map(async (col) => { + await Promise.all(columns.map(async (col: { name: string }) => { const q = `SELECT MIN(${col.name}) as min, MAX(${col.name}) as max FROM ${tableName}`; dbLogger.trace(`🪲📜 MySQL Q: ${q}`); const [results] = await this.client.execute(q); @@ -504,7 +525,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS async updateRecordOriginalValues({ resource, recordId, newValues }) { const values = [...Object.values(newValues), recordId]; - const columnsWithPlaceholders = Object.keys(newValues).map((col, i) => `${col} = ?`).join(', '); + const columnsWithPlaceholders = Object.keys(newValues).map((col) => `${col} = ?`).join(', '); const q = `UPDATE ${resource.table} SET ${columnsWithPlaceholders} WHERE ${this.getPrimaryKey(resource)} = ?`; dbLogger.trace(`🪲📜 MySQL Q: ${q} values: ${JSON.stringify(values)}`); await this.client.execute(q, values); From 70366c6ad5e38810d5900a2cb1f77d7705492a00 Mon Sep 17 00:00:00 2001 From: Vitalii Kulyk Date: Fri, 24 Apr 2026 14:59:35 +0300 Subject: [PATCH 6/6] feat: enhance MySQL connector median query and add new aggregate API endpoints for car statistics --- adminforth/dataConnectors/mysql.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index c6783ca51..63bca6e65 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -403,16 +403,16 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS let medianQuery: string; if (groupExpr) { medianQuery = ` - SELECT ${groupExpr} AS \`group\`, AVG(${f}) AS \`${alias}\` + SELECT \`group\`, AVG(${f}) AS \`${alias}\` FROM ( - SELECT ${groupExpr}, ${f}, + SELECT ${groupExpr} AS \`group\`, ${f}, ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn, COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt FROM \`${tableName}\` ${nullGuard} ) t WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0)) - GROUP BY ${groupExpr} - ORDER BY ${groupExpr} ASC + GROUP BY \`group\` + ORDER BY \`group\` ASC `; } else { medianQuery = `