From 5e948a8377c4148c007d85b7afb005d4c9850d52 Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Mon, 1 Jun 2026 15:40:52 +0800 Subject: [PATCH 1/8] feat: add loki log mcp tools --- pkg/config/observability/config.go | 7 + pkg/config/observability/logs.go | 89 ++++++++ pkg/console/component.go | 14 +- pkg/mcp/tools/log/analyzer.go | 89 ++++++++ pkg/mcp/tools/log/loki.go | 318 +++++++++++++++++++++++++++++ pkg/mcp/tools/log/model.go | 68 ++++++ pkg/mcp/tools/log/tools.go | 160 +++++++++++++++ pkg/mcp/tools/log/tools_test.go | 191 +++++++++++++++++ pkg/mcp/tools/resource_search.go | 11 +- pkg/mcp/tools/service_discovery.go | 3 +- 10 files changed, 936 insertions(+), 14 deletions(-) create mode 100644 pkg/config/observability/logs.go create mode 100644 pkg/mcp/tools/log/analyzer.go create mode 100644 pkg/mcp/tools/log/loki.go create mode 100644 pkg/mcp/tools/log/model.go create mode 100644 pkg/mcp/tools/log/tools.go create mode 100644 pkg/mcp/tools/log/tools_test.go diff --git a/pkg/config/observability/config.go b/pkg/config/observability/config.go index 879e22998..920e8a31c 100644 --- a/pkg/config/observability/config.go +++ b/pkg/config/observability/config.go @@ -33,6 +33,8 @@ type Config struct { Grafana string `json:"grafana" yaml:"grafana"` // Prometheus is the url of prometheus Prometheus string `json:"prometheus" yaml:"prometheus"` + // Logs configures the log query provider. 
+ Logs *LogsConfig `json:"logs,omitempty" yaml:"logs,omitempty"` GrafanaBaseURL *url.URL `json:"-" yaml:"-"` PrometheusBaseURL *url.URL `json:"-" yaml:"-"` @@ -55,6 +57,11 @@ func (c *Config) Validate() error { } c.GrafanaBaseURL = grafanaBaseURL } + if c.Logs != nil { + if err := c.Logs.Validate(); err != nil { + return err + } + } return nil } diff --git a/pkg/config/observability/logs.go b/pkg/config/observability/logs.go new file mode 100644 index 000000000..0715efdda --- /dev/null +++ b/pkg/config/observability/logs.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package observability
+
+import (
+	"fmt"
+	"net/url"
+
+	"github.com/duke-git/lancet/v2/strutil"
+
+	"github.com/apache/dubbo-admin/pkg/common/bizerror"
+)
+
+// LogProviderType names the kind of backend a log provider talks to.
+type LogProviderType string
+
+const (
+	// LogProviderLoki is the only provider type Validate currently accepts.
+	LogProviderLoki LogProviderType = "loki"
+)
+
+// LogsConfig lists the configured log providers and names the default one.
+type LogsConfig struct {
+	DefaultProvider string              `json:"defaultProvider" yaml:"defaultProvider"`
+	Providers       []LogProviderConfig `json:"providers" yaml:"providers"`
+}
+
+// LogProviderConfig describes a single log backend.
+type LogProviderConfig struct {
+	Name     string          `json:"name" yaml:"name"`
+	Type     LogProviderType `json:"type" yaml:"type"`
+	Endpoint string          `json:"endpoint" yaml:"endpoint"`
+	Tenant   string          `json:"tenant,omitempty" yaml:"tenant,omitempty"`
+}
+
+// Validate checks the log provider configuration.
+//
+// A nil receiver or an empty provider list is treated as "logs not
+// configured" and is valid. Otherwise every provider needs a unique,
+// non-blank name, a supported type and an absolute http(s) endpoint, and
+// DefaultProvider must name one of the providers.
+func (c *LogsConfig) Validate() error {
+	if c == nil || len(c.Providers) == 0 {
+		return nil
+	}
+	if strutil.IsBlank(c.DefaultProvider) {
+		return bizerror.New(bizerror.ConfigError, "default log provider is required")
+	}
+
+	names := make(map[string]struct{}, len(c.Providers))
+	for _, provider := range c.Providers {
+		if strutil.IsBlank(provider.Name) {
+			return bizerror.New(bizerror.ConfigError, "log provider name is required")
+		}
+		// Default returns the first provider with a matching name, so a
+		// duplicate would be silently unreachable; reject it up front.
+		if _, dup := names[provider.Name]; dup {
+			return bizerror.New(bizerror.ConfigError, fmt.Sprintf("duplicate log provider name: %s", provider.Name))
+		}
+		names[provider.Name] = struct{}{}
+		if provider.Type != LogProviderLoki {
+			return bizerror.New(bizerror.ConfigError, fmt.Sprintf("unsupported log provider type: %s", provider.Type))
+		}
+		if strutil.IsBlank(provider.Endpoint) {
+			return bizerror.New(bizerror.ConfigError, "log provider endpoint is required")
+		}
+		if err := validateLogEndpoint(provider.Endpoint); err != nil {
+			return err
+		}
+	}
+	if _, ok := names[c.DefaultProvider]; !ok {
+		return bizerror.New(bizerror.ConfigError, fmt.Sprintf("default log provider %q is not configured", c.DefaultProvider))
+	}
+	return nil
+}
+
+// validateLogEndpoint rejects endpoints that cannot be used as an HTTP base URL.
+//
+// url.Parse alone accepts relative references such as "foo", which would only
+// fail later when the first query is sent, so the scheme and host are checked
+// explicitly.
+func validateLogEndpoint(endpoint string) error {
+	parsed, err := url.Parse(endpoint)
+	if err != nil {
+		return bizerror.Wrap(err, bizerror.ConfigError, fmt.Sprintf("invalid log provider endpoint: %s", endpoint))
+	}
+	if (parsed.Scheme != "http" && parsed.Scheme != "https") || parsed.Host == "" {
+		return bizerror.New(bizerror.ConfigError,
+			fmt.Sprintf("log provider endpoint must be an absolute http or https URL: %s", endpoint))
+	}
+	return nil
+}
+
+// Default returns the provider whose name equals DefaultProvider.
+// The boolean is false when the receiver is nil or no provider matches.
+func (c *LogsConfig) Default() (LogProviderConfig, bool) {
+	if c == nil {
+		return LogProviderConfig{}, false
+	}
+	for _, provider := range c.Providers {
+		if provider.Name == c.DefaultProvider {
+			return provider, true
+		}
+	}
+	return LogProviderConfig{}, false
+}
diff --git a/pkg/console/component.go b/pkg/console/component.go
index 631fcaeed..f2ca85669 100644
--- a/pkg/console/component.go
+++ b/pkg/console/component.go
@@ -41,8 +41,9 @@ import (
 	"github.com/apache/dubbo-admin/pkg/core/logger"
 	"github.com/apache/dubbo-admin/pkg/core/runtime"
 	mcpcore "github.com/apache/dubbo-admin/pkg/mcp/core"
-	mcphttp "github.com/apache/dubbo-admin/pkg/mcp/transport/http"
 	mcp_tools "github.com/apache/dubbo-admin/pkg/mcp/tools"
+	mcp_log_tools "github.com/apache/dubbo-admin/pkg/mcp/tools/log"
+	mcphttp "github.com/apache/dubbo-admin/pkg/mcp/transport/http"
 )
 
 func init() {
@@ -50,11 +51,11 @@ func init() {
 }
 
 type consoleWebServer struct {
-	Engine *gin.Engine
-	cfg *console.Config
-	cs consolectx.Context
-	mcpPath string // MCP端点路径,用于auth中间件跳过认证
-	mcpAPIKey string // MCP API密钥,用于认证
+	Engine    *gin.Engine
+	cfg       *console.Config
+	cs        consolectx.Context
+	mcpPath   string // MCP endpoint path used by auth middleware to skip authentication.
+	mcpAPIKey string // MCP API key used for authentication.
 }
 
 func (c *consoleWebServer) RequiredDependencies() []runtime.ComponentType {
@@ -189,6 +190,7 @@ func (c *consoleWebServer) registerMCPEndpoints(coreRt runtime.Runtime, engine *
 	reg.RegisterRegistrar(&mcp_tools.ResourceSearchRegistrar{})
 	reg.RegisterRegistrar(&mcp_tools.ServiceRegistrar{})
 	reg.RegisterRegistrar(&mcp_tools.DetailRegistrar{})
+	reg.RegisterRegistrar(&mcp_log_tools.LogRegistrar{})
 	reg.RegisterAll()
 
 	// 创建HTTP处理器
diff --git a/pkg/mcp/tools/log/analyzer.go b/pkg/mcp/tools/log/analyzer.go
new file mode 100644
index 000000000..669c20459
--- /dev/null
+++ b/pkg/mcp/tools/log/analyzer.go
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+	"fmt"
+	"regexp"
+	"sort"
+	"strings"
+	"time"
+)
+
+// volatileTokenPattern matches tokens that vary between otherwise identical
+// messages: runs of 8+ hex digits and standalone decimal numbers.
+var volatileTokenPattern = regexp.MustCompile(`(?i)\b[0-9a-f]{8,}\b|\b\d+\b`)
+
+// analyzeErrors groups the error entries in logs by normalized message pattern.
+//
+// Entries that isErrorLog rejects are ignored. For every pattern it records
+// the number of hits, the first message encountered, up to three example
+// entries, and the chronologically earliest and latest timestamps. Patterns
+// are returned by descending count, ties broken by pattern text.
+func analyzeErrors(logs []LogItem, sourceEngine string) *AnalyzeErrorLogsResp {
+	patternsByName := make(map[string]*ErrorPattern)
+	for _, item := range logs {
+		if !isErrorLog(item) {
+			continue
+		}
+		patternName := normalizeMessagePattern(item.Message)
+		pattern, ok := patternsByName[patternName]
+		if !ok {
+			pattern = &ErrorPattern{
+				Pattern:   patternName,
+				Example:   item.Message,
+				FirstSeen: item.Timestamp,
+				LastSeen:  item.Timestamp,
+			}
+			patternsByName[patternName] = pattern
+		}
+		pattern.Count++
+		// The input order is not guaranteed to be chronological (search
+		// returns newest first), so track min/max instead of relying on
+		// iteration order.
+		if timestampBefore(item.Timestamp, pattern.FirstSeen) {
+			pattern.FirstSeen = item.Timestamp
+		}
+		if timestampBefore(pattern.LastSeen, item.Timestamp) {
+			pattern.LastSeen = item.Timestamp
+		}
+		if len(pattern.Examples) < 3 {
+			pattern.Examples = append(pattern.Examples, item)
+		}
+	}
+
+	patterns := make([]ErrorPattern, 0, len(patternsByName))
+	total := 0
+	for _, pattern := range patternsByName {
+		total += pattern.Count
+		patterns = append(patterns, *pattern)
+	}
+	sort.Slice(patterns, func(i, j int) bool {
+		if patterns[i].Count == patterns[j].Count {
+			return patterns[i].Pattern < patterns[j].Pattern
+		}
+		return patterns[i].Count > patterns[j].Count
+	})
+
+	return &AnalyzeErrorLogsResp{
+		Summary:      fmt.Sprintf("found %d error log entries across %d patterns", total, len(patterns)),
+		TotalErrors:  total,
+		Patterns:     patterns,
+		SourceEngine: sourceEngine,
+	}
+}
+
+// timestampBefore reports whether timestamp a is chronologically earlier than b.
+//
+// Both values are parsed as RFC3339Nano because that layout drops trailing
+// fractional zeros, which makes plain string comparison unreliable. If either
+// value does not parse, it falls back to comparing the raw strings.
+func timestampBefore(a, b string) bool {
+	ta, errA := time.Parse(time.RFC3339Nano, a)
+	tb, errB := time.Parse(time.RFC3339Nano, b)
+	if errA != nil || errB != nil {
+		return a < b
+	}
+	return ta.Before(tb)
+}
+
+// isErrorLog reports whether item looks like an error: either its severity is
+// "error" (case-insensitive) or its message contains "error", "exception" or
+// "failed" (case-insensitive).
+func isErrorLog(item LogItem) bool {
+	if strings.EqualFold(item.Severity, "error") {
+		return true
+	}
+	message := strings.ToLower(item.Message)
+	return strings.Contains(message, "error") || strings.Contains(message, "exception") || strings.Contains(message, "failed")
+}
+
+// normalizeMessagePattern replaces volatile tokens with "?" and collapses
+// whitespace so that similar messages share one pattern. An empty result is
+// reported as "unknown".
+func normalizeMessagePattern(message string) string {
+	pattern := volatileTokenPattern.ReplaceAllString(message, "?")
+	pattern = strings.Join(strings.Fields(pattern), " ")
+	if pattern == "" {
+		return "unknown"
+	}
+	return pattern
+}
diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go
new file mode 100644
index 000000000..75e822e2e
--- /dev/null
+++ b/pkg/mcp/tools/log/loki.go
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/apache/dubbo-admin/pkg/common/bizerror"
+	observabilitycfg "github.com/apache/dubbo-admin/pkg/config/observability"
+)
+
+// defaultQueryWindow is how far back a query reaches when no startTime is given.
+const defaultQueryWindow = time.Hour
+
+// lokiClient queries one configured Loki provider over HTTP.
+type lokiClient struct {
+	config observabilitycfg.LogProviderConfig
+	client *http.Client
+}
+
+// lokiQueryRangeResp holds the fields of a query_range response that this
+// package reads.
+type lokiQueryRangeResp struct {
+	Status string `json:"status"`
+	Data   struct {
+		Result []lokiStream `json:"result"`
+	} `json:"data"`
+	Error string `json:"error,omitempty"`
+}
+
+// lokiStream is one result stream: its label set plus [timestamp, line] pairs.
+type lokiStream struct {
+	Stream map[string]string `json:"stream"`
+	Values [][]string        `json:"values"`
+}
+
+// newLokiClient builds a client for cfg with a 30 second HTTP timeout.
+func newLokiClient(cfg observabilitycfg.LogProviderConfig) *lokiClient {
+	return &lokiClient{
+		config: cfg,
+		client: &http.Client{Timeout: 30 * time.Second},
+	}
+}
+
+// search runs every label-alias query for req, merges and de-duplicates the
+// results, and returns at most the requested number of entries, newest first.
+//
+// A non-positive req.Limit falls back to defaultLogLimit; req itself is not
+// modified. All alias queries are issued before truncating, because stopping
+// at the first alias that fills the limit would drop newer entries that are
+// only reachable through a later alias.
+func (c *lokiClient) search(ctx context.Context, req *SearchLogsReq) (*SearchLogsResp, error) {
+	limit := req.Limit
+	if limit <= 0 {
+		limit = defaultLogLimit
+	}
+	start, end, err := resolveTimeRange(req.StartTime, req.EndTime)
+	if err != nil {
+		return nil, err
+	}
+
+	merged := make([]LogItem, 0, limit)
+	seen := make(map[string]struct{})
+	for _, query := range buildLogQLQueries(req) {
+		logs, err := c.queryRange(ctx, query, start, end, limit)
+		if err != nil {
+			return nil, err
+		}
+		for _, item := range logs {
+			key := dedupeKey(item)
+			if _, ok := seen[key]; ok {
+				continue
+			}
+			seen[key] = struct{}{}
+			merged = append(merged, item)
+		}
+	}
+
+	// Sort on parsed time: RFC3339Nano strings drop trailing fractional
+	// zeros, so comparing them as text does not give chronological order.
+	sort.SliceStable(merged, func(i, j int) bool {
+		return logTimestampAfter(merged[i].Timestamp, merged[j].Timestamp)
+	})
+	if len(merged) > limit {
+		merged = merged[:limit]
+	}
+	return &SearchLogsResp{SourceEngine: "loki", Logs: merged}, nil
+}
+
+// logTimestampAfter reports whether timestamp a is chronologically later than b.
+// If either value is not RFC3339Nano it falls back to comparing the raw strings.
+func logTimestampAfter(a, b string) bool {
+	ta, errA := time.Parse(time.RFC3339Nano, a)
+	tb, errB := time.Parse(time.RFC3339Nano, b)
+	if errA != nil || errB != nil {
+		return a > b
+	}
+	return ta.After(tb)
+}
+
+// queryRange sends one query_range request and returns the normalized entries.
+// It sets X-Scope-OrgID when a tenant is configured and reports non-2xx
+// responses (with up to 4 KiB of the body) and non-"success" statuses as errors.
+func (c *lokiClient) queryRange(ctx context.Context, query string, start, end time.Time, limit int) ([]LogItem, error) {
+	queryURL, err := c.queryRangeURL(query, start, end, limit)
+	if err != nil {
+		return nil, err
+	}
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, queryURL, nil)
+	if err != nil {
+		return nil, bizerror.Wrap(err, bizerror.InternalError, "failed to create loki query request")
+	}
+	if c.config.Tenant != "" {
+		httpReq.Header.Set("X-Scope-OrgID", c.config.Tenant)
+	}
+
+	httpResp, err := c.client.Do(httpReq)
+	if err != nil {
+		return nil, bizerror.Wrap(err, bizerror.NetWorkError, "failed to query loki")
+	}
+	defer httpResp.Body.Close()
+	if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
+		body, _ := io.ReadAll(io.LimitReader(httpResp.Body, 4096))
+		return nil, bizerror.New(bizerror.NetWorkError,
+			fmt.Sprintf("loki query failed with status %d: %s", httpResp.StatusCode, strings.TrimSpace(string(body))))
+	}
+
+	var lokiResp lokiQueryRangeResp
+	if err := json.NewDecoder(httpResp.Body).Decode(&lokiResp); err != nil {
+		return nil, bizerror.Wrap(err, bizerror.JsonError, "failed to decode loki query response")
+	}
+	if lokiResp.Status != "success" {
+		if lokiResp.Error != "" {
+			return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki query failed: %s", lokiResp.Error))
+		}
+		return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki query returned status %q", lokiResp.Status))
+	}
+	return normalizeLokiLogs(lokiResp), nil
+}
+
+// queryRangeURL builds the query_range URL under the configured endpoint,
+// passing start and end as Unix nanoseconds and requesting backward direction.
+func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int) (string, error) {
+	baseURL, err := url.Parse(c.config.Endpoint)
+	if err != nil {
+		return "", bizerror.Wrap(err, bizerror.ConfigError, "invalid loki endpoint")
+	}
+	baseURL.Path = strings.TrimRight(baseURL.Path, "/") + "/loki/api/v1/query_range"
+
+	query := baseURL.Query()
+	query.Set("query", logQL)
+	query.Set("start", strconv.FormatInt(start.UnixNano(), 10))
+	query.Set("end", strconv.FormatInt(end.UnixNano(), 10))
+	query.Set("limit", strconv.Itoa(limit))
+	query.Set("direction", "backward")
+	baseURL.RawQuery = query.Encode()
+	return baseURL.String(), nil
+}
+
+// buildLogQLQueries returns one LogQL query per stream selector, appending a
+// `|=` line filter when req.Keywords is set.
+func buildLogQLQueries(req *SearchLogsReq) []string {
+	selectors := buildStreamSelectors(req)
+	queries := make([]string, 0, len(selectors))
+	for _, selector := range selectors {
+		query := selector
+		if req.Keywords != "" {
+			query += " |= " + strconv.Quote(req.Keywords)
+		}
+		queries = append(queries, query)
+	}
+	return queries
+}
+
+// buildStreamSelectors expands the request filters into stream selectors.
+//
+// Each filter may map to several label names, so the result is the cartesian
+// product of the alias groups: one selector per combination. With no filters
+// it returns the single catch-all selector `{job=~".+"}`.
+func buildStreamSelectors(req *SearchLogsReq) []string {
+	labelGroups := make([][]string, 0, 4)
+	if req.Mesh != "" {
+		labelGroups = append(labelGroups, []string{labelMatcher("mesh", req.Mesh)})
+	}
+	if req.AppName != "" {
+		labelGroups = append(labelGroups, labelMatchers([]string{"app", "appName"}, req.AppName))
+	}
+	if req.ServiceName != "" {
+		labelGroups = append(labelGroups, labelMatchers([]string{"service", "serviceName", "service_name"}, req.ServiceName))
+	}
+	if req.InstanceName != "" {
+		labelGroups = append(labelGroups, labelMatchers([]string{"instance", "instanceName", "pod"}, req.InstanceName))
+	}
+	if req.TraceID != "" {
+		labelGroups = append(labelGroups, labelMatchers([]string{"trace_id", "traceId", "traceid"}, req.TraceID))
+	}
+	if len(labelGroups) == 0 {
+		return []string{`{job=~".+"}`}
+	}
+
+	selectors := []string{""}
+	for _, group := range labelGroups {
+		next := make([]string, 0, len(selectors)*len(group))
+		for _, prefix := range selectors {
+			for _, matcher := range group {
+				if prefix == "" {
+					next = append(next, matcher)
+				} else {
+					next = append(next, prefix+", "+matcher)
+				}
+			}
+		}
+		selectors = next
+	}
+
+	result := make([]string, 0, len(selectors))
+	for _, selector := range selectors {
+		result = append(result, fmt.Sprintf("{%s}", selector))
+	}
+	return result
+}
+
+// labelMatchers returns an equality matcher on value for every name in names.
+func labelMatchers(names []string, value string) []string {
+	matchers := make([]string, 0, len(names))
+	for _, name := range names {
+		matchers = append(matchers, labelMatcher(name, value))
+	}
+	return matchers
+}
+
+// labelMatcher renders name="value" with value quoted by strconv.Quote.
+func labelMatcher(name, value string) string {
+	return fmt.Sprintf("%s=%s", name, strconv.Quote(value))
+}
+
+// resolveTimeRange parses the optional bounds. end defaults to now and start
+// to end minus defaultQueryWindow; start after end is an InvalidArgument error.
+func resolveTimeRange(startRaw, endRaw string) (time.Time, time.Time, error) {
+	end := time.Now()
+	if endRaw != "" {
+		parsed, err := parseLogTime("endTime", endRaw)
+		if err != nil {
+			return time.Time{}, time.Time{}, err
+		}
+		end = parsed
+	}
+	start := end.Add(-defaultQueryWindow)
+	if startRaw != "" {
+		parsed, err := parseLogTime("startTime", startRaw)
+		if err != nil {
+			return time.Time{}, time.Time{}, err
+		}
+		start = parsed
+	}
+	if start.After(end) {
+		return time.Time{}, time.Time{}, bizerror.New(bizerror.InvalidArgument, "startTime must be less than or equal to endTime")
+	}
+	return start, end, nil
+}
+
+// parseLogTime accepts an RFC3339/RFC3339Nano timestamp or an integer number
+// of Unix nanoseconds; field is only used in the error message.
+func parseLogTime(field, value string) (time.Time, error) {
+	if ts, err := time.Parse(time.RFC3339Nano, value); err == nil {
+		return ts, nil
+	}
+	if ns, err := strconv.ParseInt(value, 10, 64); err == nil {
+		return time.Unix(0, ns), nil
+	}
+	return time.Time{}, bizerror.New(bizerror.InvalidArgument,
+		fmt.Sprintf("%s must be RFC3339, RFC3339Nano, or Unix nanoseconds", field))
+}
+
+// normalizeLokiLogs flattens every stream value into a LogItem, filling the
+// well-known fields from the first non-empty label alias and keeping the
+// remaining labels as Attributes. Values with fewer than two elements are skipped.
+func normalizeLokiLogs(resp lokiQueryRangeResp) []LogItem {
+	logs := make([]LogItem, 0)
+	for _, stream := range resp.Data.Result {
+		for _, value := range stream.Values {
+			if len(value) < 2 {
+				continue
+			}
+			logs = append(logs, LogItem{
+				Timestamp:    normalizeLokiTimestamp(value[0]),
+				AppName:      firstLabel(stream.Stream, "app", "appName"),
+				ServiceName:  firstLabel(stream.Stream, "service", "serviceName", "service_name"),
+				InstanceName: firstLabel(stream.Stream, "instance", "instanceName", "pod"),
+				Severity:     firstLabel(stream.Stream, "level", "severity", "detected_level"),
+				Message:      value[1],
+				TraceID:      firstLabel(stream.Stream, "trace_id", "traceId", "traceid"),
+				SpanID:       firstLabel(stream.Stream, "span_id", "spanId", "spanid"),
+				Attributes:   extraLabels(stream.Stream),
+				Raw:          value[1],
+			})
+		}
+	}
+	return logs
+}
+
+// normalizeLokiTimestamp converts a Unix-nanosecond string to UTC RFC3339Nano;
+// a value that is not an integer is returned unchanged.
+func normalizeLokiTimestamp(value string) string {
+	ns, err := strconv.ParseInt(value, 10, 64)
+	if err != nil {
+		return value
+	}
+	return time.Unix(0, ns).UTC().Format(time.RFC3339Nano)
+}
+
+// firstLabel returns the first non-empty label value among keys, or "".
+func firstLabel(labels map[string]string, keys ...string) string {
+	for _, key := range keys {
+		if value := labels[key]; value != "" {
+			return value
+		}
+	}
+	return ""
+}
+
+// extraLabels returns the labels that are not already mapped to a dedicated
+// LogItem field, or nil when none remain.
+func extraLabels(labels map[string]string) map[string]string {
+	attrs := make(map[string]string)
+	for key, value := range labels {
+		switch key {
+		case "mesh", "app", "appName", "service", "serviceName", "service_name", "instance", "instanceName",
+			"pod", "level", "severity", "detected_level", "trace_id", "traceId", "traceid", "span_id", "spanId", "spanid":
+			continue
+		default:
+			attrs[key] = value
+		}
+	}
+	if len(attrs) == 0 {
+		return nil
+	}
+	return attrs
+}
+
+// dedupeKey identifies an entry by timestamp, message, trace ID and span ID so
+// that the same line returned by several alias queries is kept once.
+func dedupeKey(item LogItem) string {
+	return strings.Join([]string{item.Timestamp, item.Message, item.TraceID, item.SpanID}, "\x00")
+}
diff --git a/pkg/mcp/tools/log/model.go b/pkg/mcp/tools/log/model.go
new file mode 100644
index 000000000..54d207003
--- /dev/null
+++ b/pkg/mcp/tools/log/model.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+// SearchLogsReq carries the filters of a log search. Empty string fields are
+// not applied as filters.
+type SearchLogsReq struct {
+	Mesh         string `json:"mesh,omitempty"`
+	AppName      string `json:"appName,omitempty"`
+	ServiceName  string `json:"serviceName,omitempty"`
+	InstanceName string `json:"instanceName,omitempty"`
+	TraceID      string `json:"traceId,omitempty"`
+	// Keywords is added to the query as a line filter.
+	Keywords string `json:"keywords,omitempty"`
+	// StartTime and EndTime are RFC3339/RFC3339Nano strings or Unix nanoseconds.
+	StartTime string `json:"startTime,omitempty"`
+	EndTime   string `json:"endTime,omitempty"`
+	// Limit is the maximum number of entries to return.
+	Limit int `json:"limit,omitempty"`
+}
+
+// LogItem is one normalized log entry.
+type LogItem struct {
+	Timestamp    string `json:"timestamp"`
+	AppName      string `json:"appName,omitempty"`
+	ServiceName  string `json:"serviceName,omitempty"`
+	InstanceName string `json:"instanceName,omitempty"`
+	Severity     string `json:"severity,omitempty"`
+	Message      string `json:"message"`
+	TraceID      string `json:"traceId,omitempty"`
+	SpanID       string `json:"spanId,omitempty"`
+	// Attributes holds stream labels that have no dedicated field.
+	Attributes map[string]string `json:"attributes,omitempty"`
+	Raw        string            `json:"raw,omitempty"`
+}
+
+// SearchLogsResp is the result of a log search.
+type SearchLogsResp struct {
+	Logs         []LogItem `json:"logs"`
+	SourceEngine string    `json:"sourceEngine"`
+}
+
+// AnalyzeErrorLogsReq takes the same filters as SearchLogsReq.
+type AnalyzeErrorLogsReq struct {
+	SearchLogsReq
+}
+
+// ErrorPattern is one group of error entries that share a normalized message.
+type ErrorPattern struct {
+	Pattern string `json:"pattern"`
+	Count   int    `json:"count"`
+	// Example is the message of the first entry that produced this pattern.
+	Example   string `json:"example,omitempty"`
+	FirstSeen string `json:"firstSeen,omitempty"`
+	LastSeen  string `json:"lastSeen,omitempty"`
+	// Examples holds a small sample of the grouped entries.
+	Examples []LogItem `json:"examples,omitempty"`
+}
+
+// AnalyzeErrorLogsResp summarizes the error patterns found in a search result.
+type AnalyzeErrorLogsResp struct {
+	Summary      string         `json:"summary"`
+	TotalErrors  int            `json:"totalErrors"`
+	Patterns     []ErrorPattern `json:"patterns"`
+	SourceEngine string         `json:"sourceEngine"`
+}
diff --git a/pkg/mcp/tools/log/tools.go b/pkg/mcp/tools/log/tools.go
new file mode 100644
index 000000000..7fabd6dea
--- /dev/null
+++ b/pkg/mcp/tools/log/tools.go
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+	"context"
+	"fmt"
+
+	consolectx "github.com/apache/dubbo-admin/pkg/console/context"
+	"github.com/apache/dubbo-admin/pkg/mcp/registry"
+	basetools "github.com/apache/dubbo-admin/pkg/mcp/tools"
+	"github.com/apache/dubbo-admin/pkg/mcp/types"
+)
+
+// defaultLogLimit is the entry limit used when the caller does not supply one.
+const defaultLogLimit = 100
+
+// LogRegistrar registers the log query tools with the MCP registry.
+type LogRegistrar struct{}
+
+// RegisterTools registers search_logs and analyze_error_logs; both tools share
+// the input schema built by logSearchProperties.
+func (r *LogRegistrar) RegisterTools(reg *registry.Registry) {
+	properties := logSearchProperties()
+	reg.Register(types.ToolDef{
+		Name:        "search_logs",
+		Description: "查询 Dubbo 服务日志,支持按应用、服务、实例、TraceID 和关键字过滤",
+		InputSchema: types.InputSchema{
+			Type:       "object",
+			Properties: properties,
+		},
+		Handler: SearchLogs,
+	})
+
+	reg.Register(types.ToolDef{
+		Name:        "analyze_error_logs",
+		Description: "分析错误日志并按错误模式聚合",
+		InputSchema: types.InputSchema{
+			Type:       "object",
+			Properties: properties,
+		},
+		Handler: AnalyzeErrorLogs,
+	})
+}
+
+// SearchLogs handles the search_logs tool. Configuration and query failures
+// are returned as an error tool result with a nil Go error.
+func SearchLogs(ctx consolectx.Context, args map[string]any) (*types.ToolResult, error) {
+	client, err := lokiClientFromContext(ctx)
+	if err != nil {
+		return basetools.ErrorResult(err), nil
+	}
+	resp, err := client.search(requestContext(ctx), buildSearchLogsReq(args))
+	if err != nil {
+		return basetools.ErrorResult(err), nil
+	}
+	return basetools.JsonResult(resp)
+}
+
+// AnalyzeErrorLogs handles the analyze_error_logs tool: it runs the same search
+// as SearchLogs and groups the result with analyzeErrors.
+func AnalyzeErrorLogs(ctx consolectx.Context, args map[string]any) (*types.ToolResult, error) {
+	client, err := lokiClientFromContext(ctx)
+	if err != nil {
+		return basetools.ErrorResult(err), nil
+	}
+	req := buildSearchLogsReq(args)
+	// NOTE(review): this default is applied through the `|=` line filter in
+	// buildLogQLQueries, which Loki documents as case-sensitive, so lines that
+	// only contain "ERROR", "exception" or "failed" are presumably never
+	// fetched even though isErrorLog would count them. Confirm the intent.
+	if req.Keywords == "" {
+		req.Keywords = "Error"
+	}
+	searchResp, err := client.search(requestContext(ctx), req)
+	if err != nil {
+		return basetools.ErrorResult(err), nil
+	}
+	return basetools.JsonResult(analyzeErrors(searchResp.Logs, searchResp.SourceEngine))
+}
+
+// lokiClientFromContext builds a client for the default log provider found in
+// the console configuration. A new client is created on every call.
+func lokiClientFromContext(ctx consolectx.Context) (*lokiClient, error) {
+	if ctx == nil || ctx.Config().Observability == nil || ctx.Config().Observability.Logs == nil {
+		return nil, fmt.Errorf("loki log provider is not configured")
+	}
+	provider, ok := ctx.Config().Observability.Logs.Default()
+	if !ok {
+		return nil, fmt.Errorf("default loki log provider is not configured")
+	}
+	return newLokiClient(provider), nil
+}
+
+// requestContext returns the application context, or context.Background when
+// none is available.
+func requestContext(ctx consolectx.Context) context.Context {
+	if ctx == nil || ctx.AppContext() == nil {
+		return context.Background()
+	}
+	return ctx.AppContext()
+}
+
+// buildSearchLogsReq maps the raw tool arguments onto a SearchLogsReq; missing
+// strings default to "" and a missing limit to defaultLogLimit.
+func buildSearchLogsReq(args map[string]any) *SearchLogsReq {
+	helper := basetools.NewArgsHelper(args)
+	return &SearchLogsReq{
+		Mesh:         helper.GetString("mesh", ""),
+		AppName:      helper.GetString("appName", ""),
+		ServiceName:  helper.GetString("serviceName", ""),
+		InstanceName: helper.GetString("instanceName", ""),
+		TraceID:      helper.GetString("traceId", ""),
+		Keywords:     helper.GetString("keywords", ""),
+		StartTime:    helper.GetString("startTime", ""),
+		EndTime:      helper.GetString("endTime", ""),
+		Limit:        helper.GetInt("limit", defaultLogLimit),
+	}
+}
+
+// logSearchProperties returns the input schema properties shared by both tools.
+func logSearchProperties() map[string]types.PropertyDef {
+	return map[string]types.PropertyDef{
+		"mesh": {
+			Type:        "string",
+			Description: "Mesh 名称,用于显式按 mesh label 过滤",
+		},
+		"appName": {
+			Type:        "string",
+			Description: "应用名称",
+		},
+		"serviceName": {
+			Type:        "string",
+			Description: "服务名称",
+		},
+		"instanceName": {
+			Type:        "string",
+			Description: "实例、Pod 或主机名称",
+		},
+		"traceId": {
+			Type:        "string",
+			Description: "TraceID",
+		},
+		"keywords": {
+			Type:        "string",
+			Description: "日志关键字",
+		},
+		"startTime": {
+			Type:        "string",
+			Description: "开始时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒",
+		},
+		"endTime": {
+			Type:        "string",
+			Description: "结束时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒",
+		},
+		"limit": {
+			Type:        "integer",
+			Description: "返回日志条数上限",
+			Default:     defaultLogLimit,
+		},
+	}
+}
+
+// Compile-time check that LogRegistrar implements registry.ToolRegistrar.
+var _ registry.ToolRegistrar = (*LogRegistrar)(nil)
diff --git a/pkg/mcp/tools/log/tools_test.go b/pkg/mcp/tools/log/tools_test.go
new file mode 100644
index 000000000..f70729d56
--- /dev/null
+++ b/pkg/mcp/tools/log/tools_test.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package log + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/apache/dubbo-admin/pkg/config/app" + "github.com/apache/dubbo-admin/pkg/config/observability" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/console/counter" + "github.com/apache/dubbo-admin/pkg/core/lock" + "github.com/apache/dubbo-admin/pkg/core/manager" + "github.com/apache/dubbo-admin/pkg/mcp/registry" +) + +func TestLogRegistrarRegistersExpectedTools(t *testing.T) { + reg := registry.NewRegistry() + (&LogRegistrar{}).RegisterTools(reg) + + if got := reg.Count(); got != 2 { + t.Fatalf("expected 2 log tools, got %d", got) + } + for _, name := range []string{"search_logs", "analyze_error_logs"} { + tool, ok := reg.Get(name) + if !ok { + t.Fatalf("tool %s was not registered", name) + } + if tool.Handler == nil { + t.Fatalf("tool %s handler is nil", name) + } + } +} + +func TestBuildLogQLQueriesUsesCommonLabelAliases(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + ServiceName: "org.apache.DemoService", + Keywords: "Error", + }) + + expected := []string{ + `{service="org.apache.DemoService"} |= "Error"`, + `{serviceName="org.apache.DemoService"} |= "Error"`, + `{service_name="org.apache.DemoService"} |= "Error"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } +} + +func TestSearchLogsQueriesLoki(t *testing.T) { + var seenQueries []string + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/query_range" { + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + w.Header().Set("Content-Type", "application/json") + _, _ = 
w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [{ + "stream": { + "app": "demo-provider", + "service_name": "org.apache.DemoService", + "instance": "127.0.0.1:20880", + "level": "ERROR", + "trace_id": "trace-1", + "span_id": "span-1", + "namespace": "dubbo-system" + }, + "values": [["1777110661783444000", "ERROR test log"]] + }] + } + }`)) + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "serviceName": "org.apache.DemoService", + "keywords": "ERROR", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + "limit": float64(10), + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + + var payload SearchLogsResp + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("failed to decode tool result: %v", err) + } + if len(payload.Logs) != 1 { + t.Fatalf("expected one deduplicated log, got %d", len(payload.Logs)) + } + logItem := payload.Logs[0] + if logItem.ServiceName != "org.apache.DemoService" || logItem.TraceID != "trace-1" || logItem.Severity != "ERROR" { + t.Fatalf("unexpected normalized log: %+v", logItem) + } + if len(seenQueries) != 3 { + t.Fatalf("expected three service label alias queries, got %d: %v", len(seenQueries), seenQueries) + } +} + +func TestAnalyzeErrorsGroupsPatterns(t *testing.T) { + resp := analyzeErrors([]LogItem{ + {Timestamp: "2026-04-01T00:00:00Z", Severity: "ERROR", Message: "Error 500 for request 123"}, + {Timestamp: "2026-04-01T00:01:00Z", Severity: "ERROR", Message: "Error 503 for request 456"}, + {Timestamp: "2026-04-01T00:02:00Z", Severity: "INFO", Message: "normal log"}, + }, "loki") + + if resp.TotalErrors != 2 { + t.Fatalf("expected 2 errors, got %d", resp.TotalErrors) + } + if len(resp.Patterns) != 1 { + t.Fatalf("expected one pattern, 
got %d", len(resp.Patterns)) + } + if resp.Patterns[0].Pattern != "Error ? for request ?" || resp.Patterns[0].Count != 2 { + t.Fatalf("unexpected pattern: %+v", resp.Patterns[0]) + } +} + +func newLogToolTestContext(endpoint string) consolectx.Context { + return &logToolTestContext{ + config: app.AdminConfig{ + Observability: &observability.Config{ + Logs: &observability.LogsConfig{ + DefaultProvider: "loki-main", + Providers: []observability.LogProviderConfig{{ + Name: "loki-main", + Type: observability.LogProviderLoki, + Endpoint: endpoint, + }}, + }, + }, + }, + } +} + +type logToolTestContext struct { + config app.AdminConfig +} + +func (c *logToolTestContext) ResourceManager() manager.ResourceManager { + return nil +} + +func (c *logToolTestContext) CounterManager() counter.CounterManager { + return nil +} + +func (c *logToolTestContext) Config() app.AdminConfig { + return c.config +} + +func (c *logToolTestContext) AppContext() context.Context { + return context.Background() +} + +func (c *logToolTestContext) LockManager() lock.Lock { + return nil +} diff --git a/pkg/mcp/tools/resource_search.go b/pkg/mcp/tools/resource_search.go index 8943fdd3f..b72f8d1a9 100644 --- a/pkg/mcp/tools/resource_search.go +++ b/pkg/mcp/tools/resource_search.go @@ -21,8 +21,8 @@ import ( consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/console/model" "github.com/apache/dubbo-admin/pkg/console/service" - "github.com/apache/dubbo-admin/pkg/mcp/types" "github.com/apache/dubbo-admin/pkg/mcp/registry" + "github.com/apache/dubbo-admin/pkg/mcp/types" ) // ResourceSearchRegistrar 搜索工具注册器 @@ -165,11 +165,11 @@ func (e *appNameSearchExecutor) execute(ctx consolectx.Context, keyword, mesh st func (e *appNameSearchExecutor) buildResult(pagedResult *model.SearchPaginationResult, keyword string, pageSize, pageNumber int) map[string]any { apps := extractApplicationsFromResult(pagedResult) return map[string]any{ - "keyword": keyword, - "pageSize": 
pageSize, - "pageNumber": pageNumber, + "keyword": keyword, + "pageSize": pageSize, + "pageNumber": pageNumber, "applications": apps, - "totalCount": len(apps), + "totalCount": len(apps), } } @@ -280,7 +280,6 @@ func extractServicesFromResult(pagedResult *model.SearchPaginationResult) ([]any "serviceName": svc.ServiceName, "version": svc.Version, "group": svc.Group, - "providerAppName": svc.ProviderAppName, "consumerAppName": svc.ConsumerAppName, }) } diff --git a/pkg/mcp/tools/service_discovery.go b/pkg/mcp/tools/service_discovery.go index 0b4ff2894..74ad5f5e9 100644 --- a/pkg/mcp/tools/service_discovery.go +++ b/pkg/mcp/tools/service_discovery.go @@ -21,8 +21,8 @@ import ( consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/console/model" "github.com/apache/dubbo-admin/pkg/console/service" - "github.com/apache/dubbo-admin/pkg/mcp/types" "github.com/apache/dubbo-admin/pkg/mcp/registry" + "github.com/apache/dubbo-admin/pkg/mcp/types" ) // ServiceRegistrar 服务工具注册器 @@ -232,7 +232,6 @@ func extractServices(result *model.SearchPaginationResult) ([]any, int) { "serviceName": svc.ServiceName, "version": svc.Version, "group": svc.Group, - "providerAppName": svc.ProviderAppName, "consumerAppName": svc.ConsumerAppName, }) } From 3d4fdfe41564185e2e69f3bc50194412ac2f25bb Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Mon, 1 Jun 2026 15:45:28 +0800 Subject: [PATCH 2/8] docs: add loki log config example --- app/dubbo-admin/dubbo-admin.yaml | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/app/dubbo-admin/dubbo-admin.yaml b/app/dubbo-admin/dubbo-admin.yaml index 1e74b47ea..b348772ce 100644 --- a/app/dubbo-admin/dubbo-admin.yaml +++ b/app/dubbo-admin/dubbo-admin.yaml @@ -29,12 +29,23 @@ log: # [Optional] config for observability. observability: # [Optional] config for grafana and prometheus. - grafana: http://101.34.253.152:30300 - # [Optional] config for prometheus. 
- prometheus: http://101.34.253.152:30900/ - -# [Optional] config for console. -console: + grafana: http://101.34.253.152:30300 + # [Optional] config for prometheus. + prometheus: http://101.34.253.152:30900/ + # [Optional] config for log providers used by MCP log tools. + # defaultProvider must match one provider name in providers. + # Currently only Loki is supported. endpoint should be the Loki HTTP API root. + # tenant is optional and will be sent as X-Scope-OrgID for multi-tenant Loki. + # logs: + # defaultProvider: loki-main + # providers: + # - name: loki-main + # type: loki + # endpoint: http://localhost:3100 + # tenant: "" + +# [Optional] config for console. +console: # [Optional] port of http server, default is 8888. port: 8888 # [Optional] running mode of Gin, options are debug, release, test, default is release. From 34cd9fd6f893001982968316a1c2579247a81be6 Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Mon, 1 Jun 2026 15:48:40 +0800 Subject: [PATCH 3/8] =?UTF-8?q?feat=EF=BC=9AThe=20joint=20debugging=20=20o?= =?UTF-8?q?f=20interface=20error-analyse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/console/component.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/console/component.go b/pkg/console/component.go index f2ca85669..ecf163f86 100644 --- a/pkg/console/component.go +++ b/pkg/console/component.go @@ -54,8 +54,8 @@ type consoleWebServer struct { Engine *gin.Engine cfg *console.Config cs consolectx.Context - mcpPath string // MCP endpoint path used by auth middleware to skip authentication. - mcpAPIKey string // MCP API key used for authentication. 
+ mcpPath string // MCP端点路径,用于auth中间件跳过认证 + mcpAPIKey string // MCP API密钥,用于认证 } func (c *consoleWebServer) RequiredDependencies() []runtime.ComponentType { From a88a029b5b93f21aed6c72f00f18430046e39edc Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Mon, 1 Jun 2026 22:05:01 +0800 Subject: [PATCH 4/8] docs: add alloy loki values example --- .../kubernetes/monitoring/alloy-values.yaml | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 release/kubernetes/monitoring/alloy-values.yaml diff --git a/release/kubernetes/monitoring/alloy-values.yaml b/release/kubernetes/monitoring/alloy-values.yaml new file mode 100644 index 000000000..643025063 --- /dev/null +++ b/release/kubernetes/monitoring/alloy-values.yaml @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Example values for the Grafana Alloy Helm chart. +# The generated Loki labels match the MCP log tools: +# - appName filters app/appName labels +# - serviceName filters service/serviceName/service_name labels +# - instanceName filters instance/instanceName/pod labels +# - mesh filters the mesh label +# +# traceId is intentionally not promoted to a Loki label in this example. 
+# Trace IDs are usually high-cardinality values; use the MCP keywords filter +# or add custom log parsing if your deployment requires trace-id lookup. + +global: + image: + registry: m.daocloud.io/docker.io + +image: + registry: m.daocloud.io/docker.io + repository: grafana/alloy + pullPolicy: IfNotPresent + +controller: + type: deployment + replicas: 1 + +configReloader: + enabled: false + +rbac: + create: true + +serviceAccount: + create: true + +alloy: + enableReporting: false + + mounts: + varlog: false + dockercontainers: false + + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 300m + memory: 512Mi + + configMap: + create: true + content: |- + logging { + level = "info" + format = "logfmt" + } + + discovery.kubernetes "pods" { + role = "pod" + } + + discovery.relabel "dubbo_pod_logs" { + targets = discovery.kubernetes.pods.targets + + rule { + source_labels = ["__meta_kubernetes_namespace"] + regex = "default|dubbo-system|dubbo-samples-shop|logging" + action = "keep" + } + + rule { + source_labels = ["__meta_kubernetes_namespace"] + target_label = "namespace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_name"] + target_label = "pod" + } + + rule { + source_labels = ["__meta_kubernetes_pod_name"] + target_label = "instance" + } + + rule { + source_labels = ["__meta_kubernetes_pod_container_name"] + target_label = "container" + } + + rule { + source_labels = ["__meta_kubernetes_pod_container_name"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app_kubernetes_io_name"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app"] + regex = "(.+)" + target_label = "app" + action = "replace" + } + + rule { + source_labels = 
["__meta_kubernetes_pod_label_app_kubernetes_io_name"] + regex = "(.+)" + target_label = "app" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_namespace", "__meta_kubernetes_pod_container_name"] + separator = "/" + target_label = "job" + } + } + + loki.source.kubernetes "dubbo_pod_logs" { + targets = discovery.relabel.dubbo_pod_logs.output + forward_to = [loki.process.dubbo_logs.receiver] + } + + loki.process "dubbo_logs" { + forward_to = [loki.write.local.receiver] + + stage.decolorize {} + + stage.static_labels { + values = { + cluster = "k8s1.28.2", + mesh = "nacos2.5", + } + } + } + + loki.write "local" { + endpoint { + url = "http://loki.logging.svc.cluster.local:3100/loki/api/v1/push" + } + } From 5a810970dc6b1dcba9fef47f8985bd76072a99a0 Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Mon, 1 Jun 2026 22:51:48 +0800 Subject: [PATCH 5/8] =?UTF-8?q?feat=EF=BC=9AThe=20joint=20debugging=20=20o?= =?UTF-8?q?f=20interface=20error-analyse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/mcp/tools/log/loki.go | 6 ++++++ pkg/mcp/tools/log/tools.go | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go index 75e822e2e..7b1b8ae34 100644 --- a/pkg/mcp/tools/log/loki.go +++ b/pkg/mcp/tools/log/loki.go @@ -77,6 +77,7 @@ func (c *lokiClient) search(ctx context.Context, req *SearchLogsReq) (*SearchLog if err != nil { return nil, err } + // remove duplicates for _, item := range logs { key := dedupeKey(item) if _, ok := seen[key]; ok { @@ -136,6 +137,7 @@ func (c *lokiClient) queryRange(ctx context.Context, query string, start, end ti return normalizeLokiLogs(lokiResp), nil } +// e.g: endpoint: {endpoint}/loki/api/v1/query_range?query={app="order-service"}&start=1717200000000000000&end=1717203600000000000&limit=100&direction=backward func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int) 
(string, error) { baseURL, err := url.Parse(c.config.Endpoint) if err != nil { @@ -156,6 +158,7 @@ func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int func buildLogQLQueries(req *SearchLogsReq) []string { selectors := buildStreamSelectors(req) queries := make([]string, 0, len(selectors)) + // add keywords filter for _, selector := range selectors { query := selector if req.Keywords != "" { @@ -187,6 +190,7 @@ func buildStreamSelectors(req *SearchLogsReq) []string { return []string{`{job=~".+"}`} } + // Cartesian product selectors := []string{""} for _, group := range labelGroups { next := make([]string, 0, len(selectors)*len(group)) @@ -287,6 +291,7 @@ func normalizeLokiTimestamp(value string) string { return time.Unix(0, ns).UTC().Format(time.RFC3339Nano) } +// firstLabel returns the first label value that matches any of the keys, or an empty string if none matches func firstLabel(labels map[string]string, keys ...string) string { for _, key := range keys { if value := labels[key]; value != "" { @@ -296,6 +301,7 @@ func firstLabel(labels map[string]string, keys ...string) string { return "" } +// extraLabels returns all labels except the ones used to filter the logs func extraLabels(labels map[string]string) map[string]string { attrs := make(map[string]string) for key, value := range labels { diff --git a/pkg/mcp/tools/log/tools.go b/pkg/mcp/tools/log/tools.go index 7fabd6dea..7c4f65fa7 100644 --- a/pkg/mcp/tools/log/tools.go +++ b/pkg/mcp/tools/log/tools.go @@ -35,7 +35,7 @@ func (r *LogRegistrar) RegisterTools(reg *registry.Registry) { properties := logSearchProperties() reg.Register(types.ToolDef{ Name: "search_logs", - Description: "查询 Dubbo 服务日志,支持按应用、服务、实例、TraceID 和关键字过滤", + Description: "查询 Dubbo 服务日志,支持按应用、服务、实例 和关键字过滤,暂不支持TraceID", InputSchema: types.InputSchema{ Type: "object", Properties: properties, From 563416228829c040b57b488f53a00ef9f65e5d87 Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Tue, 2 Jun 
2026 15:02:29 +0800 Subject: [PATCH 6/8] =?UTF-8?q?feat=EF=BC=9Asupport=20traceId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/mcp/tools/log/loki.go | 85 ++++++++++++++++++++++++++---- pkg/mcp/tools/log/model.go | 1 + pkg/mcp/tools/log/tools.go | 2 +- pkg/mcp/tools/log/tools_test.go | 92 +++++++++++++++++++++++++++++++-- 4 files changed, 166 insertions(+), 14 deletions(-) diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go index 7b1b8ae34..89ecf78d2 100644 --- a/pkg/mcp/tools/log/loki.go +++ b/pkg/mcp/tools/log/loki.go @@ -24,6 +24,7 @@ import ( "io" "net/http" "net/url" + "regexp" "sort" "strconv" "strings" @@ -158,12 +159,14 @@ func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int func buildLogQLQueries(req *SearchLogsReq) []string { selectors := buildStreamSelectors(req) queries := make([]string, 0, len(selectors)) - // add keywords filter for _, selector := range selectors { query := selector if req.Keywords != "" { query += " |= " + strconv.Quote(req.Keywords) } + if req.TraceID != "" { + query += " |= " + strconv.Quote(req.TraceID) + } queries = append(queries, query) } return queries @@ -183,9 +186,6 @@ func buildStreamSelectors(req *SearchLogsReq) []string { if req.InstanceName != "" { labelGroups = append(labelGroups, labelMatchers([]string{"instance", "instanceName", "pod"}, req.InstanceName)) } - if req.TraceID != "" { - labelGroups = append(labelGroups, labelMatchers([]string{"trace_id", "traceId", "traceid"}, req.TraceID)) - } if len(labelGroups) == 0 { return []string{`{job=~".+"}`} } @@ -266,23 +266,38 @@ func normalizeLokiLogs(resp lokiQueryRangeResp) []LogItem { if len(value) < 2 { continue } + raw := value[1] + message := extractLogField(raw, "msg", "message") + if message == "" { + message = raw + } logs = append(logs, LogItem{ - Timestamp: normalizeLokiTimestamp(value[0]), + Timestamp: normalizeLogTimestamp(value[0], extractLogField(raw, "time", 
"timestamp")), AppName: firstLabel(stream.Stream, "app", "appName"), ServiceName: firstLabel(stream.Stream, "service", "serviceName", "service_name"), InstanceName: firstLabel(stream.Stream, "instance", "instanceName", "pod"), - Severity: firstLabel(stream.Stream, "level", "severity", "detected_level"), - Message: value[1], - TraceID: firstLabel(stream.Stream, "trace_id", "traceId", "traceid"), - SpanID: firstLabel(stream.Stream, "span_id", "spanId", "spanid"), + Severity: firstNonEmpty(extractLogField(raw, "level", "severity"), firstLabel(stream.Stream, "level", "severity", "detected_level")), + Message: message, + TraceID: extractLogField(raw, "trace_id", "traceId", "traceid"), + SpanID: extractLogField(raw, "span_id", "spanId", "spanid"), + TraceFlags: extractLogField(raw, "trace_flags", "traceFlags", "traceflags"), Attributes: extraLabels(stream.Stream), - Raw: value[1], + Raw: raw, }) } } return logs } +func normalizeLogTimestamp(lokiTimestamp, logTimestamp string) string { + if logTimestamp != "" { + if parsed, err := time.Parse(time.RFC3339Nano, logTimestamp); err == nil { + return parsed.UTC().Format(time.RFC3339Nano) + } + } + return normalizeLokiTimestamp(lokiTimestamp) +} + func normalizeLokiTimestamp(value string) string { ns, err := strconv.ParseInt(value, 10, 64) if err != nil { @@ -301,6 +316,56 @@ func firstLabel(labels map[string]string, keys ...string) string { return "" } +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + +func extractLogField(message string, keys ...string) string { + if value := extractJSONLogField(message, keys...); value != "" { + return value + } + return extractTextLogField(message, keys...) 
+} + +func extractJSONLogField(message string, keys ...string) string { + var payload map[string]any + if err := json.Unmarshal([]byte(message), &payload); err != nil { + return "" + } + for _, key := range keys { + if value := stringifyLogField(payload[key]); value != "" { + return value + } + } + return "" +} + +func stringifyLogField(value any) string { + switch v := value.(type) { + case string: + return v + case float64, bool: + return fmt.Sprint(v) + default: + return "" + } +} + +func extractTextLogField(message string, keys ...string) string { + for _, key := range keys { + pattern := regexp.MustCompile(`(?i)(?:^|[\s{,])"?` + regexp.QuoteMeta(key) + `"?\s*[:=]\s*"?([^"\s,}]+)`) + if matches := pattern.FindStringSubmatch(message); len(matches) == 2 { + return matches[1] + } + } + return "" +} + // extraLabels returns all labels except the ones used to filter the logs func extraLabels(labels map[string]string) map[string]string { attrs := make(map[string]string) diff --git a/pkg/mcp/tools/log/model.go b/pkg/mcp/tools/log/model.go index 54d207003..4315d94e8 100644 --- a/pkg/mcp/tools/log/model.go +++ b/pkg/mcp/tools/log/model.go @@ -38,6 +38,7 @@ type LogItem struct { Message string `json:"message"` TraceID string `json:"traceId,omitempty"` SpanID string `json:"spanId,omitempty"` + TraceFlags string `json:"traceFlags,omitempty"` Attributes map[string]string `json:"attributes,omitempty"` Raw string `json:"raw,omitempty"` } diff --git a/pkg/mcp/tools/log/tools.go b/pkg/mcp/tools/log/tools.go index 7c4f65fa7..7fabd6dea 100644 --- a/pkg/mcp/tools/log/tools.go +++ b/pkg/mcp/tools/log/tools.go @@ -35,7 +35,7 @@ func (r *LogRegistrar) RegisterTools(reg *registry.Registry) { properties := logSearchProperties() reg.Register(types.ToolDef{ Name: "search_logs", - Description: "查询 Dubbo 服务日志,支持按应用、服务、实例 和关键字过滤,暂不支持TraceID", + Description: "查询 Dubbo 服务日志,支持按应用、服务、实例、TraceID 和关键字过滤", InputSchema: types.InputSchema{ Type: "object", Properties: properties, diff --git 
a/pkg/mcp/tools/log/tools_test.go b/pkg/mcp/tools/log/tools_test.go index f70729d56..4e84c54d1 100644 --- a/pkg/mcp/tools/log/tools_test.go +++ b/pkg/mcp/tools/log/tools_test.go @@ -72,6 +72,94 @@ func TestBuildLogQLQueriesUsesCommonLabelAliases(t *testing.T) { } } +func TestBuildLogQLQueriesFiltersTraceIDFromLogContent(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + ServiceName: "org.apache.DemoService", + TraceID: "trace-1", + Keywords: "ERROR", + }) + + expected := []string{ + `{service="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + `{serviceName="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + `{service_name="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } +} + +func TestNormalizeLokiLogsExtractsTraceContextFromMessage(t *testing.T) { + logs := normalizeLokiLogs(lokiQueryRangeResp{ + Status: "success", + Data: struct { + Result []lokiStream `json:"result"` + }{ + Result: []lokiStream{{ + Stream: map[string]string{ + "app": "demo-provider", + "trace_id": "label-trace", + "span_id": "label-span", + }, + Values: [][]string{{ + "1777110661783444000", + `ERROR trace_id=message-trace span_id=message-span metadata report failed`, + }}, + }}, + }, + }) + + if len(logs) != 1 { + t.Fatalf("expected one log, got %d", len(logs)) + } + if logs[0].TraceID != "message-trace" || logs[0].SpanID != "message-span" { + t.Fatalf("expected trace context from message, got traceID=%q spanID=%q", logs[0].TraceID, logs[0].SpanID) + } +} + +func TestNormalizeLokiLogsParsesDubboGoJSONLog(t *testing.T) { + raw := `{"level":"error","msg":"error message","span_id":"36d69a3dd9bea02c","time":"2026-06-02T14:56:37+08:00","trace_flags":"01","trace_id":"faba6a688ea3070b1613f50fb081c578"}` + logs := 
normalizeLokiLogs(lokiQueryRangeResp{ + Status: "success", + Data: struct { + Result []lokiStream `json:"result"` + }{ + Result: []lokiStream{{ + Stream: map[string]string{ + "app": "dubbo-go-provider", + "service_name": "org.apache.DemoService", + }, + Values: [][]string{{ + "1777110661783444000", + raw, + }}, + }}, + }, + }) + + if len(logs) != 1 { + t.Fatalf("expected one log, got %d", len(logs)) + } + logItem := logs[0] + if logItem.Timestamp != "2026-06-02T06:56:37Z" { + t.Fatalf("expected timestamp from JSON log time, got %q", logItem.Timestamp) + } + if logItem.Severity != "error" || logItem.Message != "error message" { + t.Fatalf("expected level and msg from JSON log, got severity=%q message=%q", logItem.Severity, logItem.Message) + } + if logItem.TraceID != "faba6a688ea3070b1613f50fb081c578" || logItem.SpanID != "36d69a3dd9bea02c" || logItem.TraceFlags != "01" { + t.Fatalf("unexpected trace context: traceID=%q spanID=%q traceFlags=%q", logItem.TraceID, logItem.SpanID, logItem.TraceFlags) + } + if logItem.Raw != raw { + t.Fatalf("expected raw JSON log to be preserved, got %q", logItem.Raw) + } +} + func TestSearchLogsQueriesLoki(t *testing.T) { var seenQueries []string loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -90,11 +178,9 @@ func TestSearchLogsQueriesLoki(t *testing.T) { "service_name": "org.apache.DemoService", "instance": "127.0.0.1:20880", "level": "ERROR", - "trace_id": "trace-1", - "span_id": "span-1", "namespace": "dubbo-system" }, - "values": [["1777110661783444000", "ERROR test log"]] + "values": [["1777110661783444000", "ERROR trace_id=trace-1 span_id=span-1 test log"]] }] } }`)) From 55968cfa395a94d35185029dbcb857b72e710e81 Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Tue, 2 Jun 2026 15:37:12 +0800 Subject: [PATCH 7/8] =?UTF-8?q?feat=EF=BC=9Aadd=20get=5Flog=5Fcapabilities?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 
.../examples/observability}/alloy-values.yaml | 0 pkg/mcp/tools/log/loki.go | 239 +++++++++++++++++- pkg/mcp/tools/log/model.go | 14 + pkg/mcp/tools/log/tools.go | 43 ++++ pkg/mcp/tools/log/tools_test.go | 210 +++++++++++++-- 5 files changed, 475 insertions(+), 31 deletions(-) rename {release/kubernetes/monitoring => docs/examples/observability}/alloy-values.yaml (100%) diff --git a/release/kubernetes/monitoring/alloy-values.yaml b/docs/examples/observability/alloy-values.yaml similarity index 100% rename from release/kubernetes/monitoring/alloy-values.yaml rename to docs/examples/observability/alloy-values.yaml diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go index 89ecf78d2..05bb3b025 100644 --- a/pkg/mcp/tools/log/loki.go +++ b/pkg/mcp/tools/log/loki.go @@ -28,13 +28,44 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/apache/dubbo-admin/pkg/common/bizerror" observabilitycfg "github.com/apache/dubbo-admin/pkg/config/observability" ) -const defaultQueryWindow = time.Hour +const ( + defaultQueryWindow = time.Hour + labelCacheTTL = 5 * time.Minute +) + +var fallbackSelectorPriority = []string{ + "namespace", + "job", + "app", + "appName", + "service_name", + "serviceName", + "service", + "pod", + "container", + "instance", + "instanceName", + "level", +} + +var lokiLabelsCache = struct { + sync.Mutex + items map[string]cachedLokiLabels +}{ + items: map[string]cachedLokiLabels{}, +} + +type cachedLokiLabels struct { + labels map[string]struct{} + expiresAt time.Time +} type lokiClient struct { config observabilitycfg.LogProviderConfig @@ -49,6 +80,12 @@ type lokiQueryRangeResp struct { Error string `json:"error,omitempty"` } +type lokiLabelsResp struct { + Status string `json:"status"` + Data []string `json:"data"` + Error string `json:"error,omitempty"` +} + type lokiStream struct { Stream map[string]string `json:"stream"` Values [][]string `json:"values"` @@ -70,7 +107,8 @@ func (c *lokiClient) search(ctx context.Context, req 
*SearchLogsReq) (*SearchLog return nil, err } - queries := buildLogQLQueries(req) + labelNames, _ := c.labelNames(ctx, start, end) + queries := buildLogQLQueriesWithLabels(req, labelNames) merged := &SearchLogsResp{SourceEngine: "loki", Logs: make([]LogItem, 0, req.Limit)} seen := map[string]struct{}{} for _, query := range queries { @@ -101,6 +139,41 @@ func (c *lokiClient) search(ctx context.Context, req *SearchLogsReq) (*SearchLog return merged, nil } +func (c *lokiClient) capabilities(ctx context.Context, req *LogCapabilitiesReq) (*LogCapabilitiesResp, error) { + start, end, err := resolveTimeRange(req.StartTime, req.EndTime) + if err != nil { + return nil, err + } + labelNames, err := c.labelNames(ctx, start, end) + if err != nil { + return nil, err + } + + return &LogCapabilitiesResp{ + AvailableLabels: supportedLabels(labelNames), + SupportedFilters: []string{ + "mesh", + "appName", + "serviceName", + "instanceName", + "traceId", + "keywords", + "startTime", + "endTime", + "limit", + }, + LabelFilters: map[string][]string{ + "mesh": matchingLabels([]string{"mesh"}, labelNames), + "appName": matchingLabels([]string{"app", "appName"}, labelNames), + "serviceName": matchingLabels([]string{"service", "serviceName", "service_name"}, labelNames), + "instanceName": matchingLabels([]string{"instance", "instanceName", "pod"}, labelNames), + }, + ContentFilters: []string{"traceId", "keywords"}, + FallbackLabel: fallbackSelectorLabel(labelNames), + SourceEngine: "loki", + }, nil +} + func (c *lokiClient) queryRange(ctx context.Context, query string, start, end time.Time, limit int) ([]LogItem, error) { queryURL, err := c.queryRangeURL(query, start, end, limit) if err != nil { @@ -138,6 +211,80 @@ func (c *lokiClient) queryRange(ctx context.Context, query string, start, end ti return normalizeLokiLogs(lokiResp), nil } +func (c *lokiClient) labelNames(ctx context.Context, start, end time.Time) (map[string]struct{}, error) { + cacheKey := c.labelCacheKey() + now := 
time.Now() + lokiLabelsCache.Lock() + if cached, ok := lokiLabelsCache.items[cacheKey]; ok && now.Before(cached.expiresAt) { + labels := cloneLabelSet(cached.labels) + lokiLabelsCache.Unlock() + return labels, nil + } + lokiLabelsCache.Unlock() + + labelsURL, err := c.labelsURL(start, end) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, labelsURL, nil) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.InternalError, "failed to create loki labels request") + } + if c.config.Tenant != "" { + httpReq.Header.Set("X-Scope-OrgID", c.config.Tenant) + } + + httpResp, err := c.client.Do(httpReq) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.NetWorkError, "failed to query loki labels") + } + defer httpResp.Body.Close() + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + body, _ := io.ReadAll(io.LimitReader(httpResp.Body, 4096)) + return nil, bizerror.New(bizerror.NetWorkError, + fmt.Sprintf("loki labels query failed with status %d: %s", httpResp.StatusCode, strings.TrimSpace(string(body)))) + } + + var labelsResp lokiLabelsResp + if err := json.NewDecoder(httpResp.Body).Decode(&labelsResp); err != nil { + return nil, bizerror.Wrap(err, bizerror.JsonError, "failed to decode loki labels response") + } + if labelsResp.Status != "success" { + if labelsResp.Error != "" { + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki labels query failed: %s", labelsResp.Error)) + } + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki labels query returned status %q", labelsResp.Status)) + } + + labels := make(map[string]struct{}, len(labelsResp.Data)) + for _, label := range labelsResp.Data { + labels[label] = struct{}{} + } + + lokiLabelsCache.Lock() + lokiLabelsCache.items[cacheKey] = cachedLokiLabels{ + labels: cloneLabelSet(labels), + expiresAt: now.Add(labelCacheTTL), + } + lokiLabelsCache.Unlock() + return labels, nil +} + +func 
(c *lokiClient) labelCacheKey() string { + return c.config.Endpoint + "|" + c.config.Tenant +} + +func cloneLabelSet(labels map[string]struct{}) map[string]struct{} { + if labels == nil { + return nil + } + cloned := make(map[string]struct{}, len(labels)) + for label := range labels { + cloned[label] = struct{}{} + } + return cloned +} + // e.g: endpoint: {endpoint}/loki/api/v1/query_range?query={app="order-service"}&start=1717200000000000000&end=1717203600000000000&limit=100&direction=backward func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int) (string, error) { baseURL, err := url.Parse(c.config.Endpoint) @@ -156,8 +303,26 @@ func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int return baseURL.String(), nil } +func (c *lokiClient) labelsURL(start, end time.Time) (string, error) { + baseURL, err := url.Parse(c.config.Endpoint) + if err != nil { + return "", bizerror.Wrap(err, bizerror.ConfigError, "invalid loki endpoint") + } + baseURL.Path = strings.TrimRight(baseURL.Path, "/") + "/loki/api/v1/labels" + + query := baseURL.Query() + query.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + query.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + baseURL.RawQuery = query.Encode() + return baseURL.String(), nil +} + func buildLogQLQueries(req *SearchLogsReq) []string { - selectors := buildStreamSelectors(req) + return buildLogQLQueriesWithLabels(req, nil) +} + +func buildLogQLQueriesWithLabels(req *SearchLogsReq, labelNames map[string]struct{}) []string { + selectors := buildStreamSelectorsWithLabels(req, labelNames) queries := make([]string, 0, len(selectors)) for _, selector := range selectors { query := selector @@ -173,21 +338,25 @@ func buildLogQLQueries(req *SearchLogsReq) []string { } func buildStreamSelectors(req *SearchLogsReq) []string { + return buildStreamSelectorsWithLabels(req, nil) +} + +func buildStreamSelectorsWithLabels(req *SearchLogsReq, labelNames map[string]struct{}) []string { 
labelGroups := make([][]string, 0, 4) if req.Mesh != "" { - labelGroups = append(labelGroups, []string{labelMatcher("mesh", req.Mesh)}) + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"mesh"}, req.Mesh, labelNames)) } if req.AppName != "" { - labelGroups = append(labelGroups, labelMatchers([]string{"app", "appName"}, req.AppName)) + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"app", "appName"}, req.AppName, labelNames)) } if req.ServiceName != "" { - labelGroups = append(labelGroups, labelMatchers([]string{"service", "serviceName", "service_name"}, req.ServiceName)) + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"service", "serviceName", "service_name"}, req.ServiceName, labelNames)) } if req.InstanceName != "" { - labelGroups = append(labelGroups, labelMatchers([]string{"instance", "instanceName", "pod"}, req.InstanceName)) + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"instance", "instanceName", "pod"}, req.InstanceName, labelNames)) } if len(labelGroups) == 0 { - return []string{`{job=~".+"}`} + return []string{fmt.Sprintf("{%s=~%s}", fallbackSelectorLabel(labelNames), strconv.Quote(".+"))} } // Cartesian product @@ -214,13 +383,65 @@ func buildStreamSelectors(req *SearchLogsReq) []string { } func labelMatchers(names []string, value string) []string { + return labelMatchersWithLabels(names, value, nil) +} + +func labelMatchersWithLabels(names []string, value string, labelNames map[string]struct{}) []string { + selected := selectExistingLabels(names, labelNames) matchers := make([]string, 0, len(names)) - for _, name := range names { + for _, name := range selected { matchers = append(matchers, labelMatcher(name, value)) } return matchers } +func selectExistingLabels(names []string, labelNames map[string]struct{}) []string { + if len(labelNames) == 0 { + return names + } + selected := make([]string, 0, len(names)) + for _, name := range names { + if _, ok := labelNames[name]; 
ok { + selected = append(selected, name) + } + } + if len(selected) == 0 { + return names + } + return selected +} + +func matchingLabels(names []string, labelNames map[string]struct{}) []string { + selected := make([]string, 0, len(names)) + for _, name := range names { + if _, ok := labelNames[name]; ok { + selected = append(selected, name) + } + } + return selected +} + +func fallbackSelectorLabel(labelNames map[string]struct{}) string { + if len(labelNames) == 0 { + return "namespace" + } + for _, label := range fallbackSelectorPriority { + if _, ok := labelNames[label]; ok { + return label + } + } + return "namespace" +} + +func supportedLabels(labelNames map[string]struct{}) []string { + labels := make([]string, 0, len(labelNames)) + for label := range labelNames { + labels = append(labels, label) + } + sort.Strings(labels) + return labels +} + func labelMatcher(name, value string) string { return fmt.Sprintf("%s=%s", name, strconv.Quote(value)) } diff --git a/pkg/mcp/tools/log/model.go b/pkg/mcp/tools/log/model.go index 4315d94e8..bbf63656d 100644 --- a/pkg/mcp/tools/log/model.go +++ b/pkg/mcp/tools/log/model.go @@ -52,6 +52,20 @@ type AnalyzeErrorLogsReq struct { SearchLogsReq } +type LogCapabilitiesReq struct { + StartTime string `json:"startTime,omitempty"` + EndTime string `json:"endTime,omitempty"` +} + +type LogCapabilitiesResp struct { + AvailableLabels []string `json:"availableLabels"` + SupportedFilters []string `json:"supportedFilters"` + LabelFilters map[string][]string `json:"labelFilters"` + ContentFilters []string `json:"contentFilters"` + FallbackLabel string `json:"fallbackLabel"` + SourceEngine string `json:"sourceEngine"` +} + type ErrorPattern struct { Pattern string `json:"pattern"` Count int `json:"count"` diff --git a/pkg/mcp/tools/log/tools.go b/pkg/mcp/tools/log/tools.go index 7fabd6dea..6a37e284c 100644 --- a/pkg/mcp/tools/log/tools.go +++ b/pkg/mcp/tools/log/tools.go @@ -52,6 +52,16 @@ func (r *LogRegistrar) RegisterTools(reg 
*registry.Registry) { }, Handler: AnalyzeErrorLogs, }) + + reg.Register(types.ToolDef{ + Name: "get_log_capabilities", + Description: "获取日志查询能力,返回 Loki 当前可用 labels 以及查询参数到 labels 的映射", + InputSchema: types.InputSchema{ + Type: "object", + Properties: logCapabilitiesProperties(), + }, + Handler: GetLogCapabilities, + }) } func SearchLogs(ctx consolectx.Context, args map[string]any) (*types.ToolResult, error) { @@ -82,6 +92,18 @@ func AnalyzeErrorLogs(ctx consolectx.Context, args map[string]any) (*types.ToolR return basetools.JsonResult(analyzeErrors(searchResp.Logs, searchResp.SourceEngine)) } +func GetLogCapabilities(ctx consolectx.Context, args map[string]any) (*types.ToolResult, error) { + client, err := lokiClientFromContext(ctx) + if err != nil { + return basetools.ErrorResult(err), nil + } + resp, err := client.capabilities(requestContext(ctx), buildLogCapabilitiesReq(args)) + if err != nil { + return basetools.ErrorResult(err), nil + } + return basetools.JsonResult(resp) +} + func lokiClientFromContext(ctx consolectx.Context) (*lokiClient, error) { if ctx == nil || ctx.Config().Observability == nil || ctx.Config().Observability.Logs == nil { return nil, fmt.Errorf("loki log provider is not configured") @@ -115,6 +137,14 @@ func buildSearchLogsReq(args map[string]any) *SearchLogsReq { } } +func buildLogCapabilitiesReq(args map[string]any) *LogCapabilitiesReq { + helper := basetools.NewArgsHelper(args) + return &LogCapabilitiesReq{ + StartTime: helper.GetString("startTime", ""), + EndTime: helper.GetString("endTime", ""), + } +} + func logSearchProperties() map[string]types.PropertyDef { return map[string]types.PropertyDef{ "mesh": { @@ -157,4 +187,17 @@ func logSearchProperties() map[string]types.PropertyDef { } } +func logCapabilitiesProperties() map[string]types.PropertyDef { + return map[string]types.PropertyDef{ + "startTime": { + Type: "string", + Description: "开始时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + "endTime": { + Type: "string", + Description: 
"结束时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + } +} + var _ registry.ToolRegistrar = (*LogRegistrar)(nil) diff --git a/pkg/mcp/tools/log/tools_test.go b/pkg/mcp/tools/log/tools_test.go index 4e84c54d1..bde9de15e 100644 --- a/pkg/mcp/tools/log/tools_test.go +++ b/pkg/mcp/tools/log/tools_test.go @@ -37,10 +37,10 @@ func TestLogRegistrarRegistersExpectedTools(t *testing.T) { reg := registry.NewRegistry() (&LogRegistrar{}).RegisterTools(reg) - if got := reg.Count(); got != 2 { - t.Fatalf("expected 2 log tools, got %d", got) + if got := reg.Count(); got != 3 { + t.Fatalf("expected 3 log tools, got %d", got) } - for _, name := range []string{"search_logs", "analyze_error_logs"} { + for _, name := range []string{"search_logs", "analyze_error_logs", "get_log_capabilities"} { tool, ok := reg.Get(name) if !ok { t.Fatalf("tool %s was not registered", name) @@ -51,6 +51,60 @@ func TestLogRegistrarRegistersExpectedTools(t *testing.T) { } } +func TestGetLogCapabilitiesReturnsAvailableLabelsAndResolvedFilters(t *testing.T) { + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/labels" { + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"status":"success","data":["pod","namespace","service_name","mesh"]}`)) + })) + defer loki.Close() + + result, err := GetLogCapabilities(newLogToolTestContext(loki.URL), map[string]any{ + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("GetLogCapabilities returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("GetLogCapabilities returned error result: %s", result.Content[0].Text) + } + + var payload LogCapabilitiesResp + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("failed to decode tool result: %v", err) + } + expectedLabels := []string{"mesh", "namespace", "pod", 
"service_name"} + if len(payload.AvailableLabels) != len(expectedLabels) { + t.Fatalf("expected labels %v, got %v", expectedLabels, payload.AvailableLabels) + } + for i := range expectedLabels { + if payload.AvailableLabels[i] != expectedLabels[i] { + t.Fatalf("label[%d] expected %q, got %q", i, expectedLabels[i], payload.AvailableLabels[i]) + } + } + if payload.FallbackLabel != "namespace" { + t.Fatalf("expected fallbackLabel namespace, got %q", payload.FallbackLabel) + } + if got := payload.LabelFilters["serviceName"]; len(got) != 1 || got[0] != "service_name" { + t.Fatalf("expected serviceName to resolve to service_name, got %v", got) + } + if got := payload.LabelFilters["instanceName"]; len(got) != 1 || got[0] != "pod" { + t.Fatalf("expected instanceName to resolve to pod, got %v", got) + } + if got := payload.LabelFilters["appName"]; len(got) != 0 { + t.Fatalf("expected appName to have no available labels, got %v", got) + } + if len(payload.ContentFilters) != 2 || payload.ContentFilters[0] != "traceId" || payload.ContentFilters[1] != "keywords" { + t.Fatalf("unexpected content filters: %v", payload.ContentFilters) + } + if payload.SourceEngine != "loki" { + t.Fatalf("expected sourceEngine loki, got %q", payload.SourceEngine) + } +} + func TestBuildLogQLQueriesUsesCommonLabelAliases(t *testing.T) { queries := buildLogQLQueries(&SearchLogsReq{ ServiceName: "org.apache.DemoService", @@ -94,6 +148,24 @@ func TestBuildLogQLQueriesFiltersTraceIDFromLogContent(t *testing.T) { } } +func TestBuildLogQLQueriesUsesNamespaceSelectorWhenOnlyTraceIDIsProvided(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + TraceID: "trace-1", + }) + + expected := []string{ + `{namespace=~".+"} |= "trace-1"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } 
+} + func TestNormalizeLokiLogsExtractsTraceContextFromMessage(t *testing.T) { logs := normalizeLokiLogs(lokiQueryRangeResp{ Status: "success", @@ -163,27 +235,31 @@ func TestNormalizeLokiLogsParsesDubboGoJSONLog(t *testing.T) { func TestSearchLogsQueriesLoki(t *testing.T) { var seenQueries []string loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/loki/api/v1/query_range" { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + http.Error(w, "labels unavailable", http.StatusInternalServerError) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [{ + "stream": { + "app": "demo-provider", + "service_name": "org.apache.DemoService", + "instance": "127.0.0.1:20880", + "level": "ERROR", + "namespace": "dubbo-system" + }, + "values": [["1777110661783444000", "ERROR trace_id=trace-1 span_id=span-1 test log"]] + }] + } + }`)) + default: t.Fatalf("unexpected loki path: %s", r.URL.Path) } - seenQueries = append(seenQueries, r.URL.Query().Get("query")) - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{ - "status": "success", - "data": { - "resultType": "streams", - "result": [{ - "stream": { - "app": "demo-provider", - "service_name": "org.apache.DemoService", - "instance": "127.0.0.1:20880", - "level": "ERROR", - "namespace": "dubbo-system" - }, - "values": [["1777110661783444000", "ERROR trace_id=trace-1 span_id=span-1 test log"]] - }] - } - }`)) })) defer loki.Close() @@ -217,6 +293,96 @@ func TestSearchLogsQueriesLoki(t *testing.T) { } } +func TestSearchLogsUsesLokiLabelsToReduceAliasQueries(t *testing.T) { + var seenQueries []string + labelsRequested := false + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", 
"application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + labelsRequested = true + _, _ = w.Write([]byte(`{"status":"success","data":["namespace","service_name","pod"]}`)) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "result": [{ + "stream": {"service_name": "org.apache.DemoService", "namespace": "dubbo-system"}, + "values": [["1777110661783444000", "ERROR trace_id=trace-1 test log"]] + }] + } + }`)) + default: + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "serviceName": "org.apache.DemoService", + "keywords": "ERROR", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + if !labelsRequested { + t.Fatal("expected labels endpoint to be requested") + } + expected := []string{`{service_name="org.apache.DemoService"} |= "ERROR"`} + if len(seenQueries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(seenQueries), seenQueries) + } + for i := range expected { + if seenQueries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], seenQueries[i]) + } + } +} + +func TestSearchLogsUsesAvailableFallbackLabelWhenOnlyTraceIDIsProvided(t *testing.T) { + var seenQueries []string + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + _, _ = w.Write([]byte(`{"status":"success","data":["pod","job"]}`)) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = 
w.Write([]byte(`{"status":"success","data":{"result":[]}}`)) + default: + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "traceId": "trace-1", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + expected := []string{`{job=~".+"} |= "trace-1"`} + if len(seenQueries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(seenQueries), seenQueries) + } + for i := range expected { + if seenQueries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], seenQueries[i]) + } + } +} + func TestAnalyzeErrorsGroupsPatterns(t *testing.T) { resp := analyzeErrors([]LogItem{ {Timestamp: "2026-04-01T00:00:00Z", Severity: "ERROR", Message: "Error 500 for request 123"}, From c855c218e3d1fe674d8872f2ecd6213f15a61f6e Mon Sep 17 00:00:00 2001 From: Oxidaner <18622412361@163.com> Date: Sat, 13 Jun 2026 20:22:19 +0800 Subject: [PATCH 8/8] =?UTF-8?q?feat=EF=BC=9AMake=20modifications=20based?= =?UTF-8?q?=20on=20copilot's=20suggestions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/config/observability/logs.go | 6 +++++- pkg/mcp/tools/log/analyzer.go | 7 ++++++- pkg/mcp/tools/log/loki.go | 22 +++++++++++++++++----- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pkg/config/observability/logs.go b/pkg/config/observability/logs.go index 0715efdda..d9f011822 100644 --- a/pkg/config/observability/logs.go +++ b/pkg/config/observability/logs.go @@ -66,9 +66,13 @@ func (c *LogsConfig) Validate() error { if strutil.IsBlank(provider.Endpoint) { return bizerror.New(bizerror.ConfigError, "log provider endpoint is required") } - if _, err := 
url.Parse(provider.Endpoint); err != nil { + parsed, err := url.Parse(provider.Endpoint) + if err != nil { return bizerror.Wrap(err, bizerror.ConfigError, fmt.Sprintf("invalid log provider endpoint: %s", provider.Endpoint)) } + if parsed.Scheme == "" || parsed.Host == "" { + return bizerror.New(bizerror.ConfigError, fmt.Sprintf("invalid log provider endpoint: %s", provider.Endpoint)) + } } if !foundDefault { return bizerror.New(bizerror.ConfigError, fmt.Sprintf("default log provider %q is not configured", c.DefaultProvider)) diff --git a/pkg/mcp/tools/log/analyzer.go b/pkg/mcp/tools/log/analyzer.go index 669c20459..f09fe34eb 100644 --- a/pkg/mcp/tools/log/analyzer.go +++ b/pkg/mcp/tools/log/analyzer.go @@ -44,7 +44,12 @@ func analyzeErrors(logs []LogItem, sourceEngine string) *AnalyzeErrorLogsResp { patternsByName[patternName] = pattern } pattern.Count++ - pattern.LastSeen = item.Timestamp + if pattern.FirstSeen == "" || item.Timestamp < pattern.FirstSeen { + pattern.FirstSeen = item.Timestamp + } + if pattern.LastSeen == "" || item.Timestamp > pattern.LastSeen { + pattern.LastSeen = item.Timestamp + } if len(pattern.Examples) < 3 { pattern.Examples = append(pattern.Examples, item) } diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go index 05bb3b025..b9318bc5a 100644 --- a/pkg/mcp/tools/log/loki.go +++ b/pkg/mcp/tools/log/loki.go @@ -499,9 +499,9 @@ func normalizeLokiLogs(resp lokiQueryRangeResp) []LogItem { InstanceName: firstLabel(stream.Stream, "instance", "instanceName", "pod"), Severity: firstNonEmpty(extractLogField(raw, "level", "severity"), firstLabel(stream.Stream, "level", "severity", "detected_level")), Message: message, - TraceID: extractLogField(raw, "trace_id", "traceId", "traceid"), - SpanID: extractLogField(raw, "span_id", "spanId", "spanid"), - TraceFlags: extractLogField(raw, "trace_flags", "traceFlags", "traceflags"), + TraceID: firstNonEmpty(extractLogField(raw, "trace_id", "traceId", "traceid"), firstLabel(stream.Stream, 
"trace_id", "traceId", "traceid")), + SpanID: firstNonEmpty(extractLogField(raw, "span_id", "spanId", "spanid"), firstLabel(stream.Stream, "span_id", "spanId", "spanid")), + TraceFlags: firstNonEmpty(extractLogField(raw, "trace_flags", "traceFlags", "traceflags"), firstLabel(stream.Stream, "trace_flags", "traceFlags", "traceflags")), Attributes: extraLabels(stream.Stream), Raw: raw, }) @@ -577,9 +577,20 @@ func stringifyLogField(value any) string { } } +// cache the compiled regex per key and reuse it. +var textLogFieldPatterns sync.Map // map[string]*regexp.Regexp +func textLogFieldPattern(key string) *regexp.Regexp { + if re, ok := textLogFieldPatterns.Load(key); ok { + return re.(*regexp.Regexp) + } + compiled := regexp.MustCompile(`(?i)(?:^|[\s{,])"?` + regexp.QuoteMeta(key) + `"?\s*[:=]\s*"?([^"\s,}]+)`) + actual, _ := textLogFieldPatterns.LoadOrStore(key, compiled) + return actual.(*regexp.Regexp) +} + func extractTextLogField(message string, keys ...string) string { for _, key := range keys { - pattern := regexp.MustCompile(`(?i)(?:^|[\s{,])"?` + regexp.QuoteMeta(key) + `"?\s*[:=]\s*"?([^"\s,}]+)`) + pattern := textLogFieldPattern(key) if matches := pattern.FindStringSubmatch(message); len(matches) == 2 { return matches[1] } @@ -593,7 +604,8 @@ func extraLabels(labels map[string]string) map[string]string { for key, value := range labels { switch key { case "mesh", "app", "appName", "service", "serviceName", "service_name", "instance", "instanceName", - "pod", "level", "severity", "detected_level", "trace_id", "traceId", "traceid", "span_id", "spanId", "spanid": + "pod", "level", "severity", "detected_level", "trace_id", "traceId", "traceid", "span_id", "spanId", + "spanid", "trace_flags", "traceFlags", "traceflags": continue default: attrs[key] = value