Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,27 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
}
}

// Always set a unique id per row. Grafana's virtualized log panel uses
// LogRowModel.uid (derived from the "id" field) as a cache key for
// row height measurements. Without unique ids, rows sharing the same
// cache key cause an infinite resetAfterIndex loop. Prefer the hit's
// own _index/_id when present (matches built-in ES datasource), fall
// back to the row index otherwise.
hitIndex, _ := hit["_index"].(string)
hitID, _ := hit["_id"].(string)
switch {
case hitIndex != "" && hitID != "":
doc["id"] = hitIndex + "#" + hitID
case hitID != "":
doc["id"] = hitID
default:
doc["id"] = strconv.Itoa(hitIdx)
}

docs[hitIdx] = doc
}

propNames["id"] = true
sortedPropNames := sortPropNames(propNames, configuredFields, true)
fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

Expand Down Expand Up @@ -1086,18 +1104,25 @@ func flatten(target map[string]interface{}) map[string]interface{} {
// if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
hasTimeField := false
hasLogMessageField := false

var sortedPropNames []string
for k := range propNames {
if configuredFields.TimeField != "" && k == configuredFields.TimeField {
hasTimeField = true
} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
hasLogMessageField = true
} else {
sortedPropNames = append(sortedPropNames, k)
}
}

sort.Strings(sortedPropNames)

if hasLogMessageField {
sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
}

if hasTimeField {
sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
}
Expand All @@ -1112,6 +1137,9 @@ func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propNa
return doc[propName]
}
}
if len(docs) == 0 {
return nil
}
return docs[0][propName]
}

Expand Down
25 changes: 17 additions & 8 deletions pkg/quickwit/response_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,22 +253,31 @@ func TestProcessLogsResponse(t *testing.T) {
require.Len(t, dataframes, 1)
frame := dataframes[0]

require.Equal(t, 11, len(frame.Fields))
require.Equal(t, 12, len(frame.Fields))
// Fields have the correct length
require.Equal(t, 2, frame.Fields[0].Len())

fieldMap := make(map[string]*data.Field)
for _, f := range frame.Fields {
fieldMap[f.Name] = f
}

// First field is timeField
require.Equal(t, "@timestamp", frame.Fields[0].Name)
require.Equal(t, data.FieldTypeNullableTime, frame.Fields[0].Type())
// Second field is logMessageField
require.Equal(t, "line", frame.Fields[1].Name)
require.Equal(t, data.FieldTypeNullableString, frame.Fields[1].Type())
require.Equal(t, "line", frame.Fields[4].Name)
// Correctly uses string types
require.Equal(t, data.FieldTypeNullableString, frame.Fields[1].Type())
// Synthetic id field added for log row uid stability
require.Contains(t, fieldMap, "id")
require.Equal(t, data.FieldTypeNullableString, fieldMap["id"].Type())
// Correctly detects float64 types
require.Equal(t, data.FieldTypeNullableFloat64, frame.Fields[2].Type())
require.Equal(t, data.FieldTypeNullableFloat64, fieldMap["float"].Type())
// Correctly flattens fields
require.Equal(t, "nested.field.double_nested", frame.Fields[7].Name)
require.Equal(t, data.FieldTypeNullableString, frame.Fields[7].Type())
require.Contains(t, fieldMap, "nested.field.double_nested")
require.Equal(t, data.FieldTypeNullableString, fieldMap["nested.field.double_nested"].Type())
// Correctly detects type even if first value is null
require.Equal(t, data.FieldTypeNullableJSON, frame.Fields[9].Type())
require.Equal(t, data.FieldTypeNullableJSON, fieldMap["shapes"].Type())
})
}

Expand Down
211 changes: 15 additions & 196 deletions src/datasource/supplementaryQueries.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
import {
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceJsonData,
DataSourceWithSupplementaryQueriesSupport,
FieldColorModeId,
FieldType,
LoadingState,
LogLevel,
LogsVolumeCustomMetaData,
LogsVolumeType,
SupplementaryQueryType,
} from '@grafana/data';
import { BarAlignment, DataQuery, GraphDrawStyle, StackingMode } from "@grafana/schema";
import { colors } from "@grafana/ui";
import { getIntervalInfo } from '@/utils/time';
import { cloneDeep, groupBy } from "lodash";
import { Observable, isObservable, from } from 'rxjs';
import { cloneDeep } from "lodash";
import { BucketAggregation, ElasticsearchQuery } from '@/types';
import { BaseQuickwitDataSourceConstructor } from './base';

export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-';

export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstructor> ( Base: T ){
return class DSWithSupplementaryQueries extends Base implements DataSourceWithSupplementaryQueriesSupport<ElasticsearchQuery> {

/**
* Returns an observable that will be used to fetch supplementary data based on the provided
* supplementary query type and original request.
* Returns a DataQueryRequest for the supplementary query type.
* Grafana's Explore layer handles the Observable lifecycle.
*/
getDataProvider(
getSupplementaryRequest(
type: SupplementaryQueryType,
request: DataQueryRequest<ElasticsearchQuery>
): Observable<DataQueryResponse> | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(type)) {
return undefined;
}
): DataQueryRequest<ElasticsearchQuery> | undefined {
switch (type) {
case SupplementaryQueryType.LogsVolume:
return this.getLogsVolumeDataProvider(request);
return this.getLogsVolumeRequest(request);
default:
return undefined;
}
Expand All @@ -55,18 +39,15 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
* Returns a supplementary query to be used to fetch supplementary data based on the provided type and original query.
* If provided query is not suitable for provided supplementary query type, undefined should be returned.
*/
// FIXME: options should be of type SupplementaryQueryOptions but this type is not public.
getSupplementaryQuery(options: any, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
getSupplementaryQuery(options: { type: SupplementaryQueryType }, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(options.type)) {
return undefined;
}

let isQuerySuitable = false;

switch (options.type) {
case SupplementaryQueryType.LogsVolume:
case SupplementaryQueryType.LogsVolume: {
// it has to be a logs-producing range-query
isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
const isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
if (!isQuerySuitable) {
return undefined;
}
Expand Down Expand Up @@ -104,13 +85,16 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
bucketAggs,
filters: query.filters,
};
}

default:
return undefined;
}
}

getLogsVolumeDataProvider(request: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> | undefined {
private getLogsVolumeRequest(
request: DataQueryRequest<ElasticsearchQuery>
): DataQueryRequest<ElasticsearchQuery> | undefined {
const logsVolumeRequest = cloneDeep(request);
const targets = logsVolumeRequest.targets
.map((target) => this.getSupplementaryQuery({ type: SupplementaryQueryType.LogsVolume }, target))
Expand All @@ -120,172 +104,7 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
return undefined;
}

return queryLogsVolume(
this,
{ ...logsVolumeRequest, targets },
{
range: request.range,
targets: request.targets,
extractLevel: (dataFrame: any) => getLogLevelFromKey(dataFrame || ''),
}
);
}
};
}

// Copy/pasted from grafana/data as it is deprecated there.
/**
 * Derives the log level for a logs-volume frame from the display name of its
 * value field (fields[1], whose displayNameFromDS is set to the level name by
 * the volume field config). Returns LogLevel.unknown when the name does not
 * match a known level.
 *
 * Guards against malformed input instead of throwing: the extractLevel
 * callback at the call site is invoked as `getLogLevelFromKey(dataFrame || '')`,
 * so a falsy frame arrives as an empty string and `dataframe.fields[1].config`
 * would otherwise raise a TypeError.
 */
function getLogLevelFromKey(dataframe: DataFrame): LogLevel {
  const name = dataframe?.fields?.[1]?.config?.displayNameFromDS ?? '';
  const level = (LogLevel as any)[name.toString().toLowerCase()];
  return level ?? LogLevel.unknown;
}

/**
 * Creates an observable which makes requests to fetch logs volume and
 * aggregates the results into styled bar-chart frames.
 *
 * Emits a Loading response first, then either the aggregated volume frames
 * (with per-level colors and LogsVolumeCustomMetaData attached) or an Error
 * response mirroring the underlying query failure.
 */
export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataSourceJsonData>(
  datasource: DataSourceApi<TQuery, TOptions>,
  logsVolumeRequest: DataQueryRequest<TQuery>,
  // NOTE(review): untyped; callers pass { range, targets, extractLevel }.
  options: any
): Observable<DataQueryResponse> {
  // Size the histogram interval so the full time range fits ~400 buckets.
  const timespan = options.range.to.valueOf() - options.range.from.valueOf();
  const intervalInfo = getIntervalInfo(timespan, 400);

  logsVolumeRequest.interval = intervalInfo.interval;
  logsVolumeRequest.scopedVars.__interval = { value: intervalInfo.interval, text: intervalInfo.interval };

  if (intervalInfo.intervalMs !== undefined) {
    logsVolumeRequest.intervalMs = intervalInfo.intervalMs;
    logsVolumeRequest.scopedVars.__interval_ms = { value: intervalInfo.intervalMs, text: intervalInfo.intervalMs };
    // NOTE(review): `targets` is not defined in this scope and this function is
    // declared to return an Observable, not a request object — this line looks
    // like a merge/extraction artifact from getLogsVolumeRequest. TODO: confirm
    // against the repository before shipping.
    return { ...logsVolumeRequest, targets };
  }

  // Hide the synthetic volume query from the query inspector.
  logsVolumeRequest.hideFromInspector = true;

  return new Observable((observer) => {
    let logsVolumeData: DataFrame[] = [];
    // Emit an initial Loading state so the UI can render a spinner immediately.
    observer.next({
      state: LoadingState.Loading,
      error: undefined,
      data: [],
    });

    // datasource.query may return a Promise or an Observable; normalize to an
    // Observable so a single subscription path handles both.
    const queryResponse = datasource.query(logsVolumeRequest);
    const queryObservable = isObservable(queryResponse) ? queryResponse : from(queryResponse);

    const subscription = queryObservable.subscribe({
      complete: () => {
        observer.complete();
      },
      next: (dataQueryResponse: DataQueryResponse) => {
        const { error } = dataQueryResponse;
        if (error !== undefined) {
          // Surface the error state to subscribers before terminating the stream.
          observer.next({
            state: LoadingState.Error,
            error,
            data: [],
          });
          observer.error(error);
        } else {
          const framesByRefId = groupBy(dataQueryResponse.data, 'refId');
          logsVolumeData = dataQueryResponse.data.map((dataFrame) => {
            // Map the volume frame back to its source query by stripping the
            // 'log-volume-' refId prefix (see REF_ID_STARTER_LOG_VOLUME).
            let sourceRefId = dataFrame.refId || '';
            if (sourceRefId.startsWith('log-volume-')) {
              sourceRefId = sourceRefId.substr('log-volume-'.length);
            }

            const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = {
              logsVolumeType: LogsVolumeType.FullRange,
              absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
              datasourceName: datasource.name,
              // Non-null assertion assumes every volume frame has a matching
              // source query in options.targets — TODO confirm.
              sourceQuery: options.targets.find((dataQuery: any) => dataQuery.refId === sourceRefId)!,
            };

            dataFrame.meta = {
              ...dataFrame.meta,
              custom: {
                ...dataFrame.meta?.custom,
                ...logsVolumeCustomMetaData,
              },
            };
            // A refId with exactly one frame means only one level was detected
            // for that query; updateLogsVolumeConfig labels it 'logs' then.
            return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1);
          });

          observer.next({
            state: dataQueryResponse.state,
            error: undefined,
            data: logsVolumeData,
          });
        }
      },
      error: (error: any) => {
        observer.next({
          state: LoadingState.Error,
          error: error,
          data: [],
        });
        observer.error(error);
      },
    });
    // Teardown: cancel the underlying query when the consumer unsubscribes.
    return () => {
      subscription?.unsubscribe();
    };
  });
}
/**
 * Applies logs-volume bar styling to every numeric field of the given frame.
 * Non-number fields pass through unchanged. Mutates field configs in place,
 * replaces the frame's fields array, and returns the same frame.
 */
const updateLogsVolumeConfig = (
  dataFrame: DataFrame,
  extractLevel: (dataFrame: DataFrame) => LogLevel,
  oneLevelDetected: boolean
): DataFrame => {
  const restyledFields = dataFrame.fields.map((field) => {
    // Only value fields get the bar-chart styling; skip time/label fields.
    if (field.type !== FieldType.number) {
      return field;
    }
    const volumeConfig = getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected);
    field.config = { ...field.config, ...volumeConfig };
    return field;
  });
  dataFrame.fields = restyledFields;
  return dataFrame;
};
// Fixed bar color per log level, used by getLogVolumeFieldConfig. Known levels
// draw from @grafana/ui's shared `colors` palette; the specific indices
// presumably mirror Grafana's built-in logs-volume coloring — verify against
// upstream before changing. Unknown levels fall back to a neutral grey.
const LogLevelColor = {
[LogLevel.critical]: colors[7],
[LogLevel.warning]: colors[1],
[LogLevel.error]: colors[4],
[LogLevel.info]: colors[0],
[LogLevel.debug]: colors[5],
[LogLevel.trace]: colors[2],
[LogLevel.unknown]: '#8e8e8e' // or '#bdc4cd',
};
/**
 * Builds the field configuration used to render logs volume bars: a fixed
 * per-level color plus bar-chart draw options stacked in group 'A'. When the
 * only detected level is 'unknown', the series is labelled plain 'logs'.
 */
function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
  let displayName: string | LogLevel = level;
  if (oneLevelDetected && level === LogLevel.unknown) {
    displayName = 'logs';
  }

  const barColor = LogLevelColor[level];
  const customOptions = {
    drawStyle: GraphDrawStyle.Bars,
    barAlignment: BarAlignment.Center,
    lineColor: barColor,
    pointColor: barColor,
    fillColor: barColor,
    lineWidth: 1,
    fillOpacity: 100,
    stacking: {
      mode: StackingMode.Normal,
      group: 'A',
    },
  };

  return {
    displayNameFromDS: displayName,
    color: {
      mode: FieldColorModeId.Fixed,
      fixedColor: barColor,
    },
    custom: customOptions,
  };
}


Loading