diff --git a/adminforth/dataConnectors/clickhouse.ts b/adminforth/dataConnectors/clickhouse.ts index 49519cbb..5c21cd78 100644 --- a/adminforth/dataConnectors/clickhouse.ts +++ b/adminforth/dataConnectors/clickhouse.ts @@ -1,4 +1,4 @@ -import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn } from '../types/Back.js'; +import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import AdminForthBaseConnector from './baseConnector.js'; import dayjs from 'dayjs'; import { createClient } from '@clickhouse/client' @@ -444,13 +444,79 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth return { where, params }; } + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: { [alias: string]: IAggregationRule }; + groupBy?: IGroupByRule; + }): Promise > { + + const tableName = `${this.dbName}.${resource.table}`; + + const selectParts: string[] = []; + let groupExpr: string | null = null; + + if (groupBy?.type === 'date_trunc') { + const g = groupBy as IGroupByDateTrunc; + const tz = g.timezone ?? 'UTC'; + + const field = `toTimeZone(${g.field}, '${tz}')`; + + switch (g.truncation) { + case 'day': groupExpr = `toDate(toStartOfDay(${field}))`; break; + case 'month': groupExpr = `toDate(toStartOfMonth(${field}))`; break; + case 'week': groupExpr = `toDate(toStartOfWeek(${field}))`; break; + case 'year': groupExpr = `toDate(toStartOfYear(${field}))`; break; + } + + selectParts.push(`${groupExpr} AS \`group\``); + + } else if (groupBy?.type === 'field') { + const g = groupBy as IGroupByField; + groupExpr = `${g.field}`; + selectParts.push(`${groupExpr} AS \`group\``); + } + + for (const [alias, rule] of Object.entries(aggregations)) { + switch (rule.operation) { + case 'count': selectParts.push(`count() AS \`${alias}\``); break; + case 'sum': selectParts.push(`sum(${rule.field}) AS \`${alias}\``); break; + case 'avg': selectParts.push(`avg(${rule.field}) AS \`${alias}\``); break; + case 'min': selectParts.push(`min(${rule.field}) AS \`${alias}\``); break; + case 'max': selectParts.push(`max(${rule.field}) AS \`${alias}\``); break; + case 'median': selectParts.push(`quantile(0.5)(${rule.field}) AS \`${alias}\``); break; + } + } + + const { where, params } = this.whereClause(resource, filters); + + let query = `SELECT ${selectParts.join(', ')} FROM ${tableName} ${where}`; + + if (groupExpr) { + query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; + } + + const result = await this.client.query({ + query, + format: 'JSONEachRow', + query_params: params, + }); + + const rows = await result.json(); + + return rows.map((r: any) => ({ + group: r.group, + ...r, + })); + } + async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: { resource: AdminForthResource, limit: number, offset: number, sort: { field: string, direction: AdminForthSortDirections }[], filters: IAdminForthAndOrFilter, - }): Promise { + }): Promise> { const columns = resource.dataSourceColumns.map((col) => { // for decimal cast to string if (col.type == AdminForthDataTypes.DECIMAL) { diff --git a/adminforth/dataConnectors/mongo.ts b/adminforth/dataConnectors/mongo.ts index 7f26d1a7..9cc76918 100644 --- a/adminforth/dataConnectors/mongo.ts +++ b/adminforth/dataConnectors/mongo.ts @@ -1,7 +1,7 @@ import dayjs from 'dayjs'; import { MongoClient } from 'mongodb'; import { Decimal128, Double } from 'bson'; -import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource } from '../types/Back.js'; +import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import AdminForthBaseConnector from './baseConnector.js'; import { afLogger } from '../modules/logger.js'; import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js'; @@ -305,6 +305,95 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS .filter((f) => (f as IAdminForthSingleFilter).insecureRawSQL === undefined) .map((f) => this.getFilterQuery(resource, f))); } + + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: any; + groupBy?: any; + }): Promise> { + + const collection = this.client.db().collection(resource.table); + + const match = filters?.subFilters?.length ? this.getFilterQuery(resource, filters) : {}; + + let groupId: any = null; + + if (groupBy?.type === 'field') { + groupId = `$${groupBy.field}`; + } + + if (groupBy?.type === 'date_trunc') { + const tz = groupBy.timezone ?? 'UTC'; + const dateTruncSpec: any = { date: `$${groupBy.field}`, unit: groupBy.truncation, timezone: tz,}; + if (groupBy.truncation === 'week') { + dateTruncSpec.startOfWeek = 'Mon'; + } + groupId = { $dateTrunc: dateTruncSpec,}; + } + + const groupStage: any = { + _id: groupId, + }; + + for (const [alias, rule] of Object.entries(aggregations) as any) { + switch (rule.operation) { + case 'count': groupStage[alias] = { $sum: 1 }; break; + case 'sum': groupStage[alias] = { $sum: { $toDouble: `$${rule.field}` } }; break; + case 'avg': groupStage[alias] = { $avg: { $toDouble: `$${rule.field}` } }; break; + case 'min': groupStage[alias] = { $min: { $toDouble: `$${rule.field}` } }; break; + case 'max': groupStage[alias] = { $max: { $toDouble: `$${rule.field}` } }; break; + case 'median': groupStage[alias] = { $push: { $toDouble: `$${rule.field}` } }; break; + } + } + + const pipeline: any[] = []; + + if (Object.keys(match).length) { + pipeline.push({ $match: match }); + } + + pipeline.push({ $group: groupStage }); + + pipeline.push({ + $project: { + _id: 0, + group: !groupBy ? "$$REMOVE" : groupBy.type === 'date_trunc' ? { + $cond: { + if: { $eq: [{ $type: "$_id" }, "date"] }, + then: { $dateToString: { format: "%Y-%m-%d", date: "$_id", timezone: groupBy?.timezone ?? 'UTC' } }, + else: "$_id" + } + } + : "$_id", + ...Object.fromEntries( + Object.keys(groupStage) + .filter(k => k !== '_id') + .map(k => [k, `$${k}`]) + ), + }, + }); + + const calculateMedian = (arr: any[]) => { + if (!Array.isArray(arr) || arr.length === 0) return null; + const sorted = [...arr].sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 === 0 + ? (sorted[mid - 1] + sorted[mid]) / 2 + : sorted[mid]; + }; + + const result = await collection.aggregate(pipeline).toArray(); + + const medianAliases = Object.keys(aggregations).filter(alias => aggregations[alias].operation === 'median'); + + return result.map(row => { + medianAliases.forEach(alias => { + row[alias] = calculateMedian(row[alias]); + }); + return row; + }); + } async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: { diff --git a/adminforth/dataConnectors/mysql.ts b/adminforth/dataConnectors/mysql.ts index 368f1f40..63bca6e6 100644 --- a/adminforth/dataConnectors/mysql.ts +++ b/adminforth/dataConnectors/mysql.ts @@ -1,5 +1,5 @@ import dayjs from 'dayjs'; -import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig } from '../types/Back.js'; +import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js'; import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js'; import AdminForthBaseConnector from './baseConnector.js'; import mysql from 'mysql2/promise'; @@ -338,13 +338,128 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS } : { sql: '', values: [] }; } + async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: { + resource: AdminForthResource; + filters: IAdminForthAndOrFilter; + aggregations: { [alias: string]: IAggregationRule }; + groupBy?: IGroupByRule; + }): Promise> { + const tableName = resource.table; + const selectParts: string[] = []; + const medianFields: { alias: string; field: string }[] = []; + let groupExpr: string | null = null; + + if (groupBy?.type === 'field') { + groupExpr = `\`${groupBy.field}\``; + selectParts.push(`${groupExpr} AS \`group\``); + } else if (groupBy?.type === 'date_trunc') { + const g = groupBy as IGroupByDateTrunc; + const tz = g.timezone ?? 'UTC'; + if (!/^[A-Za-z0-9/_+\-]+$/.test(tz)) { + throw new Error(`Invalid timezone value: ${tz}`); + } + const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`; + switch (g.truncation) { + case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break; + case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break; + case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break; + case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break; + } + selectParts.push(`${groupExpr} AS \`group\``); + } + + for (const [alias, rule] of Object.entries(aggregations)) { + const f = `\`${rule.field}\``; + switch (rule.operation) { + case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break; + case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break; + case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break; + case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break; + case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break; + case 'median': medianFields.push({ alias, field: rule.field }); break; + } + } + + const { sql: where, values: filterValues } = this.whereClauseAndValues(filters); + + type AggRow = { group?: string } & Record; + + // Run non-median aggregations + let rows: AggRow[] = []; + const hasNonMedian = selectParts.length > (groupExpr ? 1 : 0); + if (hasNonMedian) { + let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`; + if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`; + dbLogger.trace(`🪲📜 MySQL AGG Q: ${query} values: ${JSON.stringify(filterValues)}`); + const [result] = await this.client.execute(query, filterValues); + rows = result as AggRow[]; + } + + // Run each median via window functions (MySQL 8+) — no session variables, no memory pressure + for (const { alias, field } of medianFields) { + const f = `\`${field}\``; + const nullGuard = where ? `${where} AND ${f} IS NOT NULL` : `WHERE ${f} IS NOT NULL`; + + let medianQuery: string; + if (groupExpr) { + medianQuery = ` + SELECT \`group\`, AVG(${f}) AS \`${alias}\` + FROM ( + SELECT ${groupExpr} AS \`group\`, ${f}, + ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn, + COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt + FROM \`${tableName}\` ${nullGuard} + ) t + WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0)) + GROUP BY \`group\` + ORDER BY \`group\` ASC + `; + } else { + medianQuery = ` + SELECT AVG(${f}) AS \`${alias}\` + FROM ( + SELECT ${f}, + ROW_NUMBER() OVER (ORDER BY ${f}) AS rn, + COUNT(*) OVER () AS cnt + FROM \`${tableName}\` ${nullGuard} + ) t + WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0)) + `; + } + + dbLogger.trace(`🪲📜 MySQL MEDIAN Q: ${medianQuery} values: ${JSON.stringify(filterValues)}`); + const [medianResult] = await this.client.execute(medianQuery, filterValues); + const medianRows = medianResult as AggRow[]; + + if (groupExpr) { + if (rows.length === 0) { + rows = medianRows.map((r) => ({ group: r.group, [alias]: r[alias] })); + } else { + const byGroup = new Map(medianRows.map((r) => [String(r.group), r[alias]])); + for (const row of rows) { + row[alias] = byGroup.get(String(row.group)) ?? null; + } + } + } else { + const medianVal = medianRows[0]?.[alias] ?? null; + if (rows.length === 0) { + rows = [{ [alias]: medianVal }]; + } else { + rows[0][alias] = medianVal; + } + } + } + + return rows; + } + async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise { - const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', '); + const columns = resource.dataSourceColumns.map((col: { name: string }) => `${col.name}`).join(', '); const tableName = resource.table; const { sql: where, values: filterValues } = this.whereClauseAndValues(filters); - const orderBy = sort.length ? `ORDER BY ${sort.map((s) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : ''; + const orderBy = sort.length ? `ORDER BY ${sort.map((s: { field: string; direction: AdminForthSortDirections }) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : ''; let selectQuery = `SELECT ${columns} FROM ${tableName}`; if (where) selectQuery += ` ${where}`; if (orderBy) selectQuery += ` ${orderBy}`; @@ -385,7 +500,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS async getMinMaxForColumnsWithOriginalTypes({ resource, columns }) { const tableName = resource.table; const result = {}; - await Promise.all(columns.map(async (col) => { + await Promise.all(columns.map(async (col: { name: string }) => { const q = `SELECT MIN(${col.name}) as min, MAX(${col.name}) as max FROM ${tableName}`; dbLogger.trace(`🪲📜 MySQL Q: ${q}`); const [results] = await this.client.execute(q); @@ -410,7 +525,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS async updateRecordOriginalValues({ resource, recordId, newValues }) { const values = [...Object.values(newValues), recordId]; - const columnsWithPlaceholders = Object.keys(newValues).map((col, i) => `${col} = ?`).join(', '); + const columnsWithPlaceholders = Object.keys(newValues).map((col) => `${col} = ?`).join(', '); const q = `UPDATE ${resource.table} SET ${columnsWithPlaceholders} WHERE ${this.getPrimaryKey(resource)} = ?`; dbLogger.trace(`🪲📜 MySQL Q: ${q} values: ${JSON.stringify(values)}`); await this.client.execute(q, values);