diff --git a/app/dubbo-admin/dubbo-admin.yaml b/app/dubbo-admin/dubbo-admin.yaml index 1e74b47ea..b348772ce 100644 --- a/app/dubbo-admin/dubbo-admin.yaml +++ b/app/dubbo-admin/dubbo-admin.yaml @@ -29,12 +29,23 @@ log: # [Optional] config for observability. observability: # [Optional] config for grafana and prometheus. - grafana: http://101.34.253.152:30300 - # [Optional] config for prometheus. - prometheus: http://101.34.253.152:30900/ - -# [Optional] config for console. -console: + grafana: http://101.34.253.152:30300 + # [Optional] config for prometheus. + prometheus: http://101.34.253.152:30900/ + # [Optional] config for log providers used by MCP log tools. + # defaultProvider must match one provider name in providers. + # Currently only Loki is supported. endpoint should be the Loki HTTP API root. + # tenant is optional and will be sent as X-Scope-OrgID for multi-tenant Loki. + # logs: + # defaultProvider: loki-main + # providers: + # - name: loki-main + # type: loki + # endpoint: http://localhost:3100 + # tenant: "" + +# [Optional] config for console. +console: # [Optional] port of http server, default is 8888. port: 8888 # [Optional] running mode of Gin, options are debug, release, test, default is release. diff --git a/docs/examples/observability/alloy-values.yaml b/docs/examples/observability/alloy-values.yaml new file mode 100644 index 000000000..643025063 --- /dev/null +++ b/docs/examples/observability/alloy-values.yaml @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Example values for the Grafana Alloy Helm chart. +# The generated Loki labels match the MCP log tools: +# - appName filters app/appName labels +# - serviceName filters service/serviceName/service_name labels +# - instanceName filters instance/instanceName/pod labels +# - mesh filters the mesh label +# +# traceId is intentionally not promoted to a Loki label in this example. +# Trace IDs are usually high-cardinality values; use the MCP keywords filter +# or add custom log parsing if your deployment requires trace-id lookup. + +global: + image: + registry: m.daocloud.io/docker.io + +image: + registry: m.daocloud.io/docker.io + repository: grafana/alloy + pullPolicy: IfNotPresent + +controller: + type: deployment + replicas: 1 + +configReloader: + enabled: false + +rbac: + create: true + +serviceAccount: + create: true + +alloy: + enableReporting: false + + mounts: + varlog: false + dockercontainers: false + + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 300m + memory: 512Mi + + configMap: + create: true + content: |- + logging { + level = "info" + format = "logfmt" + } + + discovery.kubernetes "pods" { + role = "pod" + } + + discovery.relabel "dubbo_pod_logs" { + targets = discovery.kubernetes.pods.targets + + rule { + source_labels = ["__meta_kubernetes_namespace"] + regex = "default|dubbo-system|dubbo-samples-shop|logging" + action = "keep" + } + + rule { + source_labels = ["__meta_kubernetes_namespace"] + target_label = "namespace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_name"] + target_label = "pod" + } + + rule { + source_labels = ["__meta_kubernetes_pod_name"] + target_label = "instance" + } + + rule { + source_labels = ["__meta_kubernetes_pod_container_name"] + target_label = "container" + } + + rule { + source_labels = ["__meta_kubernetes_pod_container_name"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app_kubernetes_io_name"] + regex = "(.+)" + target_label = "service_name" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app"] + regex = "(.+)" + target_label = "app" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_pod_label_app_kubernetes_io_name"] + regex = "(.+)" + target_label = "app" + action = "replace" + } + + rule { + source_labels = ["__meta_kubernetes_namespace", "__meta_kubernetes_pod_container_name"] + separator = "/" + target_label = "job" + } + } + + loki.source.kubernetes "dubbo_pod_logs" { + targets = discovery.relabel.dubbo_pod_logs.output + forward_to = [loki.process.dubbo_logs.receiver] + } + + loki.process "dubbo_logs" { + forward_to = [loki.write.local.receiver] + + stage.decolorize {} + + stage.static_labels { + values = { + cluster = "k8s1.28.2", + mesh = "nacos2.5", + } + } + } + + loki.write "local" { + endpoint { + url = "http://loki.logging.svc.cluster.local:3100/loki/api/v1/push" + } + } diff --git a/pkg/config/observability/config.go b/pkg/config/observability/config.go index 879e22998..920e8a31c 100644 --- a/pkg/config/observability/config.go +++ b/pkg/config/observability/config.go @@ -33,6 +33,8 @@ type Config struct { Grafana string `json:"grafana" yaml:"grafana"` // Prometheus is the url of prometheus Prometheus string `json:"prometheus" yaml:"prometheus"` + // Logs configures the log query provider. + Logs *LogsConfig `json:"logs,omitempty" yaml:"logs,omitempty"` GrafanaBaseURL *url.URL `json:"-" yaml:"-"` PrometheusBaseURL *url.URL `json:"-" yaml:"-"` @@ -55,6 +57,11 @@ func (c *Config) Validate() error { } c.GrafanaBaseURL = grafanaBaseURL } + if c.Logs != nil { + if err := c.Logs.Validate(); err != nil { + return err + } + } return nil } diff --git a/pkg/config/observability/logs.go b/pkg/config/observability/logs.go new file mode 100644 index 000000000..d9f011822 --- /dev/null +++ b/pkg/config/observability/logs.go @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observability + +import ( + "fmt" + "net/url" + + "github.com/duke-git/lancet/v2/strutil" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" +) + +type LogProviderType string + +const ( + LogProviderLoki LogProviderType = "loki" +) + +type LogsConfig struct { + DefaultProvider string `json:"defaultProvider" yaml:"defaultProvider"` + Providers []LogProviderConfig `json:"providers" yaml:"providers"` +} + +type LogProviderConfig struct { + Name string `json:"name" yaml:"name"` + Type LogProviderType `json:"type" yaml:"type"` + Endpoint string `json:"endpoint" yaml:"endpoint"` + Tenant string `json:"tenant,omitempty" yaml:"tenant,omitempty"` +} + +func (c *LogsConfig) Validate() error { + if c == nil || len(c.Providers) == 0 { + return nil + } + if strutil.IsBlank(c.DefaultProvider) { + return bizerror.New(bizerror.ConfigError, "default log provider is required") + } + + foundDefault := false + for _, provider := range c.Providers { + if strutil.IsBlank(provider.Name) { + return bizerror.New(bizerror.ConfigError, "log provider name is required") + } + if provider.Name == c.DefaultProvider { + foundDefault = true + } + if provider.Type != LogProviderLoki { + return bizerror.New(bizerror.ConfigError, fmt.Sprintf("unsupported log provider type: %s", provider.Type)) + } + if strutil.IsBlank(provider.Endpoint) { + return bizerror.New(bizerror.ConfigError, "log provider endpoint is required") + } + parsed, err := url.Parse(provider.Endpoint) + if err != nil { + return bizerror.Wrap(err, bizerror.ConfigError, fmt.Sprintf("invalid log provider endpoint: %s", provider.Endpoint)) + } + if parsed.Scheme == "" || parsed.Host == "" { + return bizerror.New(bizerror.ConfigError, fmt.Sprintf("invalid log provider endpoint: %s", provider.Endpoint)) + } + } + if !foundDefault { + return bizerror.New(bizerror.ConfigError, fmt.Sprintf("default log provider %q is not configured", c.DefaultProvider)) + } + return nil +} + +func (c *LogsConfig) Default() (LogProviderConfig, bool) { + if c == nil { + return LogProviderConfig{}, false + } + for _, provider := range c.Providers { + if provider.Name == c.DefaultProvider { + return provider, true + } + } + return LogProviderConfig{}, false +} diff --git a/pkg/console/component.go b/pkg/console/component.go index d74289bc3..ea4be76c5 100644 --- a/pkg/console/component.go +++ b/pkg/console/component.go @@ -48,10 +48,10 @@ func init() { } type consoleWebServer struct { - Engine *gin.Engine - cfg *console.Config - cs consolectx.Context - mcpPath string // MCP端点路径,用于auth中间件跳过认证 + Engine *gin.Engine + cfg *console.Config + cs consolectx.Context + mcpPath string // MCP端点路径,用于auth中间件跳过认证 mcpAPIKey string // MCP API密钥,用于认证 } diff --git a/pkg/mcp/register.go b/pkg/mcp/register.go index 641d567ff..8f0d4c795 100644 --- a/pkg/mcp/register.go +++ b/pkg/mcp/register.go @@ -20,6 +20,7 @@ package mcp import ( "github.com/apache/dubbo-admin/pkg/mcp/common" "github.com/apache/dubbo-admin/pkg/mcp/tools" + logtools "github.com/apache/dubbo-admin/pkg/mcp/tools/log" ) // RegisterTools 注册所有 MCP 工具 @@ -310,4 +311,35 @@ func RegisterTools(server *Server) { }, Handler: tools.GetApplicationServices, }) + + logSearchProperties := logtools.LogSearchProperties() + server.RegisterTool(&common.ToolDef{ + Name: "search_logs", + Description: "查询 Dubbo 服务日志,支持按应用、服务、实例、TraceID 和关键字过滤", + InputSchema: common.InputSchema{ + Type: "object", + Properties: logSearchProperties, + }, + Handler: logtools.SearchLogs, + }) + + server.RegisterTool(&common.ToolDef{ + Name: "analyze_error_logs", + Description: "分析错误日志并按错误模式聚合", + InputSchema: common.InputSchema{ + Type: "object", + Properties: logSearchProperties, + }, + Handler: logtools.AnalyzeErrorLogs, + }) + + server.RegisterTool(&common.ToolDef{ + Name: "get_log_capabilities", + Description: "获取日志查询能力,返回 Loki 当前可用 labels 以及查询参数到 labels 的映射", + InputSchema: common.InputSchema{ + Type: "object", + Properties: logtools.LogCapabilitiesProperties(), + }, + Handler: logtools.GetLogCapabilities, + }) } diff --git a/pkg/mcp/tools/log/analyzer.go b/pkg/mcp/tools/log/analyzer.go new file mode 100644 index 000000000..f09fe34eb --- /dev/null +++ b/pkg/mcp/tools/log/analyzer.go @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "fmt" + "regexp" + "sort" + "strings" +) + +var volatileTokenPattern = regexp.MustCompile(`(?i)\b[0-9a-f]{8,}\b|\b\d+\b`) + +func analyzeErrors(logs []LogItem, sourceEngine string) *AnalyzeErrorLogsResp { + patternsByName := map[string]*ErrorPattern{} + for _, item := range logs { + if !isErrorLog(item) { + continue + } + patternName := normalizeMessagePattern(item.Message) + pattern := patternsByName[patternName] + if pattern == nil { + pattern = &ErrorPattern{ + Pattern: patternName, + Example: item.Message, + FirstSeen: item.Timestamp, + LastSeen: item.Timestamp, + } + patternsByName[patternName] = pattern + } + pattern.Count++ + if pattern.FirstSeen == "" || item.Timestamp < pattern.FirstSeen { + pattern.FirstSeen = item.Timestamp + } + if pattern.LastSeen == "" || item.Timestamp > pattern.LastSeen { + pattern.LastSeen = item.Timestamp + } + if len(pattern.Examples) < 3 { + pattern.Examples = append(pattern.Examples, item) + } + } + + patterns := make([]ErrorPattern, 0, len(patternsByName)) + total := 0 + for _, pattern := range patternsByName { + total += pattern.Count + patterns = append(patterns, *pattern) + } + sort.Slice(patterns, func(i, j int) bool { + if patterns[i].Count == patterns[j].Count { + return patterns[i].Pattern < patterns[j].Pattern + } + return patterns[i].Count > patterns[j].Count + }) + + return &AnalyzeErrorLogsResp{ + Summary: fmt.Sprintf("found %d error log entries across %d patterns", total, len(patterns)), + TotalErrors: total, + Patterns: patterns, + SourceEngine: sourceEngine, + } +} + +func isErrorLog(item LogItem) bool { + if strings.EqualFold(item.Severity, "error") { + return true + } + message := strings.ToLower(item.Message) + return strings.Contains(message, "error") || strings.Contains(message, "exception") || strings.Contains(message, "failed") +} + +func normalizeMessagePattern(message string) string { + pattern := volatileTokenPattern.ReplaceAllString(message, "?") + pattern = strings.Join(strings.Fields(pattern), " ") + if pattern == "" { + return "unknown" + } + return pattern +} diff --git a/pkg/mcp/tools/log/loki.go b/pkg/mcp/tools/log/loki.go new file mode 100644 index 000000000..b9318bc5a --- /dev/null +++ b/pkg/mcp/tools/log/loki.go @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + observabilitycfg "github.com/apache/dubbo-admin/pkg/config/observability" +) + +const ( + defaultQueryWindow = time.Hour + labelCacheTTL = 5 * time.Minute +) + +var fallbackSelectorPriority = []string{ + "namespace", + "job", + "app", + "appName", + "service_name", + "serviceName", + "service", + "pod", + "container", + "instance", + "instanceName", + "level", +} + +var lokiLabelsCache = struct { + sync.Mutex + items map[string]cachedLokiLabels +}{ + items: map[string]cachedLokiLabels{}, +} + +type cachedLokiLabels struct { + labels map[string]struct{} + expiresAt time.Time +} + +type lokiClient struct { + config observabilitycfg.LogProviderConfig + client *http.Client +} + +type lokiQueryRangeResp struct { + Status string `json:"status"` + Data struct { + Result []lokiStream `json:"result"` + } `json:"data"` + Error string `json:"error,omitempty"` +} + +type lokiLabelsResp struct { + Status string `json:"status"` + Data []string `json:"data"` + Error string `json:"error,omitempty"` +} + +type lokiStream struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +func newLokiClient(cfg observabilitycfg.LogProviderConfig) *lokiClient { + return &lokiClient{ + config: cfg, + client: &http.Client{Timeout: 30 * time.Second}, + } +} + +func (c *lokiClient) search(ctx context.Context, req *SearchLogsReq) (*SearchLogsResp, error) { + if req.Limit <= 0 { + req.Limit = defaultLogLimit + } + start, end, err := resolveTimeRange(req.StartTime, req.EndTime) + if err != nil { + return nil, err + } + + labelNames, _ := c.labelNames(ctx, start, end) + queries := buildLogQLQueriesWithLabels(req, labelNames) + merged := &SearchLogsResp{SourceEngine: "loki", Logs: make([]LogItem, 0, req.Limit)} + seen := map[string]struct{}{} + for _, query := range queries { + logs, err := c.queryRange(ctx, query, start, end, req.Limit) + if err != nil { + return nil, err + } + // remove duplicates + for _, item := range logs { + key := dedupeKey(item) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + merged.Logs = append(merged.Logs, item) + if len(merged.Logs) >= req.Limit { + break + } + } + if len(merged.Logs) >= req.Limit { + break + } + } + + sort.SliceStable(merged.Logs, func(i, j int) bool { + return merged.Logs[i].Timestamp > merged.Logs[j].Timestamp + }) + return merged, nil +} + +func (c *lokiClient) capabilities(ctx context.Context, req *LogCapabilitiesReq) (*LogCapabilitiesResp, error) { + start, end, err := resolveTimeRange(req.StartTime, req.EndTime) + if err != nil { + return nil, err + } + labelNames, err := c.labelNames(ctx, start, end) + if err != nil { + return nil, err + } + + return &LogCapabilitiesResp{ + AvailableLabels: supportedLabels(labelNames), + SupportedFilters: []string{ + "mesh", + "appName", + "serviceName", + "instanceName", + "traceId", + "keywords", + "startTime", + "endTime", + "limit", + }, + LabelFilters: map[string][]string{ + "mesh": matchingLabels([]string{"mesh"}, labelNames), + "appName": matchingLabels([]string{"app", "appName"}, labelNames), + "serviceName": matchingLabels([]string{"service", "serviceName", "service_name"}, labelNames), + "instanceName": matchingLabels([]string{"instance", "instanceName", "pod"}, labelNames), + }, + ContentFilters: []string{"traceId", "keywords"}, + FallbackLabel: fallbackSelectorLabel(labelNames), + SourceEngine: "loki", + }, nil +} + +func (c *lokiClient) queryRange(ctx context.Context, query string, start, end time.Time, limit int) ([]LogItem, error) { + queryURL, err := c.queryRangeURL(query, start, end, limit) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, queryURL, nil) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.InternalError, "failed to create loki query request") + } + if c.config.Tenant != "" { + httpReq.Header.Set("X-Scope-OrgID", c.config.Tenant) + } + + httpResp, err := c.client.Do(httpReq) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.NetWorkError, "failed to query loki") + } + defer httpResp.Body.Close() + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + body, _ := io.ReadAll(io.LimitReader(httpResp.Body, 4096)) + return nil, bizerror.New(bizerror.NetWorkError, + fmt.Sprintf("loki query failed with status %d: %s", httpResp.StatusCode, strings.TrimSpace(string(body)))) + } + + var lokiResp lokiQueryRangeResp + if err := json.NewDecoder(httpResp.Body).Decode(&lokiResp); err != nil { + return nil, bizerror.Wrap(err, bizerror.JsonError, "failed to decode loki query response") + } + if lokiResp.Status != "success" { + if lokiResp.Error != "" { + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki query failed: %s", lokiResp.Error)) + } + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki query returned status %q", lokiResp.Status)) + } + return normalizeLokiLogs(lokiResp), nil +} + +func (c *lokiClient) labelNames(ctx context.Context, start, end time.Time) (map[string]struct{}, error) { + cacheKey := c.labelCacheKey() + now := time.Now() + lokiLabelsCache.Lock() + if cached, ok := lokiLabelsCache.items[cacheKey]; ok && now.Before(cached.expiresAt) { + labels := cloneLabelSet(cached.labels) + lokiLabelsCache.Unlock() + return labels, nil + } + lokiLabelsCache.Unlock() + + labelsURL, err := c.labelsURL(start, end) + if err != nil { + return nil, err + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, labelsURL, nil) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.InternalError, "failed to create loki labels request") + } + if c.config.Tenant != "" { + httpReq.Header.Set("X-Scope-OrgID", c.config.Tenant) + } + + httpResp, err := c.client.Do(httpReq) + if err != nil { + return nil, bizerror.Wrap(err, bizerror.NetWorkError, "failed to query loki labels") + } + defer httpResp.Body.Close() + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + body, _ := io.ReadAll(io.LimitReader(httpResp.Body, 4096)) + return nil, bizerror.New(bizerror.NetWorkError, + fmt.Sprintf("loki labels query failed with status %d: %s", httpResp.StatusCode, strings.TrimSpace(string(body)))) + } + + var labelsResp lokiLabelsResp + if err := json.NewDecoder(httpResp.Body).Decode(&labelsResp); err != nil { + return nil, bizerror.Wrap(err, bizerror.JsonError, "failed to decode loki labels response") + } + if labelsResp.Status != "success" { + if labelsResp.Error != "" { + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki labels query failed: %s", labelsResp.Error)) + } + return nil, bizerror.New(bizerror.NetWorkError, fmt.Sprintf("loki labels query returned status %q", labelsResp.Status)) + } + + labels := make(map[string]struct{}, len(labelsResp.Data)) + for _, label := range labelsResp.Data { + labels[label] = struct{}{} + } + + lokiLabelsCache.Lock() + lokiLabelsCache.items[cacheKey] = cachedLokiLabels{ + labels: cloneLabelSet(labels), + expiresAt: now.Add(labelCacheTTL), + } + lokiLabelsCache.Unlock() + return labels, nil +} + +func (c *lokiClient) labelCacheKey() string { + return c.config.Endpoint + "|" + c.config.Tenant +} + +func cloneLabelSet(labels map[string]struct{}) map[string]struct{} { + if labels == nil { + return nil + } + cloned := make(map[string]struct{}, len(labels)) + for label := range labels { + cloned[label] = struct{}{} + } + return cloned +} + +// e.g: endpoint: {endpoint}/loki/api/v1/query_range?query={app="order-service"}&start=1717200000000000000&end=1717203600000000000&limit=100&direction=backward +func (c *lokiClient) queryRangeURL(logQL string, start, end time.Time, limit int) (string, error) { + baseURL, err := url.Parse(c.config.Endpoint) + if err != nil { + return "", bizerror.Wrap(err, bizerror.ConfigError, "invalid loki endpoint") + } + baseURL.Path = strings.TrimRight(baseURL.Path, "/") + "/loki/api/v1/query_range" + + query := baseURL.Query() + query.Set("query", logQL) + query.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + query.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + query.Set("limit", strconv.Itoa(limit)) + query.Set("direction", "backward") + baseURL.RawQuery = query.Encode() + return baseURL.String(), nil +} + +func (c *lokiClient) labelsURL(start, end time.Time) (string, error) { + baseURL, err := url.Parse(c.config.Endpoint) + if err != nil { + return "", bizerror.Wrap(err, bizerror.ConfigError, "invalid loki endpoint") + } + baseURL.Path = strings.TrimRight(baseURL.Path, "/") + "/loki/api/v1/labels" + + query := baseURL.Query() + query.Set("start", strconv.FormatInt(start.UnixNano(), 10)) + query.Set("end", strconv.FormatInt(end.UnixNano(), 10)) + baseURL.RawQuery = query.Encode() + return baseURL.String(), nil +} + +func buildLogQLQueries(req *SearchLogsReq) []string { + return buildLogQLQueriesWithLabels(req, nil) +} + +func buildLogQLQueriesWithLabels(req *SearchLogsReq, labelNames map[string]struct{}) []string { + selectors := buildStreamSelectorsWithLabels(req, labelNames) + queries := make([]string, 0, len(selectors)) + for _, selector := range selectors { + query := selector + if req.Keywords != "" { + query += " |= " + strconv.Quote(req.Keywords) + } + if req.TraceID != "" { + query += " |= " + strconv.Quote(req.TraceID) + } + queries = append(queries, query) + } + return queries +} + +func buildStreamSelectors(req *SearchLogsReq) []string { + return buildStreamSelectorsWithLabels(req, nil) +} + +func buildStreamSelectorsWithLabels(req *SearchLogsReq, labelNames map[string]struct{}) []string { + labelGroups := make([][]string, 0, 4) + if req.Mesh != "" { + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"mesh"}, req.Mesh, labelNames)) + } + if req.AppName != "" { + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"app", "appName"}, req.AppName, labelNames)) + } + if req.ServiceName != "" { + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"service", "serviceName", "service_name"}, req.ServiceName, labelNames)) + } + if req.InstanceName != "" { + labelGroups = append(labelGroups, labelMatchersWithLabels([]string{"instance", "instanceName", "pod"}, req.InstanceName, labelNames)) + } + if len(labelGroups) == 0 { + return []string{fmt.Sprintf("{%s=~%s}", fallbackSelectorLabel(labelNames), strconv.Quote(".+"))} + } + + // Cartesian product + selectors := []string{""} + for _, group := range labelGroups { + next := make([]string, 0, len(selectors)*len(group)) + for _, prefix := range selectors { + for _, matcher := range group { + if prefix == "" { + next = append(next, matcher) + } else { + next = append(next, prefix+", "+matcher) + } + } + } + selectors = next + } + + result := make([]string, 0, len(selectors)) + for _, selector := range selectors { + result = append(result, fmt.Sprintf("{%s}", selector)) + } + return result +} + +func labelMatchers(names []string, value string) []string { + return labelMatchersWithLabels(names, value, nil) +} + +func labelMatchersWithLabels(names []string, value string, labelNames map[string]struct{}) []string { + selected := selectExistingLabels(names, labelNames) + matchers := make([]string, 0, len(names)) + for _, name := range selected { + matchers = append(matchers, labelMatcher(name, value)) + } + return matchers +} + +func selectExistingLabels(names []string, labelNames map[string]struct{}) []string { + if len(labelNames) == 0 { + return names + } + selected := make([]string, 0, len(names)) + for _, name := range names { + if _, ok := labelNames[name]; ok { + selected = append(selected, name) + } + } + if len(selected) == 0 { + return names + } + return selected +} + +func matchingLabels(names []string, labelNames map[string]struct{}) []string { + selected := make([]string, 0, len(names)) + for _, name := range names { + if _, ok := labelNames[name]; ok { + selected = append(selected, name) + } + } + return selected +} + +func fallbackSelectorLabel(labelNames map[string]struct{}) string { + if len(labelNames) == 0 { + return "namespace" + } + for _, label := range fallbackSelectorPriority { + if _, ok := labelNames[label]; ok { + return label + } + } + return "namespace" +} + +func supportedLabels(labelNames map[string]struct{}) []string { + labels := make([]string, 0, len(labelNames)) + for label := range labelNames { + labels = append(labels, label) + } + sort.Strings(labels) + return labels +} + +func labelMatcher(name, value string) string { + return fmt.Sprintf("%s=%s", name, strconv.Quote(value)) +} + +func resolveTimeRange(startRaw, endRaw string) (time.Time, time.Time, error) { + end := time.Now() + if endRaw != "" { + parsed, err := parseLogTime("endTime", endRaw) + if err != nil { + return time.Time{}, time.Time{}, err + } + end = parsed + } + start := end.Add(-defaultQueryWindow) + if startRaw != "" { + parsed, err := parseLogTime("startTime", startRaw) + if err != nil { + return time.Time{}, time.Time{}, err + } + start = parsed + } + if start.After(end) { + return time.Time{}, time.Time{}, bizerror.New(bizerror.InvalidArgument, "startTime must be less than or equal to endTime") + } + return start, end, nil +} + +func parseLogTime(field, value string) (time.Time, error) { + if ts, err := time.Parse(time.RFC3339Nano, value); err == nil { + return ts, nil + } + if ns, err := strconv.ParseInt(value, 10, 64); err == nil { + return time.Unix(0, ns), nil + } + return time.Time{}, bizerror.New(bizerror.InvalidArgument, + fmt.Sprintf("%s must be RFC3339, RFC3339Nano, or Unix nanoseconds", field)) +} + +func normalizeLokiLogs(resp lokiQueryRangeResp) []LogItem { + logs := make([]LogItem, 0) + for _, stream := range resp.Data.Result { + for _, value := range stream.Values { + if len(value) < 2 { + continue + } + raw := value[1] + message := extractLogField(raw, "msg", "message") + if message == "" { + message = raw + } + logs = append(logs, LogItem{ + Timestamp: normalizeLogTimestamp(value[0], extractLogField(raw, "time", "timestamp")), + AppName: firstLabel(stream.Stream, "app", "appName"), + ServiceName: firstLabel(stream.Stream, "service", "serviceName", "service_name"), + InstanceName: firstLabel(stream.Stream, "instance", "instanceName", "pod"), + Severity: firstNonEmpty(extractLogField(raw, "level", "severity"), firstLabel(stream.Stream, "level", "severity", "detected_level")), + Message: message, + TraceID: firstNonEmpty(extractLogField(raw, "trace_id", "traceId", "traceid"), firstLabel(stream.Stream, "trace_id", "traceId", "traceid")), + SpanID: firstNonEmpty(extractLogField(raw, "span_id", "spanId", "spanid"), firstLabel(stream.Stream, "span_id", "spanId", "spanid")), + TraceFlags: firstNonEmpty(extractLogField(raw, "trace_flags", "traceFlags", "traceflags"), firstLabel(stream.Stream, "trace_flags", "traceFlags", "traceflags")), + Attributes: extraLabels(stream.Stream), + Raw: raw, + }) + } + } + return logs +} + +func normalizeLogTimestamp(lokiTimestamp, logTimestamp string) string { + if logTimestamp != "" { + if parsed, err := time.Parse(time.RFC3339Nano, logTimestamp); err == nil { + return parsed.UTC().Format(time.RFC3339Nano) + } + } + return normalizeLokiTimestamp(lokiTimestamp) +} + +func normalizeLokiTimestamp(value string) string { + ns, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return value + } + return time.Unix(0, ns).UTC().Format(time.RFC3339Nano) +} + +// firstLabel returns the first label value that matches any of the keys, or an empty string if none matches +func firstLabel(labels map[string]string, keys ...string) string { + for _, key := range keys { + if value := labels[key]; value != "" { + return value + } + } + return "" +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + +func extractLogField(message string, keys ...string) string { + if value := extractJSONLogField(message, keys...); value != "" { + return value + } + return extractTextLogField(message, keys...) +} + +func extractJSONLogField(message string, keys ...string) string { + var payload map[string]any + if err := json.Unmarshal([]byte(message), &payload); err != nil { + return "" + } + for _, key := range keys { + if value := stringifyLogField(payload[key]); value != "" { + return value + } + } + return "" +} + +func stringifyLogField(value any) string { + switch v := value.(type) { + case string: + return v + case float64, bool: + return fmt.Sprint(v) + default: + return "" + } +} + +// cache the compiled regex per key and reuse it. +var textLogFieldPatterns sync.Map // map[string]*regexp.Regexp +func textLogFieldPattern(key string) *regexp.Regexp { + if re, ok := textLogFieldPatterns.Load(key); ok { + return re.(*regexp.Regexp) + } + compiled := regexp.MustCompile(`(?i)(?:^|[\s{,])"?` + regexp.QuoteMeta(key) + `"?\s*[:=]\s*"?([^"\s,}]+)`) + actual, _ := textLogFieldPatterns.LoadOrStore(key, compiled) + return actual.(*regexp.Regexp) +} + +func extractTextLogField(message string, keys ...string) string { + for _, key := range keys { + pattern := textLogFieldPattern(key) + if matches := pattern.FindStringSubmatch(message); len(matches) == 2 { + return matches[1] + } + } + return "" +} + +// extraLabels returns all labels except the ones used to filter the logs +func extraLabels(labels map[string]string) map[string]string { + attrs := make(map[string]string) + for key, value := range labels { + switch key { + case "mesh", "app", "appName", "service", "serviceName", "service_name", "instance", "instanceName", + "pod", "level", "severity", "detected_level", "trace_id", "traceId", "traceid", "span_id", "spanId", + "spanid", "trace_flags", "traceFlags", "traceflags": + continue + default: + attrs[key] = value + } + } + if len(attrs) == 0 { + return nil + } + return attrs +} + +func dedupeKey(item LogItem) string { + return strings.Join([]string{item.Timestamp, item.Message, item.TraceID, item.SpanID}, "\x00") +} diff --git a/pkg/mcp/tools/log/model.go b/pkg/mcp/tools/log/model.go new file mode 100644 index 000000000..bbf63656d --- /dev/null +++ b/pkg/mcp/tools/log/model.go @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +type SearchLogsReq struct { + Mesh string `json:"mesh,omitempty"` + AppName string `json:"appName,omitempty"` + ServiceName string `json:"serviceName,omitempty"` + InstanceName string `json:"instanceName,omitempty"` + TraceID string `json:"traceId,omitempty"` + Keywords string `json:"keywords,omitempty"` + StartTime string `json:"startTime,omitempty"` + EndTime string `json:"endTime,omitempty"` + Limit int `json:"limit,omitempty"` +} + +type LogItem struct { + Timestamp string `json:"timestamp"` + AppName string `json:"appName,omitempty"` + ServiceName string `json:"serviceName,omitempty"` + InstanceName string `json:"instanceName,omitempty"` + Severity string `json:"severity,omitempty"` + Message string `json:"message"` + TraceID string `json:"traceId,omitempty"` + SpanID string `json:"spanId,omitempty"` + TraceFlags string `json:"traceFlags,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Raw string `json:"raw,omitempty"` +} + +type SearchLogsResp struct { + Logs []LogItem `json:"logs"` + SourceEngine string `json:"sourceEngine"` +} + +type AnalyzeErrorLogsReq struct { + SearchLogsReq +} + +type LogCapabilitiesReq struct { + StartTime string `json:"startTime,omitempty"` + EndTime string `json:"endTime,omitempty"` +} + +type LogCapabilitiesResp struct { + AvailableLabels []string `json:"availableLabels"` + SupportedFilters []string `json:"supportedFilters"` + LabelFilters map[string][]string `json:"labelFilters"` + ContentFilters []string `json:"contentFilters"` + FallbackLabel string `json:"fallbackLabel"` + SourceEngine string `json:"sourceEngine"` +} + +type ErrorPattern struct { + Pattern string `json:"pattern"` + Count int `json:"count"` + Example string `json:"example,omitempty"` + FirstSeen string `json:"firstSeen,omitempty"` + LastSeen string `json:"lastSeen,omitempty"` + Examples []LogItem `json:"examples,omitempty"` +} + +type AnalyzeErrorLogsResp struct { + Summary string `json:"summary"` + TotalErrors int `json:"totalErrors"` + Patterns []ErrorPattern `json:"patterns"` + SourceEngine string `json:"sourceEngine"` +} diff --git a/pkg/mcp/tools/log/tools.go b/pkg/mcp/tools/log/tools.go new file mode 100644 index 000000000..a801ce2f7 --- /dev/null +++ b/pkg/mcp/tools/log/tools.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "context" + "fmt" + + consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/mcp/common" +) + +const defaultLogLimit = 100 + +func SearchLogs(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { + client, err := lokiClientFromContext(ctx) + if err != nil { + return common.ErrorResult(err), nil + } + resp, err := client.search(requestContext(ctx), buildSearchLogsReq(args)) + if err != nil { + return common.ErrorResult(err), nil + } + return common.JsonResult(resp) +} + +func AnalyzeErrorLogs(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { + client, err := lokiClientFromContext(ctx) + if err != nil { + return common.ErrorResult(err), nil + } + req := buildSearchLogsReq(args) + if req.Keywords == "" { + req.Keywords = "Error" + } + searchResp, err := client.search(requestContext(ctx), req) + if err != nil { + return common.ErrorResult(err), nil + } + return common.JsonResult(analyzeErrors(searchResp.Logs, searchResp.SourceEngine)) +} + +func GetLogCapabilities(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { + client, err := lokiClientFromContext(ctx) + if err != nil { + return common.ErrorResult(err), nil + } + resp, err := client.capabilities(requestContext(ctx), buildLogCapabilitiesReq(args)) + if err != nil { + return common.ErrorResult(err), nil + } + return common.JsonResult(resp) +} + +func lokiClientFromContext(ctx consolectx.Context) (*lokiClient, error) { + if ctx == nil || ctx.Config().Observability == nil || ctx.Config().Observability.Logs == nil { + return nil, fmt.Errorf("loki log provider is not configured") + } + provider, ok := ctx.Config().Observability.Logs.Default() + if !ok { + return nil, fmt.Errorf("default loki log provider is not configured") + } + return newLokiClient(provider), nil +} + +func requestContext(ctx consolectx.Context) context.Context { + if ctx == nil || ctx.AppContext() == nil { + return context.Background() + } + return ctx.AppContext() +} + +func buildSearchLogsReq(args map[string]any) *SearchLogsReq { + helper := common.NewArgsHelper(args) + return &SearchLogsReq{ + Mesh: helper.GetString("mesh", ""), + AppName: helper.GetString("appName", ""), + ServiceName: helper.GetString("serviceName", ""), + InstanceName: helper.GetString("instanceName", ""), + TraceID: helper.GetString("traceId", ""), + Keywords: helper.GetString("keywords", ""), + StartTime: helper.GetString("startTime", ""), + EndTime: helper.GetString("endTime", ""), + Limit: helper.GetInt("limit", defaultLogLimit), + } +} + +func buildLogCapabilitiesReq(args map[string]any) *LogCapabilitiesReq { + helper := common.NewArgsHelper(args) + return &LogCapabilitiesReq{ + StartTime: helper.GetString("startTime", ""), + EndTime: helper.GetString("endTime", ""), + } +} + +func LogSearchProperties() map[string]common.PropertyDef { + return map[string]common.PropertyDef{ + "mesh": { + Type: "string", + Description: "Mesh 名称,用于显式按 mesh label 过滤", + }, + "appName": { + Type: "string", + Description: "应用名称", + }, + "serviceName": { + Type: "string", + Description: "服务名称", + }, + "instanceName": { + Type: "string", + Description: "实例、Pod 或主机名称", + }, + "traceId": { + Type: "string", + Description: "TraceID", + }, + "keywords": { + Type: "string", + Description: "日志关键字", + }, + "startTime": { + Type: "string", + Description: "开始时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + "endTime": { + Type: "string", + Description: "结束时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + "limit": { + Type: "integer", + Description: "返回日志条数上限", + Default: defaultLogLimit, + }, + } +} + +func LogCapabilitiesProperties() map[string]common.PropertyDef { + return map[string]common.PropertyDef{ + "startTime": { + Type: "string", + Description: "开始时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + "endTime": { + Type: "string", + Description: "结束时间,支持 RFC3339/RFC3339Nano 或 Unix 纳秒", + }, + } +} diff --git a/pkg/mcp/tools/log/tools_test.go b/pkg/mcp/tools/log/tools_test.go new file mode 100644 index 000000000..08e2b412e --- /dev/null +++ b/pkg/mcp/tools/log/tools_test.go @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/apache/dubbo-admin/pkg/config/app" + "github.com/apache/dubbo-admin/pkg/config/observability" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/console/counter" + "github.com/apache/dubbo-admin/pkg/core/lock" + "github.com/apache/dubbo-admin/pkg/core/manager" +) + +func TestLogToolPropertiesAreAvailable(t *testing.T) { + searchProperties := LogSearchProperties() + for _, name := range []string{"mesh", "appName", "serviceName", "instanceName", "traceId", "keywords", "startTime", "endTime", "limit"} { + if _, ok := searchProperties[name]; !ok { + t.Fatalf("search property %s was not configured", name) + } + } + + capabilityProperties := LogCapabilitiesProperties() + for _, name := range []string{"startTime", "endTime"} { + if _, ok := capabilityProperties[name]; !ok { + t.Fatalf("capability property %s was not configured", name) + } + } +} + +func TestGetLogCapabilitiesReturnsAvailableLabelsAndResolvedFilters(t *testing.T) { + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/labels" { + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"status":"success","data":["pod","namespace","service_name","mesh"]}`)) + })) + defer loki.Close() + + result, err := GetLogCapabilities(newLogToolTestContext(loki.URL), map[string]any{ + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("GetLogCapabilities returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("GetLogCapabilities returned error result: %s", result.Content[0].Text) + } + + var payload LogCapabilitiesResp + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("failed to decode tool result: %v", err) + } + expectedLabels := []string{"mesh", "namespace", "pod", "service_name"} + if len(payload.AvailableLabels) != len(expectedLabels) { + t.Fatalf("expected labels %v, got %v", expectedLabels, payload.AvailableLabels) + } + for i := range expectedLabels { + if payload.AvailableLabels[i] != expectedLabels[i] { + t.Fatalf("label[%d] expected %q, got %q", i, expectedLabels[i], payload.AvailableLabels[i]) + } + } + if payload.FallbackLabel != "namespace" { + t.Fatalf("expected fallbackLabel namespace, got %q", payload.FallbackLabel) + } + if got := payload.LabelFilters["serviceName"]; len(got) != 1 || got[0] != "service_name" { + t.Fatalf("expected serviceName to resolve to service_name, got %v", got) + } + if got := payload.LabelFilters["instanceName"]; len(got) != 1 || got[0] != "pod" { + t.Fatalf("expected instanceName to resolve to pod, got %v", got) + } + if got := payload.LabelFilters["appName"]; len(got) != 0 { + t.Fatalf("expected appName to have no available labels, got %v", got) + } + if len(payload.ContentFilters) != 2 || payload.ContentFilters[0] != "traceId" || payload.ContentFilters[1] != "keywords" { + t.Fatalf("unexpected content filters: %v", payload.ContentFilters) + } + if payload.SourceEngine != "loki" { + t.Fatalf("expected sourceEngine loki, got %q", payload.SourceEngine) + } +} + +func TestBuildLogQLQueriesUsesCommonLabelAliases(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + ServiceName: "org.apache.DemoService", + Keywords: "Error", + }) + + expected := []string{ + `{service="org.apache.DemoService"} |= "Error"`, + `{serviceName="org.apache.DemoService"} |= "Error"`, + `{service_name="org.apache.DemoService"} |= "Error"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } +} + +func TestBuildLogQLQueriesFiltersTraceIDFromLogContent(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + ServiceName: "org.apache.DemoService", + TraceID: "trace-1", + Keywords: "ERROR", + }) + + expected := []string{ + `{service="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + `{serviceName="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + `{service_name="org.apache.DemoService"} |= "ERROR" |= "trace-1"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } +} + +func TestBuildLogQLQueriesUsesNamespaceSelectorWhenOnlyTraceIDIsProvided(t *testing.T) { + queries := buildLogQLQueries(&SearchLogsReq{ + TraceID: "trace-1", + }) + + expected := []string{ + `{namespace=~".+"} |= "trace-1"`, + } + if len(queries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(queries), queries) + } + for i := range expected { + if queries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], queries[i]) + } + } +} + +func TestNormalizeLokiLogsExtractsTraceContextFromMessage(t *testing.T) { + logs := normalizeLokiLogs(lokiQueryRangeResp{ + Status: "success", + Data: struct { + Result []lokiStream `json:"result"` + }{ + Result: []lokiStream{{ + Stream: map[string]string{ + "app": "demo-provider", + "trace_id": "label-trace", + "span_id": "label-span", + }, + Values: [][]string{{ + "1777110661783444000", + `ERROR trace_id=message-trace span_id=message-span metadata report failed`, + }}, + }}, + }, + }) + + if len(logs) != 1 { + t.Fatalf("expected one log, got %d", len(logs)) + } + if logs[0].TraceID != "message-trace" || logs[0].SpanID != "message-span" { + t.Fatalf("expected trace context from message, got traceID=%q spanID=%q", logs[0].TraceID, logs[0].SpanID) + } +} + +func TestNormalizeLokiLogsParsesDubboGoJSONLog(t *testing.T) { + raw := `{"level":"error","msg":"error message","span_id":"36d69a3dd9bea02c","time":"2026-06-02T14:56:37+08:00","trace_flags":"01","trace_id":"faba6a688ea3070b1613f50fb081c578"}` + logs := normalizeLokiLogs(lokiQueryRangeResp{ + Status: "success", + Data: struct { + Result []lokiStream `json:"result"` + }{ + Result: []lokiStream{{ + Stream: map[string]string{ + "app": "dubbo-go-provider", + "service_name": "org.apache.DemoService", + }, + Values: [][]string{{ + "1777110661783444000", + raw, + }}, + }}, + }, + }) + + if len(logs) != 1 { + t.Fatalf("expected one log, got %d", len(logs)) + } + logItem := logs[0] + if logItem.Timestamp != "2026-06-02T06:56:37Z" { + t.Fatalf("expected timestamp from JSON log time, got %q", logItem.Timestamp) + } + if logItem.Severity != "error" || logItem.Message != "error message" { + t.Fatalf("expected level and msg from JSON log, got severity=%q message=%q", logItem.Severity, logItem.Message) + } + if logItem.TraceID != "faba6a688ea3070b1613f50fb081c578" || logItem.SpanID != "36d69a3dd9bea02c" || logItem.TraceFlags != "01" { + t.Fatalf("unexpected trace context: traceID=%q spanID=%q traceFlags=%q", logItem.TraceID, logItem.SpanID, logItem.TraceFlags) + } + if logItem.Raw != raw { + t.Fatalf("expected raw JSON log to be preserved, got %q", logItem.Raw) + } +} + +func TestSearchLogsQueriesLoki(t *testing.T) { + var seenQueries []string + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + http.Error(w, "labels unavailable", http.StatusInternalServerError) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [{ + "stream": { + "app": "demo-provider", + "service_name": "org.apache.DemoService", + "instance": "127.0.0.1:20880", + "level": "ERROR", + "namespace": "dubbo-system" + }, + "values": [["1777110661783444000", "ERROR trace_id=trace-1 span_id=span-1 test log"]] + }] + } + }`)) + default: + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "serviceName": "org.apache.DemoService", + "keywords": "ERROR", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + "limit": float64(10), + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + + var payload SearchLogsResp + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("failed to decode tool result: %v", err) + } + if len(payload.Logs) != 1 { + t.Fatalf("expected one deduplicated log, got %d", len(payload.Logs)) + } + logItem := payload.Logs[0] + if logItem.ServiceName != "org.apache.DemoService" || logItem.TraceID != "trace-1" || logItem.Severity != "ERROR" { + t.Fatalf("unexpected normalized log: %+v", logItem) + } + if len(seenQueries) != 3 { + t.Fatalf("expected three service label alias queries, got %d: %v", len(seenQueries), seenQueries) + } +} + +func TestSearchLogsUsesLokiLabelsToReduceAliasQueries(t *testing.T) { + var seenQueries []string + labelsRequested := false + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + labelsRequested = true + _, _ = w.Write([]byte(`{"status":"success","data":["namespace","service_name","pod"]}`)) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "result": [{ + "stream": {"service_name": "org.apache.DemoService", "namespace": "dubbo-system"}, + "values": [["1777110661783444000", "ERROR trace_id=trace-1 test log"]] + }] + } + }`)) + default: + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "serviceName": "org.apache.DemoService", + "keywords": "ERROR", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + if !labelsRequested { + t.Fatal("expected labels endpoint to be requested") + } + expected := []string{`{service_name="org.apache.DemoService"} |= "ERROR"`} + if len(seenQueries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(seenQueries), seenQueries) + } + for i := range expected { + if seenQueries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], seenQueries[i]) + } + } +} + +func TestSearchLogsUsesAvailableFallbackLabelWhenOnlyTraceIDIsProvided(t *testing.T) { + var seenQueries []string + loki := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/loki/api/v1/labels": + _, _ = w.Write([]byte(`{"status":"success","data":["pod","job"]}`)) + case "/loki/api/v1/query_range": + seenQueries = append(seenQueries, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{"status":"success","data":{"result":[]}}`)) + default: + t.Fatalf("unexpected loki path: %s", r.URL.Path) + } + })) + defer loki.Close() + + result, err := SearchLogs(newLogToolTestContext(loki.URL), map[string]any{ + "traceId": "trace-1", + "startTime": "2026-04-01T00:00:00Z", + "endTime": "2026-04-01T01:00:00Z", + }) + if err != nil { + t.Fatalf("SearchLogs returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("SearchLogs returned error result: %s", result.Content[0].Text) + } + expected := []string{`{job=~".+"} |= "trace-1"`} + if len(seenQueries) != len(expected) { + t.Fatalf("expected %d queries, got %d: %v", len(expected), len(seenQueries), seenQueries) + } + for i := range expected { + if seenQueries[i] != expected[i] { + t.Fatalf("query[%d] expected %q, got %q", i, expected[i], seenQueries[i]) + } + } +} + +func TestAnalyzeErrorsGroupsPatterns(t *testing.T) { + resp := analyzeErrors([]LogItem{ + {Timestamp: "2026-04-01T00:00:00Z", Severity: "ERROR", Message: "Error 500 for request 123"}, + {Timestamp: "2026-04-01T00:01:00Z", Severity: "ERROR", Message: "Error 503 for request 456"}, + {Timestamp: "2026-04-01T00:02:00Z", Severity: "INFO", Message: "normal log"}, + }, "loki") + + if resp.TotalErrors != 2 { + t.Fatalf("expected 2 errors, got %d", resp.TotalErrors) + } + if len(resp.Patterns) != 1 { + t.Fatalf("expected one pattern, got %d", len(resp.Patterns)) + } + if resp.Patterns[0].Pattern != "Error ? for request ?" || resp.Patterns[0].Count != 2 { + t.Fatalf("unexpected pattern: %+v", resp.Patterns[0]) + } +} + +func newLogToolTestContext(endpoint string) consolectx.Context { + return &logToolTestContext{ + config: app.AdminConfig{ + Observability: &observability.Config{ + Logs: &observability.LogsConfig{ + DefaultProvider: "loki-main", + Providers: []observability.LogProviderConfig{{ + Name: "loki-main", + Type: observability.LogProviderLoki, + Endpoint: endpoint, + }}, + }, + }, + }, + } +} + +type logToolTestContext struct { + config app.AdminConfig +} + +func (c *logToolTestContext) ResourceManager() manager.ResourceManager { + return nil +} + +func (c *logToolTestContext) CounterManager() counter.CounterManager { + return nil +} + +func (c *logToolTestContext) Config() app.AdminConfig { + return c.config +} + +func (c *logToolTestContext) AppContext() context.Context { + return context.Background() +} + +func (c *logToolTestContext) LockManager() lock.Lock { + return nil +} diff --git a/pkg/mcp/tools/search.go b/pkg/mcp/tools/search.go index f8093945f..2b4e27862 100644 --- a/pkg/mcp/tools/search.go +++ b/pkg/mcp/tools/search.go @@ -120,11 +120,11 @@ func (e *appNameSearchExecutor) execute(ctx consolectx.Context, keyword, mesh st func (e *appNameSearchExecutor) buildResult(pagedResult *model.SearchPaginationResult, keyword string, pageSize, pageNumber int) map[string]any { apps := extractGlobalApplications(pagedResult) return map[string]any{ - "keyword": keyword, - "pageSize": pageSize, - "pageNumber": pageNumber, + "keyword": keyword, + "pageSize": pageSize, + "pageNumber": pageNumber, "applications": apps, - "totalCount": len(apps), + "totalCount": len(apps), } }