Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,17 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
}
}

// Always set a unique id per row. Grafana's virtualized log panel uses
// LogRowModel.uid (derived from the "id" field) as a cache key for
// row height measurements. Without unique ids, rows sharing the same
// cache key cause an infinite resetAfterIndex loop. The source index
// may have an "id" field with non-unique values, so always overwrite.
doc["id"] = fmt.Sprintf("%d", hitIdx)

docs[hitIdx] = doc
}

propNames["id"] = true
sortedPropNames := sortPropNames(propNames, configuredFields, true)
fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

Expand Down Expand Up @@ -1074,18 +1082,25 @@ func flatten(target map[string]interface{}) map[string]interface{} {
// if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
hasTimeField := false
hasLogMessageField := false

var sortedPropNames []string
for k := range propNames {
if configuredFields.TimeField != "" && k == configuredFields.TimeField {
hasTimeField = true
} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
hasLogMessageField = true
} else {
sortedPropNames = append(sortedPropNames, k)
}
}

sort.Strings(sortedPropNames)

if hasLogMessageField {
sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
}

if hasTimeField {
sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
}
Expand All @@ -1100,6 +1115,9 @@ func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propNa
return doc[propName]
}
}
if len(docs) == 0 {
return nil
}
return docs[0][propName]
}

Expand Down
41 changes: 5 additions & 36 deletions src/datasource/processResponse.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DataFrame, DataLink, DataQueryRequest, DataQueryResponse, Field, FieldType } from "@grafana/data";
import { DataFrame, DataLink, DataQueryRequest, DataQueryResponse } from "@grafana/data";
import { getDataSourceSrv } from "@grafana/runtime";
import { BaseQuickwitDataSource } from './base';
import { DataLinkConfig, ElasticsearchQuery } from "../types";
Expand All @@ -16,48 +16,16 @@ export function getQueryResponseProcessor(datasource: BaseQuickwitDataSource, re
}
};
}
function getCustomFieldName(fieldname: string) { return `$qw_${fieldname}`; }

export function processLogsDataFrame(datasource: BaseQuickwitDataSource, dataFrame: DataFrame) {
// Ignore log volume dataframe, no need to add links or a displayed message field.
// Ignore log volume dataframe, no need to add links.
if (!dataFrame.refId || dataFrame.refId.startsWith('log-volume')) {
return;
}
// Skip empty dataframes
if (dataFrame.length===0 || dataFrame.fields.length === 0) {
return;
}
if (datasource.logMessageField) {
const messageFields = datasource.logMessageField.split(',');
let field_idx_list = [];
for (const messageField of messageFields) {
const field_idx = dataFrame.fields.findIndex((field) => field.name === messageField);
if (field_idx !== -1) {
field_idx_list.push(field_idx);
}
}
const displayedMessages = Array(dataFrame.length);
for (let idx = 0; idx < dataFrame.length; idx++) {
let displayedMessage = "";
// If we have only one field, we assume the field name is obvious for the user and we don't need to show it.
if (field_idx_list.length === 1) {
displayedMessage = `${dataFrame.fields[field_idx_list[0]].values[idx]}`;
} else {
for (const field_idx of field_idx_list) {
displayedMessage += ` ${dataFrame.fields[field_idx].name}=${dataFrame.fields[field_idx].values[idx]}`;
}
}
displayedMessages[idx] = displayedMessage.trim();
}

const newField: Field = {
name: getCustomFieldName('message'),
type: FieldType.string,
config: {},
values: displayedMessages,
};
const [timestamp, ...rest] = dataFrame.fields;
dataFrame.fields = [timestamp, newField, ...rest];
}

if (!datasource.dataLinks.length) {
return;
Expand All @@ -71,9 +39,10 @@ export function processLogsDataFrame(datasource: BaseQuickwitDataSource, dataFra
}

field.config = field.config || {};
field.config.links = [...(field.config.links || [], linksToApply.map(generateDataLink))];
field.config.links = [...(field.config.links || []), ...linksToApply.map(generateDataLink)];
}
}

function generateDataLink(linkConfig: DataLinkConfig): DataLink {
const dataSourceSrv = getDataSourceSrv();

Expand Down
211 changes: 15 additions & 196 deletions src/datasource/supplementaryQueries.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
import {
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceJsonData,
DataSourceWithSupplementaryQueriesSupport,
FieldColorModeId,
FieldType,
LoadingState,
LogLevel,
LogsVolumeCustomMetaData,
LogsVolumeType,
SupplementaryQueryType,
} from '@grafana/data';
import { BarAlignment, DataQuery, GraphDrawStyle, StackingMode } from "@grafana/schema";
import { colors } from "@grafana/ui";
import { getIntervalInfo } from '@/utils/time';
import { cloneDeep, groupBy } from "lodash";
import { Observable, isObservable, from } from 'rxjs';
import { cloneDeep } from "lodash";
import { BucketAggregation, ElasticsearchQuery } from '@/types';
import { BaseQuickwitDataSourceConstructor } from './base';

export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-';

export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstructor> ( Base: T ){
return class DSWithSupplementaryQueries extends Base implements DataSourceWithSupplementaryQueriesSupport<ElasticsearchQuery> {

/**
* Returns an observable that will be used to fetch supplementary data based on the provided
* supplementary query type and original request.
* Returns a DataQueryRequest for the supplementary query type.
* Grafana's Explore layer handles the Observable lifecycle.
*/
getDataProvider(
getSupplementaryRequest(
type: SupplementaryQueryType,
request: DataQueryRequest<ElasticsearchQuery>
): Observable<DataQueryResponse> | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(type)) {
return undefined;
}
): DataQueryRequest<ElasticsearchQuery> | undefined {
switch (type) {
case SupplementaryQueryType.LogsVolume:
return this.getLogsVolumeDataProvider(request);
return this.getLogsVolumeRequest(request);
default:
return undefined;
}
Expand All @@ -55,18 +39,15 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
* Returns a supplementary query to be used to fetch supplementary data based on the provided type and original query.
* If provided query is not suitable for provided supplementary query type, undefined should be returned.
*/
// FIXME: options should be of type SupplementaryQueryOptions but this type is not public.
getSupplementaryQuery(options: any, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
getSupplementaryQuery(options: { type: SupplementaryQueryType }, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(options.type)) {
return undefined;
}

let isQuerySuitable = false;

switch (options.type) {
case SupplementaryQueryType.LogsVolume:
case SupplementaryQueryType.LogsVolume: {
// it has to be a logs-producing range-query
isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
const isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
if (!isQuerySuitable) {
return undefined;
}
Expand Down Expand Up @@ -103,13 +84,16 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
metrics: [{ type: 'count', id: '1' }],
bucketAggs,
};
}

default:
return undefined;
}
}

getLogsVolumeDataProvider(request: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> | undefined {
private getLogsVolumeRequest(
request: DataQueryRequest<ElasticsearchQuery>
): DataQueryRequest<ElasticsearchQuery> | undefined {
const logsVolumeRequest = cloneDeep(request);
const targets = logsVolumeRequest.targets
.map((target) => this.getSupplementaryQuery({ type: SupplementaryQueryType.LogsVolume }, target))
Expand All @@ -119,172 +103,7 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
return undefined;
}

return queryLogsVolume(
this,
{ ...logsVolumeRequest, targets },
{
range: request.range,
targets: request.targets,
extractLevel: (dataFrame: any) => getLogLevelFromKey(dataFrame || ''),
}
);
}
};
}

// Copy/pasted from grafana/data as it is deprecated there.
function getLogLevelFromKey(dataframe: DataFrame): LogLevel {
const name = dataframe.fields[1].config.displayNameFromDS || ``;
const level = (LogLevel as any)[name.toString().toLowerCase()];
if (level) {
return level;
}
return LogLevel.unknown;
}

/**
* Creates an observable, which makes requests to get logs volume and aggregates results.
*/

export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataSourceJsonData>(
datasource: DataSourceApi<TQuery, TOptions>,
logsVolumeRequest: DataQueryRequest<TQuery>,
options: any
): Observable<DataQueryResponse> {
const timespan = options.range.to.valueOf() - options.range.from.valueOf();
const intervalInfo = getIntervalInfo(timespan, 400);

logsVolumeRequest.interval = intervalInfo.interval;
logsVolumeRequest.scopedVars.__interval = { value: intervalInfo.interval, text: intervalInfo.interval };

if (intervalInfo.intervalMs !== undefined) {
logsVolumeRequest.intervalMs = intervalInfo.intervalMs;
logsVolumeRequest.scopedVars.__interval_ms = { value: intervalInfo.intervalMs, text: intervalInfo.intervalMs };
return { ...logsVolumeRequest, targets };
}

logsVolumeRequest.hideFromInspector = true;

return new Observable((observer) => {
let logsVolumeData: DataFrame[] = [];
observer.next({
state: LoadingState.Loading,
error: undefined,
data: [],
});

const queryResponse = datasource.query(logsVolumeRequest);
const queryObservable = isObservable(queryResponse) ? queryResponse : from(queryResponse);

const subscription = queryObservable.subscribe({
complete: () => {
observer.complete();
},
next: (dataQueryResponse: DataQueryResponse) => {
const { error } = dataQueryResponse;
if (error !== undefined) {
observer.next({
state: LoadingState.Error,
error,
data: [],
});
observer.error(error);
} else {
const framesByRefId = groupBy(dataQueryResponse.data, 'refId');
logsVolumeData = dataQueryResponse.data.map((dataFrame) => {
let sourceRefId = dataFrame.refId || '';
if (sourceRefId.startsWith('log-volume-')) {
sourceRefId = sourceRefId.substr('log-volume-'.length);
}

const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = {
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
datasourceName: datasource.name,
sourceQuery: options.targets.find((dataQuery: any) => dataQuery.refId === sourceRefId)!,
};

dataFrame.meta = {
...dataFrame.meta,
custom: {
...dataFrame.meta?.custom,
...logsVolumeCustomMetaData,
},
};
return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1);
});

observer.next({
state: dataQueryResponse.state,
error: undefined,
data: logsVolumeData,
});
}
},
error: (error: any) => {
observer.next({
state: LoadingState.Error,
error: error,
data: [],
});
observer.error(error);
},
});
return () => {
subscription?.unsubscribe();
};
});
}
const updateLogsVolumeConfig = (
dataFrame: DataFrame,
extractLevel: (dataFrame: DataFrame) => LogLevel,
oneLevelDetected: boolean
): DataFrame => {
dataFrame.fields = dataFrame.fields.map((field) => {
if (field.type === FieldType.number) {
field.config = {
...field.config,
...getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected),
};
}
return field;
});
return dataFrame;
};
const LogLevelColor = {
[LogLevel.critical]: colors[7],
[LogLevel.warning]: colors[1],
[LogLevel.error]: colors[4],
[LogLevel.info]: colors[0],
[LogLevel.debug]: colors[5],
[LogLevel.trace]: colors[2],
[LogLevel.unknown]: '#8e8e8e' // or '#bdc4cd',
};
/**
* Returns field configuration used to render logs volume bars
*/
function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
const name = oneLevelDetected && level === LogLevel.unknown ? 'logs' : level;
const color = LogLevelColor[level];
return {
displayNameFromDS: name,
color: {
mode: FieldColorModeId.Fixed,
fixedColor: color,
},
custom: {
drawStyle: GraphDrawStyle.Bars,
barAlignment: BarAlignment.Center,
lineColor: color,
pointColor: color,
fillColor: color,
lineWidth: 1,
fillOpacity: 100,
stacking: {
mode: StackingMode.Normal,
group: 'A',
},
},
};
}