diff --git a/pkg/quickwit/response_parser.go b/pkg/quickwit/response_parser.go index 4da9ddc..57d9bbe 100644 --- a/pkg/quickwit/response_parser.go +++ b/pkg/quickwit/response_parser.go @@ -160,9 +160,27 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields } } + // Always set a unique id per row. Grafana's virtualized log panel uses + // LogRowModel.uid (derived from the "id" field) as a cache key for + // row height measurements. Without unique ids, rows sharing the same + // cache key cause an infinite resetAfterIndex loop. Prefer the hit's + // own _index/_id when present (matches built-in ES datasource), fall + // back to the row index otherwise. + hitIndex, _ := hit["_index"].(string) + hitID, _ := hit["_id"].(string) + switch { + case hitIndex != "" && hitID != "": + doc["id"] = hitIndex + "#" + hitID + case hitID != "": + doc["id"] = hitID + default: + doc["id"] = strconv.Itoa(hitIdx) + } + docs[hitIdx] = doc } + propNames["id"] = true sortedPropNames := sortPropNames(propNames, configuredFields, true) fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields) @@ -1086,11 +1104,14 @@ func flatten(target map[string]interface{}) map[string]interface{} { // if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string { hasTimeField := false + hasLogMessageField := false var sortedPropNames []string for k := range propNames { if configuredFields.TimeField != "" && k == configuredFields.TimeField { hasTimeField = true + } else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField { + hasLogMessageField = true } else { sortedPropNames = append(sortedPropNames, k) } @@ -1098,6 +1119,10 @@ func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFiel sort.Strings(sortedPropNames) + if hasLogMessageField { + sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...) + } + if hasTimeField { sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...) } @@ -1112,6 +1137,9 @@ func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propNa return doc[propName] } } + if len(docs) == 0 { + return nil + } return docs[0][propName] } diff --git a/pkg/quickwit/response_parser_test.go b/pkg/quickwit/response_parser_test.go index fe9e1c6..e1be196 100644 --- a/pkg/quickwit/response_parser_test.go +++ b/pkg/quickwit/response_parser_test.go @@ -253,22 +253,31 @@ func TestProcessLogsResponse(t *testing.T) { require.Len(t, dataframes, 1) frame := dataframes[0] - require.Equal(t, 11, len(frame.Fields)) + require.Equal(t, 12, len(frame.Fields)) // Fields have the correct length require.Equal(t, 2, frame.Fields[0].Len()) + + fieldMap := make(map[string]*data.Field) + for _, f := range frame.Fields { + fieldMap[f.Name] = f + } + // First field is timeField + require.Equal(t, "@timestamp", frame.Fields[0].Name) require.Equal(t, data.FieldTypeNullableTime, frame.Fields[0].Type()) + // Second field is logMessageField + require.Equal(t, "line", frame.Fields[1].Name) require.Equal(t, data.FieldTypeNullableString, frame.Fields[1].Type()) - require.Equal(t, "line", frame.Fields[4].Name) - // Correctly uses string types - require.Equal(t, data.FieldTypeNullableString, frame.Fields[1].Type()) + // Synthetic id field added for log row uid stability + require.Contains(t, fieldMap, "id") + require.Equal(t, data.FieldTypeNullableString, fieldMap["id"].Type()) // Correctly detects float64 types - require.Equal(t, data.FieldTypeNullableFloat64, frame.Fields[2].Type()) + require.Equal(t, data.FieldTypeNullableFloat64, fieldMap["float"].Type()) // Correctly flattens fields - require.Equal(t, "nested.field.double_nested", frame.Fields[7].Name) - require.Equal(t, data.FieldTypeNullableString, frame.Fields[7].Type()) + require.Contains(t, fieldMap, "nested.field.double_nested") + require.Equal(t, data.FieldTypeNullableString, fieldMap["nested.field.double_nested"].Type()) // Correctly detects type even if first value is null - require.Equal(t, data.FieldTypeNullableJSON, frame.Fields[9].Type()) + require.Equal(t, data.FieldTypeNullableJSON, fieldMap["shapes"].Type()) }) } diff --git a/src/datasource/supplementaryQueries.ts b/src/datasource/supplementaryQueries.ts index bf4f63d..43e0866 100644 --- a/src/datasource/supplementaryQueries.ts +++ b/src/datasource/supplementaryQueries.ts @@ -1,23 +1,9 @@ import { - DataFrame, DataQueryRequest, - DataQueryResponse, - DataSourceApi, - DataSourceJsonData, DataSourceWithSupplementaryQueriesSupport, - FieldColorModeId, - FieldType, - LoadingState, - LogLevel, - LogsVolumeCustomMetaData, - LogsVolumeType, SupplementaryQueryType, } from '@grafana/data'; -import { BarAlignment, DataQuery, GraphDrawStyle, StackingMode } from "@grafana/schema"; -import { colors } from "@grafana/ui"; -import { getIntervalInfo } from '@/utils/time'; -import { cloneDeep, groupBy } from "lodash"; -import { Observable, isObservable, from } from 'rxjs'; +import { cloneDeep } from "lodash"; import { BucketAggregation, ElasticsearchQuery } from '@/types'; import { BaseQuickwitDataSourceConstructor } from './base'; @@ -25,20 +11,18 @@ export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-'; export function withSupplementaryQueries ( Base: T ){ return class DSWithSupplementaryQueries extends Base implements DataSourceWithSupplementaryQueriesSupport { + /** - * Returns an observable that will be used to fetch supplementary data based on the provided - * supplementary query type and original request. + * Returns a DataQueryRequest for the supplementary query type. + * Grafana's Explore layer handles the Observable lifecycle. */ - getDataProvider( + getSupplementaryRequest( type: SupplementaryQueryType, request: DataQueryRequest - ): Observable | undefined { - if (!this.getSupportedSupplementaryQueryTypes().includes(type)) { - return undefined; - } + ): DataQueryRequest | undefined { switch (type) { case SupplementaryQueryType.LogsVolume: - return this.getLogsVolumeDataProvider(request); + return this.getLogsVolumeRequest(request); default: return undefined; } @@ -55,18 +39,15 @@ export function withSupplementaryQueries): Observable | undefined { + private getLogsVolumeRequest( + request: DataQueryRequest + ): DataQueryRequest | undefined { const logsVolumeRequest = cloneDeep(request); const targets = logsVolumeRequest.targets .map((target) => this.getSupplementaryQuery({ type: SupplementaryQueryType.LogsVolume }, target)) @@ -120,172 +104,7 @@ export function withSupplementaryQueries getLogLevelFromKey(dataFrame || ''), - } - ); - } - }; -} - -// Copy/pasted from grafana/data as it is deprecated there. -function getLogLevelFromKey(dataframe: DataFrame): LogLevel { - const name = dataframe.fields[1].config.displayNameFromDS || ``; - const level = (LogLevel as any)[name.toString().toLowerCase()]; - if (level) { - return level; - } - return LogLevel.unknown; -} - -/** - * Creates an observable, which makes requests to get logs volume and aggregates results. - */ - -export function queryLogsVolume( - datasource: DataSourceApi, - logsVolumeRequest: DataQueryRequest, - options: any -): Observable { - const timespan = options.range.to.valueOf() - options.range.from.valueOf(); - const intervalInfo = getIntervalInfo(timespan, 400); - - logsVolumeRequest.interval = intervalInfo.interval; - logsVolumeRequest.scopedVars.__interval = { value: intervalInfo.interval, text: intervalInfo.interval }; - - if (intervalInfo.intervalMs !== undefined) { - logsVolumeRequest.intervalMs = intervalInfo.intervalMs; - logsVolumeRequest.scopedVars.__interval_ms = { value: intervalInfo.intervalMs, text: intervalInfo.intervalMs }; + return { ...logsVolumeRequest, targets }; } - - logsVolumeRequest.hideFromInspector = true; - - return new Observable((observer) => { - let logsVolumeData: DataFrame[] = []; - observer.next({ - state: LoadingState.Loading, - error: undefined, - data: [], - }); - - const queryResponse = datasource.query(logsVolumeRequest); - const queryObservable = isObservable(queryResponse) ? queryResponse : from(queryResponse); - - const subscription = queryObservable.subscribe({ - complete: () => { - observer.complete(); - }, - next: (dataQueryResponse: DataQueryResponse) => { - const { error } = dataQueryResponse; - if (error !== undefined) { - observer.next({ - state: LoadingState.Error, - error, - data: [], - }); - observer.error(error); - } else { - const framesByRefId = groupBy(dataQueryResponse.data, 'refId'); - logsVolumeData = dataQueryResponse.data.map((dataFrame) => { - let sourceRefId = dataFrame.refId || ''; - if (sourceRefId.startsWith('log-volume-')) { - sourceRefId = sourceRefId.substr('log-volume-'.length); - } - - const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = { - logsVolumeType: LogsVolumeType.FullRange, - absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() }, - datasourceName: datasource.name, - sourceQuery: options.targets.find((dataQuery: any) => dataQuery.refId === sourceRefId)!, - }; - - dataFrame.meta = { - ...dataFrame.meta, - custom: { - ...dataFrame.meta?.custom, - ...logsVolumeCustomMetaData, - }, - }; - return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1); - }); - - observer.next({ - state: dataQueryResponse.state, - error: undefined, - data: logsVolumeData, - }); - } - }, - error: (error: any) => { - observer.next({ - state: LoadingState.Error, - error: error, - data: [], - }); - observer.error(error); - }, - }); - return () => { - subscription?.unsubscribe(); - }; - }); -} -const updateLogsVolumeConfig = ( - dataFrame: DataFrame, - extractLevel: (dataFrame: DataFrame) => LogLevel, - oneLevelDetected: boolean -): DataFrame => { - dataFrame.fields = dataFrame.fields.map((field) => { - if (field.type === FieldType.number) { - field.config = { - ...field.config, - ...getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected), - }; - } - return field; - }); - return dataFrame; -}; -const LogLevelColor = { - [LogLevel.critical]: colors[7], - [LogLevel.warning]: colors[1], - [LogLevel.error]: colors[4], - [LogLevel.info]: colors[0], - [LogLevel.debug]: colors[5], - [LogLevel.trace]: colors[2], - [LogLevel.unknown]: '#8e8e8e' // or '#bdc4cd', -}; -/** - * Returns field configuration used to render logs volume bars - */ -function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) { - const name = oneLevelDetected && level === LogLevel.unknown ? 'logs' : level; - const color = LogLevelColor[level]; - return { - displayNameFromDS: name, - color: { - mode: FieldColorModeId.Fixed, - fixedColor: color, - }, - custom: { - drawStyle: GraphDrawStyle.Bars, - barAlignment: BarAlignment.Center, - lineColor: color, - pointColor: color, - fillColor: color, - lineWidth: 1, - fillOpacity: 100, - stacking: { - mode: StackingMode.Normal, - group: 'A', - }, - }, }; } - -