diff --git a/.build/build-resolver.xml b/.build/build-resolver.xml index 7e88a4ee7334..7a4cf947ac8b 100644 --- a/.build/build-resolver.xml +++ b/.build/build-resolver.xml @@ -185,7 +185,7 @@ - + @@ -212,7 +212,7 @@ - + diff --git a/.build/cassandra-build-maven-pom.xml b/.build/cassandra-build-maven-pom.xml index bf2adfd9f8e4..6afc44108ca9 100644 --- a/.build/cassandra-build-maven-pom.xml +++ b/.build/cassandra-build-maven-pom.xml @@ -164,5 +164,9 @@ cassandra-accord tests + + io.opentelemetry + opentelemetry-sdk-testing + diff --git a/.build/cassandra-deps-maven-pom.xml b/.build/cassandra-deps-maven-pom.xml index 474c1037d0b1..9acf26ffb22e 100644 --- a/.build/cassandra-deps-maven-pom.xml +++ b/.build/cassandra-deps-maven-pom.xml @@ -388,5 +388,26 @@ org.passay passay + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + + + io.opentelemetry + opentelemetry-exporter-otlp + runtime + + + io.opentelemetry.semconv + opentelemetry-semconv + diff --git a/.build/parent-maven-pom.xml b/.build/parent-maven-pom.xml index e8d82bc4edf4..33d4928a9197 100644 --- a/.build/parent-maven-pom.xml +++ b/.build/parent-maven-pom.xml @@ -1316,6 +1316,18 @@ passay 1.6.4 + + io.opentelemetry + opentelemetry-bom + 1.62.0 + pom + import + + + io.opentelemetry.semconv + opentelemetry-semconv + 1.41.1 + diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 422ad0a482ad..d589ee69b61d 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -2999,3 +2999,9 @@ compression_dictionary_cache_size: 10 # Expired dictionaries will be removed from memory but can be reloaded if needed. # Min unit: s compression_dictionary_cache_expire: 24h + +# OpenTelemetry integration +# If enabled, Cassandra can export telemetry using OpenTelemetry. 
+# Currently, only tracing is exported +# Default: false +opentelemetry_enabled: false diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml index ee68c700827f..af356c8029cb 100644 --- a/conf/cassandra_latest.yaml +++ b/conf/cassandra_latest.yaml @@ -2742,3 +2742,9 @@ compression_dictionary_cache_size: 10 # Expired dictionaries will be removed from memory but can be reloaded if needed. # Min unit: s compression_dictionary_cache_expire: 24h + +# OpenTelemetry integration +# If enabled, Cassandra can export telemetry using OpenTelemetry. +# Currently, only tracing is exported +# Default: false +opentelemetry_enabled: false diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 501d1284d2e0..f1c2074cde36 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1041,6 +1041,8 @@ public static void setClientMode(boolean clientMode) public volatile CustomGuardrailConfig role_name_policy = new CustomGuardrailConfig(); public volatile AutoRepairConfig auto_repair = new AutoRepairConfig(); + public volatile boolean opentelemetry_enabled = false; + /** * The variants of paxos implementation and semantics supported by Cassandra. 
*/ diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9ee09db70709..081370df883a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -6255,6 +6255,11 @@ public static void setRepairDiskHeadroomRejectRatio(double value) conf.repair_disk_headroom_reject_ratio = value; } + public static boolean getOpenTelemetryEnabled() + { + return conf.opentelemetry_enabled; + } + @VisibleForTesting public static void setPartitioner(String name) { diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java index 25c42136692e..6597c737d714 100644 --- a/src/java/org/apache/cassandra/cql3/CQLStatement.java +++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java @@ -129,6 +129,17 @@ default boolean eligibleAsPreparedStatement() return false; } + /** + * The query summary describes a class of database queries and is useful as a grouping key. + * This is used in telemetry to group similar queries together. 
+ * + * @see Generating a summary of the query + */ + default String getQuerySummary() + { + return ""; + } + abstract class Raw { protected VariableSpecifications bindVariables; diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 518cd694ba4d..05643831920c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -818,4 +819,19 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.BATCH); } + + /** + * For batch statement, the query summary consists of deduplicated list of targets + * not to produce too long query summary. + */ + @Override + public String getQuerySummary() + { + TreeSet summary = new TreeSet<>(); + for (ModificationStatement statement : statements) + { + summary.add(statement.getQuerySummary()); + } + return String.format("%s BATCH %s", type.name(), String.join(" ", summary)); + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java index dcd1395ab8df..58f88859ab2f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java @@ -188,6 +188,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_ROLE); } + @Override + public String getQuerySummary() + { + return "CREATE ROLE"; + } + @Override public String obfuscatePassword(String query) { diff --git a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java index 5f47063809c3..ba2ff266a5a1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java @@ -143,6 +143,12 @@ public final AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.DESCRIBE); } + @Override + public String getQuerySummary() + { + return "DESCRIBE"; + } + @Override public final ResultMessage execute(QueryState state, QueryOptions options, Dispatcher.RequestTime requestTime) throws RequestValidationException, RequestExecutionException { diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java index cc43b4e2e3c1..2c83dbe500f4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java @@ -89,6 +89,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.DROP_IDENTITY); } + @Override + public String getQuerySummary() + { + return "DROP IDENTITY"; + } + @Override public ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException { diff --git a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java index 7a7b78da80d2..4ce17fbfd6c6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java @@ -97,4 +97,10 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.DROP_ROLE); } + + @Override + public String getQuerySummary() + { + return "DROP ROLE"; + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java index eb581c3c96a9..becc66ec40ef 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java @@ -156,4 +156,10 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.LIST_PERMISSIONS); } + + @Override + public String getQuerySummary() + { + return "LIST PERMISSIONS"; + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java index 3a592fec6403..7a6f59f04f45 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java @@ -155,4 +155,10 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.LIST_ROLES); } + + @Override + public String getQuerySummary() + { + return "LIST ROLES"; + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/ListSuperUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListSuperUsersStatement.java index 3c94a14195f0..19ba5cd9099c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListSuperUsersStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListSuperUsersStatement.java @@ -99,4 +99,10 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.LIST_SUPERUSERS); } + + @Override + public String getQuerySummary() + { + return "LIST SUPERUSERS"; + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index f7d9fbc9a3ca..19e7e57fb4c6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
@@ -311,6 +311,12 @@ public boolean eligibleAsPreparedStatement() return true; } + @Override + public String getQuerySummary() + { + return String.format("%s %s", type.name(), metadata.toString()); + } + public void addFunctionsTo(List functions) { attrs.addFunctionsTo(functions); diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 7e8119a2e395..5913e06d91ae 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -534,6 +534,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.SELECT, keyspace(), table.name); } + @Override + public String getQuerySummary() + { + return String.format("SELECT %s", table.toString()); + } + // Simple wrapper class to avoid some code duplication private static abstract class Pager { diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index 8799ffc97144..2cd4512c6588 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@ -134,4 +134,10 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.TRUNCATE, keyspace(), name()); } + + @Override + public String getQuerySummary() + { + return String.format("TRUNCATE %s", qualifiedName); + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java index 319e14899035..e7d73ed947ec 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java @@ -87,6 +87,12 @@ public AuditLogContext getAuditLogContext() return new 
AuditLogContext(AuditLogEntryType.USE_KEYSPACE, keyspace); } + @Override + public String getQuerySummary() + { + return String.format("USE %s", keyspace); + } + public String keyspace() { return keyspace; diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 5ef2b68a9c09..0512b1f6ab7d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -232,6 +232,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.ALTER_KEYSPACE, keyspaceName); } + @Override + public String getQuerySummary() + { + return String.format("ALTER KEYSPACE %s", keyspaceName); + } + public String toString() { return String.format("%s (%s)", getClass().getSimpleName(), keyspaceName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index aee1496cb5f9..6287e81875de 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -156,6 +156,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, keyspaceName, tableName); } + @Override + public String getQuerySummary() + { + return String.format("ALTER TABLE %s.%s", keyspaceName, tableName); + } + public String toString() { return format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java index dd619af7a4cf..5177518a575e 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java @@ -101,6 +101,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.ALTER_TYPE, keyspaceName, typeName); } + @Override + public String getQuerySummary() + { + return String.format("ALTER TYPE %s.%s", keyspaceName, typeName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, typeName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java index 0bce6f6f30df..3b2c6817edc3 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java @@ -126,6 +126,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.ALTER_VIEW, keyspaceName, viewName); } + @Override + public String getQuerySummary() + { + return String.format("ALTER VIEW %s.%s", keyspaceName, viewName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, viewName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java index 1250cb79ec6b..6f277ae8c73f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java @@ -112,6 +112,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_TABLE_LIKE, targetKeyspace, targetTableName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE TABLE %s.%s LIKE %s.%s", targetKeyspace, targetTableName, 
sourceKeyspace, sourceTableName); + } + @Override public boolean compatibleWith(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java index eaddab2af7f9..19cb2598095a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java @@ -297,6 +297,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_AGGREGATE, keyspaceName, aggregateName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE AGGREGATE %s.%s", keyspaceName, aggregateName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, aggregateName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java index 9505d7f43606..d19866779a38 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java @@ -210,6 +210,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_FUNCTION, keyspaceName, functionName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE FUNCTION %s.%s", keyspaceName, functionName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, functionName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java index d3e08dfee5c7..018af58916bd 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -347,6 +347,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_INDEX, keyspaceName, indexName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE INDEX %s.%s", keyspaceName, indexName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, indexName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java index a622b861921c..2e67fe948ea8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java @@ -136,6 +136,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_KEYSPACE, keyspaceName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE KEYSPACE %s", keyspaceName); + } + public String toString() { return String.format("%s (%s)", getClass().getSimpleName(), keyspaceName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java index 26dfbb8dec76..f5886d4a5541 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java @@ -256,6 +256,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_TABLE, keyspaceName, tableName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE TABLE %s.%s", keyspaceName, tableName); + } + public String 
toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java index fcf23af3e0ef..b0d1b5200039 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java @@ -129,6 +129,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_TRIGGER, keyspaceName, triggerName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE TRIGGER %s.%s", keyspaceName, triggerName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, triggerName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java index 38d28443b6f4..5fb73578dd73 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java @@ -140,6 +140,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_TYPE, keyspaceName, typeName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE TYPE %s.%s", keyspaceName, typeName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, typeName); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java index d0b219ca62cc..70d6d2f46cec 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java @@ -418,6 +418,12 @@ public AuditLogContext getAuditLogContext() return new AuditLogContext(AuditLogEntryType.CREATE_VIEW, keyspaceName, viewName); } + @Override + public String getQuerySummary() + { + return String.format("CREATE VIEW %s.%s", keyspaceName, viewName); + } + public String toString() { return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, viewName); diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index d0897afdd5af..e456a1bf7032 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.db; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +29,8 @@ import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.transport.Dispatcher; +import io.opentelemetry.api.trace.Span; + public class CounterMutationVerbHandler extends AbstractMutationVerbHandler { public static final CounterMutationVerbHandler instance = new CounterMutationVerbHandler(); @@ -36,6 +40,12 @@ public class CounterMutationVerbHandler extends AbstractMutationVerbHandler message, InetAddressAndPort respondToAddress) { final CounterMutation cm = message.payload; + if (Span.current().getSpanContext().isValid()) + { + String target = cm.getPartitionUpdates().stream() + .map((pu) -> pu.metadata().toString()).collect(Collectors.joining(" ")); + Span.current().updateName(String.format("%s %s", message.verb().name(), target)); + } logger.trace("Applying forwarded {}", cm); String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter; diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 
f4244c84e1b1..e247b7382727 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db; import java.util.Map; +import java.util.stream.Collectors; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -27,6 +28,8 @@ import org.apache.cassandra.net.ParamType; import org.apache.cassandra.tracing.Tracing; +import io.opentelemetry.api.trace.Span; + import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; @@ -58,6 +61,13 @@ public void doVerb(Message message) return; } + if (Span.current().getSpanContext().isValid()) + { + String target = message.payload.getPartitionUpdates().stream() + .map((pu) -> pu.metadata().toString()).collect(Collectors.joining(" ")); + Span.current().updateName(String.format("%s %s", message.verb().name(), target)); + } + MessageParams.reset(); message.payload.validateSize(MessagingService.current_version, ENTRY_OVERHEAD_SIZE); WriteThresholds.checkWriteThresholds(message.payload); diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index b418ecdfeb2a..c0a8034f3cde 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -44,6 +44,8 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import io.opentelemetry.api.trace.Span; + import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.exceptions.RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM; @@ -67,6 +69,10 @@ public ReadResponse doRead(ReadCommand command, boolean trackRepairedData) public 
void doVerb(Message message) { + if (Span.current().getSpanContext().isValid()) + { + Span.current().updateName(String.format("%s %s", message.verb().name(), message.payload.metadata().toString())); + } if (message.epoch().isAfter(Epoch.EMPTY)) { ClusterMetadata metadata = ClusterMetadata.current(); diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java index 3e0d40675940..e129e71de6ae 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java @@ -37,6 +37,8 @@ import org.apache.cassandra.net.ResourceLimits.Limit; import org.apache.cassandra.service.accord.debug.AccordRemoteTracing; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.telemetry.CassandraAttributes; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; @@ -46,6 +48,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
@@ -421,12 +427,46 @@ private void dispatch(ProcessMessage task)
     {
         Header header = task.header();
 
+        // Start tracing
+        Context otelContext = header.traceContext();
+        final Span span;
+        // Only start replica span if the context was propagated
+        SpanContext propagatedSpanContext = Span.fromContext(otelContext).getSpanContext();
+        if (propagatedSpanContext.isValid() && propagatedSpanContext.isRemote())
+        {
+            span = Telemetry.getRequestTracer()
+                            .spanBuilder(header.verb.name()) // Span name will be updated with additional details
+                            .setParent(otelContext)
+                            .setAttribute(CassandraAttributes.CASSANDRA_NET_VERB, header.verb.name())
+                            .startSpan();
+        }
+        else
+        {
+            span = Span.getInvalid();
+        }
+        // Legacy tracing
         TraceState state = Tracing.instance.initializeFromMessage(header);
         if (state != null) state.trace("{} message received from {}", header.verb, header.from);
         AccordRemoteTracing.traceOffWire(header);
 
         callbacks.onDispatched(task.size(), header);
-        header.verb.stage.execute(ExecutorLocals.create(state), task);
+        header.verb.stage.execute(ExecutorLocals.create(state), () -> {
+            // The task is handed to the verb's stage executor, so record the thread that executes it
+            // rather than the thread that called dispatch(); the span itself still starts at dispatch.
+            if (span.isRecording())
+            {
+                span.setAttribute(CassandraAttributes.THREAD_ID, Thread.currentThread().getId());
+                span.setAttribute(CassandraAttributes.THREAD_NAME, Thread.currentThread().getName());
+            }
+            try (Scope scope = span.makeCurrent())
+            {
+                task.run();
+            }
+            finally
+            {
+                span.end();
+            }
+        });
     }
 
     private abstract class ProcessMessage implements Runnable
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index 41f0281706a8..163a1f86cad2 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -53,6 +53,9 @@ import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.TimeUUID; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; + import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.db.TypeSizes.sizeof; @@ -293,7 +296,7 @@ private static Message withParam(long id, Verb verb, long expiresAtNanos, flags = ARTIFICIAL_LATENCY.addTo(flags); InetAddressAndPort from = getBroadcastAddressAndPort(); - return new Message<>(new Header(id, epochSupplier.get(), verb, from, createdAtNanos, expiresAtNanos, flags, buildParams(paramType, paramValue)), payload); + return new Message<>(new Header(id, epochSupplier.get(), verb, from, createdAtNanos, expiresAtNanos, flags, buildParams(paramType, paramValue,
verb.isResponse())), payload); } public static Message internalResponse(Verb verb, T payload) @@ -428,7 +431,7 @@ public Message withParams(Map values) private static final EnumMap NO_PARAMS = new EnumMap<>(ParamType.class); - private static Map buildParams(ParamType type, Object value) + private static Map buildParams(ParamType type, Object value, boolean isResponse) { EnumMap params = NO_PARAMS; if (Tracing.isTracing()) @@ -440,6 +443,13 @@ private static Map buildParams(ParamType type, Object value) params = new EnumMap<>(ParamType.class); params.put(type, value); } + // OpenTelemetry tracing + if (!isResponse && Span.current().isRecording()) + { + if (params.isEmpty()) + params = new EnumMap<>(ParamType.class); + params.put(ParamType.TRACE_CONTEXT, Context.current()); + } return params; } @@ -645,6 +655,14 @@ public TraceType traceType() return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY); } + /** + * @return the OpenTelemetry context when it is propagated from the remote peer. 
+ */ + public Context traceContext() + { + return (Context) params.getOrDefault(ParamType.TRACE_CONTEXT, Context.current()); + } + public Map params() { return Collections.unmodifiableMap(params); diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java index b7f666617e10..638df6ad20d3 100644 --- a/src/java/org/apache/cassandra/net/ParamType.java +++ b/src/java/org/apache/cassandra/net/ParamType.java @@ -22,6 +22,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.service.accord.debug.AccordRemoteTracing; import org.apache.cassandra.service.writes.thresholds.WriteThresholdMapSerializer; +import org.apache.cassandra.telemetry.tracing.TraceContextSerializer; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.Int32Serializer; import org.apache.cassandra.utils.Int64Serializer; @@ -60,6 +61,7 @@ public enum ParamType WRITE_SIZE_WARN (18, WriteThresholdMapSerializer.serializer), WRITE_TOMBSTONE_WARN (19, WriteThresholdMapSerializer.serializer), ACCORD_TRACING (20, AccordRemoteTracing.tracingSerializer), + TRACE_CONTEXT (21, TraceContextSerializer.serializer), ; final int id; diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 70de57d6be1a..0112b84a1597 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -177,6 +177,8 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.telemetry.CassandraAttributes; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.triggers.TriggerExecutor; @@ -192,6 +194,11 @@ import 
org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + import static accord.primitives.Txn.Kind.Read; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Iterables.concat; @@ -304,8 +311,29 @@ private StorageProxy() { EndpointsForToken selected = targets.contacts().withoutSelf(); Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 - Stage.COUNTER_MUTATION.executor() - .execute(counterWriteTask(mutation, targets.withContacts(selected), responseHandler, localDataCenter, requestTime)); + Runnable task = counterWriteTask(mutation, targets.withContacts(selected), responseHandler, localDataCenter, requestTime); + Context context = Context.current(); + Stage.COUNTER_MUTATION.execute(() -> { + // Create additional span on coordinator counter-mutation to distinguish from the coordinator span + Span parentSpan = Span.fromContext(context); + String target = mutation.getPartitionUpdates().stream().map((pu) -> pu.metadata().toString()).collect(Collectors.joining(" ")); + Span counterMutationSpan = parentSpan.getSpanContext().isValid() ? 
Telemetry.getRequestTracer() + .spanBuilder(String.format("%s %s", Verb.COUNTER_MUTATION_REQ.name(), target)) + .setParent(context) + .setAttribute(CassandraAttributes.THREAD_ID, Thread.currentThread().getId()) + .setAttribute(CassandraAttributes.THREAD_NAME, Thread.currentThread().getName()) + .startSpan() + : Span.getInvalid(); + + try (Scope ignore = counterMutationSpan.makeCurrent()) + { + task.run(); + } + finally + { + counterMutationSpan.end(); + } + }); }; @@ -930,7 +958,7 @@ private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLev */ private static void commitPaxosLocal(Replica localReplica, final Message message, final AbstractWriteResponseHandler responseHandler, Dispatcher.RequestTime requestTime) { - PAXOS_COMMIT_REQ.stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica, requestTime) + PAXOS_COMMIT_REQ.stage.maybeExecuteImmediately(Context.current().wrap(new LocalMutationRunnable(localReplica, requestTime) { public void runMayThrow() { @@ -959,7 +987,7 @@ protected Verb verb() { return PAXOS_COMMIT_REQ; } - }); + })); } /** @@ -1989,10 +2017,15 @@ private static Replica pickReplica(EndpointsForToken targets) private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, String description, Dispatcher.RequestTime requestTime) { - stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica, requestTime) + stage.maybeExecuteImmediately(Context.current().wrap(new LocalMutationRunnable(localReplica, requestTime) { public void runMayThrow() { + // update span name if keyspace and table name are available + if (Span.current().getSpanContext().isValid()) + { + Span.current().updateName(String.format("%s %s", verb().name(), description)); + } try { runnable.run(); @@ -2014,15 +2047,31 @@ protected Verb verb() { return Verb.MUTATION_REQ; } - }); + })); } private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback handler, 
Object description, Dispatcher.RequestTime requestTime) { - stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica, requestTime) + stage.maybeExecuteImmediately(Context.current().wrap(new LocalMutationRunnable(localReplica, requestTime) { public void runMayThrow() { + // update span name if keyspace and table name are available + if (Span.current().getSpanContext().isValid()) + { + String target; + if (description instanceof IMutation) + { + IMutation mutation = (IMutation) description; + target = mutation.getPartitionUpdates().stream() + .map((pu) -> pu.metadata().toString()).collect(Collectors.joining(" ")); + } + else + { + target = description.toString(); + } + Span.current().updateName(String.format("%s %s", verb().name(), target)); + } try { MessageParams.reset(); @@ -2038,6 +2087,7 @@ public void runMayThrow() { if (!(ex instanceof WriteTimeoutException) && !(ex instanceof RetryOnDifferentSystemException)) logger.error("Failed to apply mutation locally : ", ex); + Span.current().recordException(ex); handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailure.forException(ex)); } finally @@ -2059,7 +2109,7 @@ protected Verb verb() { return Verb.MUTATION_REQ; } - }); + })); } /** @@ -2736,7 +2786,16 @@ public LocalReadRunnable(ReadCommand command, ReadCallback handler, Dispatcher.R protected void runMayThrow() { - try + Span parentSpan = Span.current(); + // Create additional Span for coordinator local read if the parent is from remote + // to prevent creating root span for internal read + Span localReadSpan = parentSpan.getSpanContext().isValid() ? 
Telemetry.getRequestTracer() + .spanBuilder(String.format("%s %s", verb.toString(), command.metadata().toString())) + .setAttribute(CassandraAttributes.THREAD_ID, Thread.currentThread().getId()) + .setAttribute(CassandraAttributes.THREAD_NAME, Thread.currentThread().getName()) + .startSpan() + : Span.getInvalid(); + try (Scope scope = localReadSpan.makeCurrent()) { MessageParams.reset(); @@ -2754,13 +2813,15 @@ protected void runMayThrow() { if (!command.isTrackingWarnings()) throw e; - + + localReadSpan.recordException(e); response = command.createEmptyResponse(); readRejected = true; } catch (QueryCancelledException e) { logger.debug("Query cancelled (timeout)", e); + localReadSpan.recordException(e); response = null; Preconditions.checkState(!command.isCompleted(), "Local read marked as completed despite being aborted by timeout to table %s", command.metadata()); } @@ -2782,6 +2843,8 @@ protected void runMayThrow() } catch (Throwable t) { + localReadSpan.setStatus(StatusCode.ERROR, t.getMessage()); + localReadSpan.recordException(t); if (t instanceof TombstoneOverwhelmingException) { handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailure.READ_TOO_MANY_TOMBSTONES); @@ -2793,6 +2856,10 @@ protected void runMayThrow() throw t; } } + finally + { + localReadSpan.end(); + } } @Override @@ -3174,6 +3241,15 @@ private static abstract class LocalMutationRunnable implements RunnableDebuggabl public final void run() { + Span parentSpan = Span.current(); + // Create additional Span for coordinator local mutation if the parent is valid + // to prevent creating root span for internal mutation + Span localMutationSpan = parentSpan.getSpanContext().isValid() ? 
Telemetry.getRequestTracer() + .spanBuilder(verb().name()) + .setAttribute(CassandraAttributes.THREAD_ID, Thread.currentThread().getId()) + .setAttribute(CassandraAttributes.THREAD_NAME, Thread.currentThread().getName()) + .startSpan() + : Span.getInvalid(); final Verb verb = verb(); long now = MonotonicClock.Global.approxTime.now(); long deadline = requestTime.computeDeadline(verb.expiresAfterNanos()); @@ -3191,21 +3267,33 @@ public final void run() { protected void runMayThrow() throws Exception { - LocalMutationRunnable.this.runMayThrow(); + try (Scope scope = localMutationSpan.makeCurrent()) + { + LocalMutationRunnable.this.runMayThrow(); + } + finally + { + localMutationSpan.end(); + } } }; submitHint(runnable); return; } - try + try (Scope scope = localMutationSpan.makeCurrent()) { runMayThrow(); } catch (Exception e) { + localMutationSpan.recordException(e); throw new RuntimeException(e); } + finally + { + localMutationSpan.end(); + } } @Override diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2446dd7fa866..e967b3539ff6 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -218,6 +218,7 @@ import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.Unregister; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.transport.ClientResourceLimits; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ExecutorUtils; @@ -858,6 +859,13 @@ public void runMayThrow() throws InterruptedException, ExecutionException, IOExc RegistrationStatus.instance.onRegistration(); Startup.maybeExecuteStartupTransformation(self); + Telemetry.init(DatabaseDescriptor.getClusterName(), + FBUtilities.getReleaseVersionString(), + 
FBUtilities.getBroadcastAddressAndPort().getHostAddress(false), + FBUtilities.getBroadcastAddressAndPort().getPort(), + Integer.toString(self.id()), + ClusterMetadata.current().locator.local()); + if (CassandraRelevantProperties.SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP.getBoolean()) SystemPeersValidator.validateAndRepair(ClusterMetadata.current()); diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index bbe2a2d25e7f..b69a0e92a849 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -45,11 +45,12 @@ import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.FBUtilities; +import io.opentelemetry.context.Context; + import static com.google.common.collect.Iterables.all; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -73,7 +74,6 @@ public abstract class AbstractReadExecutor protected final ReadRepair readRepair; protected final DigestResolver digestResolver; protected final ReadCallback handler; - protected final TraceState traceState; protected final ColumnFamilyStore cfs; protected final Dispatcher.RequestTime requestTime; @@ -91,7 +91,6 @@ public abstract class AbstractReadExecutor this.digestResolver = new DigestResolver<>(coordinator, command, this.replicaPlan, requestTime); this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, requestTime); this.cfs = cfs; - this.traceState = Tracing.instance.get(); this.requestTime = requestTime; @@ -152,8 +151,7 @@ private void 
makeRequests(ReadCommand readCommand, Iterable replicas) continue; } - if (traceState != null) - traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + Tracing.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); if (null == message) message = readCommand.createMessage(false, requestTime).withEpoch(ClusterMetadata.current().epoch); @@ -165,7 +163,7 @@ private void makeRequests(ReadCommand readCommand, Iterable replicas) if (hasLocalEndpoint) { logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data"); - Stage.READ.maybeExecuteImmediately(new LocalReadRunnable(readCommand, handler, requestTime)); + Stage.READ.maybeExecuteImmediately(Context.current().wrap(new LocalReadRunnable(readCommand, handler, requestTime))); } } @@ -358,8 +356,7 @@ public void maybeTryAdditionalReplicas() // nor would we be able to speculate a new 'write' if the repair writes are insufficient super.replicaPlan.addToContacts(extraReplica); - if (traceState != null) - traceState.trace("speculating read retry on {}", extraReplica); + Tracing.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); MessagingService.instance().sendWithCallback(retryCommand.createMessage(false, requestTime), extraReplica.endpoint(), handler); } diff --git a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java index 6b2913320143..41a052ac08fa 100644 --- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java @@ -78,6 +78,8 @@ import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.btree.BTreeSet; +import io.opentelemetry.context.Context; + /** * Helper in charge of collecting additional queries to be done on the coordinator to 
protect against invalid results * being included due to replica-side filtering (secondary indexes or {@code ALLOW * FILTERING}). @@ -172,7 +174,7 @@ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica if (source.isSelf() && coordinator.localReadSupported()) { - Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler, requestTime)); + Stage.READ.maybeExecuteImmediately(Context.current().wrap(new StorageProxy.LocalReadRunnable(cmd, handler, requestTime))); } else { diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index d1562b1b4de9..d423ae898709 100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@ -47,6 +47,8 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; +import io.opentelemetry.context.Context; + public class ShortReadPartitionsProtection extends Transformation implements MorePartitions { private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class); @@ -188,7 +190,7 @@ UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shar if (source.isSelf() && coordinator.localReadSupported()) { - Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler, requestTime)); + Stage.READ.maybeExecuteImmediately(Context.current().wrap(new StorageProxy.LocalReadRunnable(cmd, handler, requestTime))); } else { diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java index 62725a1d6d44..82737bba44f3 100644 --- a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java +++ 
b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java @@ -67,6 +67,8 @@ import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; +import io.opentelemetry.context.Context; + import static com.google.common.base.Preconditions.checkState; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetricsForLevel; @@ -220,7 +222,7 @@ private SingleRangeResponse executeNormal(ReplicaPlan.ForRangeRead replicaPlan, if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf() && readCoordinator.localReadSupported()) { - Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, trackRepairedStatus)); + Stage.READ.execute(Context.current().wrap(new StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, trackRepairedStatus))); } else { diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 67c484297969..3e97dee2ae01 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -48,6 +48,8 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; +import io.opentelemetry.context.Context; + import static java.util.concurrent.TimeUnit.MICROSECONDS; public abstract class AbstractReadRepair, P extends ReplicaPlan.ForRead> @@ -99,7 +101,7 @@ void sendReadCommand(Replica to, ReadCallback readCallback, boolean specul if (to.isSelf() && coordinator.localReadSupported()) { - Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(command, readCallback, requestTime, trackRepairedStatus)); + Stage.READ.maybeExecuteImmediately(Context.current().wrap(new 
StorageProxy.LocalReadRunnable(command, readCallback, requestTime, trackRepairedStatus))); return; } diff --git a/src/java/org/apache/cassandra/telemetry/CassandraAttributes.java b/src/java/org/apache/cassandra/telemetry/CassandraAttributes.java new file mode 100644 index 000000000000..34ddae3341b3 --- /dev/null +++ b/src/java/org/apache/cassandra/telemetry/CassandraAttributes.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.telemetry; + +import io.opentelemetry.api.common.AttributeKey; + +public final class CassandraAttributes +{ + // Until thread attributes are stabilized, we use these. 
+ // See https://opentelemetry.io/docs/specs/semconv/registry/attributes/thread/ + public static final AttributeKey THREAD_ID = AttributeKey.longKey("thread.id"); + public static final AttributeKey THREAD_NAME = AttributeKey.stringKey("thread.name"); + + public static final AttributeKey CASSANDRA_DC = AttributeKey.stringKey("cassandra.dc"); + public static final AttributeKey CASSANDRA_RACK = AttributeKey.stringKey("cassandra.rack"); + public static final AttributeKey CASSANDRA_QUERY_TYPE = AttributeKey.stringKey("cassandra.query.type"); + public static final AttributeKey CASSANDRA_PAGE_SIZE = AttributeKey.longKey("cassandra.page.size"); + public static final AttributeKey CASSANDRA_CONSISTENCY_LEVEL = AttributeKey.stringKey("cassandra.consistency.level"); + public static final AttributeKey CASSANDRA_SERIAL_CONSISTENCY_LEVEL = AttributeKey.stringKey("cassandra.serial.consistency.level"); + public static final AttributeKey CASSANDRA_NET_VERB = AttributeKey.stringKey("cassandra.net.verb"); + public static final AttributeKey CASSANDRA_COORDINATOR_ADDRESS = AttributeKey.stringKey("cassandra.coordinator.address"); + public static final AttributeKey CASSANDRA_COORDINATOR_PORT = AttributeKey.longKey("cassandra.coordinator.port"); + + /** Value for {@code db.system.name} identifying Apache Cassandra. */ + public static final String DB_SYSTEM_NAME_CASSANDRA = "cassandra"; + + private CassandraAttributes() + { + } +} diff --git a/src/java/org/apache/cassandra/telemetry/Telemetry.java b/src/java/org/apache/cassandra/telemetry/Telemetry.java new file mode 100644 index 000000000000..d1d136995066 --- /dev/null +++ b/src/java/org/apache/cassandra/telemetry/Telemetry.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.telemetry; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.tcm.membership.Location; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.ServiceAttributes; + +/** + * Holds references to OpenTelemetry objects + */ +public final class Telemetry +{ + private static volatile OpenTelemetry otel; + + public static OpenTelemetry init(String clusterName, + String cassandraVersion, + String listenAddress, + int port, + String nodeId, + Location location) + { + if (otel != null) return otel; + synchronized (Telemetry.class) + { + if (otel != null) return otel; + + if (DatabaseDescriptor.getOpenTelemetryEnabled()) + { + Resource cassandraResource = Resource.builder() + .put(ServiceAttributes.SERVICE_NAME, clusterName) + .put(ServiceAttributes.SERVICE_NAMESPACE, "cassandra") + .put(ServiceAttributes.SERVICE_VERSION, cassandraVersion) + .put(ServiceAttributes.SERVICE_INSTANCE_ID, nodeId) + .put(ServerAttributes.SERVER_ADDRESS, listenAddress) + .put(ServerAttributes.SERVER_PORT, 
port) + .put(CassandraAttributes.CASSANDRA_DC, location.datacenter) + .put(CassandraAttributes.CASSANDRA_RACK, location.rack) + .build(); + otel = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesCustomizer((config) -> { + Map customConfig = new HashMap<>(); + // Disable metrics and log export for now + customConfig.put("otel.metrics.exporter", "none"); + customConfig.put("otel.logs.exporter", "none"); + return customConfig; + }) + .addResourceCustomizer((r, config) -> r.merge(cassandraResource)) + .build() + .getOpenTelemetrySdk(); + } + else + { + otel = OpenTelemetry.noop(); + } + return otel; + } + } + + private Telemetry() + { + } + + @VisibleForTesting + static void setOpenTelemetryUnsafe(OpenTelemetry openTelemetry) + { + otel = openTelemetry; + } + + /** + * Returns OpenTelemetry {@link Tracer} to trace client requests. + * Safe to call before `init` completes; returns noop tracer until initialization. + * + * @return Client request {@link Tracer} + */ + public static Tracer getRequestTracer() + { + OpenTelemetry current = otel; + if (current == null) + current = OpenTelemetry.noop(); + return current.getTracer("org.apache.cassandra.request"); + } +} diff --git a/src/java/org/apache/cassandra/telemetry/tracing/CustomPayloadGetter.java b/src/java/org/apache/cassandra/telemetry/tracing/CustomPayloadGetter.java new file mode 100644 index 000000000000..93459e24638e --- /dev/null +++ b/src/java/org/apache/cassandra/telemetry/tracing/CustomPayloadGetter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.telemetry.tracing; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.util.Map; + +import org.apache.cassandra.utils.ByteBufferUtil; + +import io.opentelemetry.context.propagation.TextMapGetter; + +/** + * TextMapGetter that extracts W3C Trace Context from native protocol's custom payload + *

<p>
+ * In native protocol's custom payload, the following keys can be set to propagate tracing
+ * from the applications.
+ * <ul>
+ *     <li>traceparent</li>
+ *     <li>tracestate</li>
+ * </ul>
+ * OpenTelemetry only supports text format for propagating context right now.
+ * So the values associated with the above keys are Strings.
+ * </p>
+ *
+ * @see <a href="https://www.w3.org/TR/trace-context/">W3C Trace Context</a>
+ */
+public final class CustomPayloadGetter implements TextMapGetter<Map<String, ByteBuffer>>
+{
+    public static final CustomPayloadGetter instance = new CustomPayloadGetter();
+
+    private CustomPayloadGetter() {}
+
+    /** Returns the keys present in the custom payload, or no keys at all when the payload itself is null. */
+    @Override
+    public Iterable<String> keys(Map<String, ByteBuffer> carrier)
+    {
+        return carrier == null ? java.util.Collections.<String>emptySet() : carrier.keySet();
+    }
+
+    /** Returns the value stored under {@code key} as a String, or null when it is missing, null or undecodable. */
+    @Override
+    public String get(Map<String, ByteBuffer> carrier, String key)
+    {
+        // A single lookup also covers a key that is present but mapped to a null buffer
+        ByteBuffer value = carrier == null ? null : carrier.get(key);
+        if (value == null) return null;
+        try
+        {
+            return ByteBufferUtil.string(value);
+        }
+        catch (CharacterCodingException e)
+        {
+            return null;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/telemetry/tracing/TraceContextSerializer.java b/src/java/org/apache/cassandra/telemetry/tracing/TraceContextSerializer.java new file mode 100644 index 000000000000..97c657169498 --- /dev/null +++ b/src/java/org/apache/cassandra/telemetry/tracing/TraceContextSerializer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.telemetry.tracing; + +import java.io.IOException; +import java.util.Map; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.TraceStateBuilder; +import io.opentelemetry.context.Context; + +/** + * Serializer for OpenTelemetry {@link Context}. + *

+ * This is used to propagate the tracing context between nodes through inter-node messaging.
+ *

+ */ +public class TraceContextSerializer implements IVersionedSerializer +{ + public static final TraceContextSerializer serializer = new TraceContextSerializer(); + + @Override + public void serialize(Context context, DataOutputPlus out, int version) throws IOException + { + SpanContext spanContext = Span.fromContext(context).getSpanContext(); + out.write(spanContext.getTraceIdBytes()); + out.write(spanContext.getSpanIdBytes()); + out.write(spanContext.getTraceFlags().asByte()); + TraceState traceState = spanContext.getTraceState(); + out.writeUnsignedVInt32(traceState.size()); + for (Map.Entry entry : traceState.asMap().entrySet()) + { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + + @Override + public Context deserialize(DataInputPlus in, int version) throws IOException + { + // Trace ID - 16 bytes + byte[] traceIdBytes = new byte[16]; + in.readFully(traceIdBytes); + String traceId = TraceId.fromBytes(traceIdBytes); + // Span ID - 8 bytes + byte[] spanIdBytes = new byte[8]; + in.readFully(spanIdBytes); + String spanId = SpanId.fromBytes(spanIdBytes); + // Trace flag - 1 byte + byte traceFlagsByte = in.readByte(); + TraceFlags traceFlags = TraceFlags.fromByte(traceFlagsByte); + + TraceStateBuilder traceStateBuilder = TraceState.builder(); + int traceStateSize = in.readUnsignedVInt32(); + for (int i = 0; i < traceStateSize; i++) + traceStateBuilder.put(in.readUTF(), in.readUTF()); + + SpanContext remoteContext = SpanContext.createFromRemoteParent(traceId, spanId, traceFlags, traceStateBuilder.build()); + + if (!remoteContext.isValid()) + return Context.current(); + + return Context.current().with(Span.wrap(remoteContext)); + } + + @Override + public long serializedSize(Context context, int version) + { + SpanContext spanContext = Span.fromContext(context).getSpanContext(); + TraceState traceState = spanContext.getTraceState(); + long size = 16 + 8 + 1 + TypeSizes.sizeofUnsignedVInt(traceState.size()); + for (Map.Entry entry : 
traceState.asMap().entrySet()) + size += TypeSizes.sizeof(entry.getKey()) + TypeSizes.sizeof(entry.getValue()); + return size; + } +} diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 4ef2e500cb4d..5342fd50f57c 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.helpers.MessageFormatter; import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.config.DatabaseDescriptor; @@ -39,10 +40,14 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.ParamType; +import org.apache.cassandra.telemetry.CassandraAttributes; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.TimeUUID; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; + import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_TRACING_CLASS; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; @@ -328,6 +333,15 @@ public static void traceRepair(String format, Object... 
args) // normal traces get zero-, one-, and two-argument overloads so common case doesn't need to create varargs array public static void trace(String message) { + Span span = Span.current(); + if (span.isRecording()) + { + Attributes attr = Attributes.builder() + .put(CassandraAttributes.THREAD_ID, Thread.currentThread().getId()) + .put(CassandraAttributes.THREAD_NAME, Thread.currentThread().getName()) + .build(); + span.addEvent(message, attr); + } final TraceState state = instance.get(); if (state == null) // inline isTracing to avoid implicit two calls to state.get() return; @@ -337,29 +351,20 @@ public static void trace(String message) public static void trace(String format, Object arg) { - final TraceState state = instance.get(); - if (state == null) // inline isTracing to avoid implicit two calls to state.get() - return; - - state.trace(format, arg); + String message = MessageFormatter.format(format, arg).getMessage(); + trace(message); } public static void trace(String format, Object arg1, Object arg2) { - final TraceState state = instance.get(); - if (state == null) // inline isTracing to avoid implicit two calls to state.get() - return; - - state.trace(format, arg1, arg2); + String message = MessageFormatter.format(format, arg1, arg2).getMessage(); + trace(message); } public static void trace(String format, Object... 
args) { - final TraceState state = instance.get(); - if (state == null) // inline isTracing to avoid implicit two calls to state.get() - return; - - state.trace(format, args); + String message = MessageFormatter.arrayFormat(format, args).getMessage(); + trace(message); } /** diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index d55e011b3a12..c87011dfe655 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -19,7 +19,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,6 +32,7 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.telemetry.tracing.CustomPayloadGetter; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Envelope.Header.Flag; import org.apache.cassandra.transport.messages.AuthChallenge; @@ -55,6 +58,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; @@ -239,6 +247,16 @@ protected boolean isTraceable() return false; } + protected Span createSpan(InetSocketAddress clientAddress, Context context) + { + return Span.getInvalid(); + } + + protected Map getInitialTraceParameters() + { + return Collections.emptyMap(); + } + /** * @return true if warnings should be tracked and aborts enforced for resource limits on this {@link Request} */ @@ -254,6 +272,7 @@ public final Response execute(QueryState queryState, 
Dispatcher.RequestTime requ boolean shouldTrace = false; TimeUUID tracingSessionId = null; + final Span span; if (isTraceable()) { if (isTracingRequested()) @@ -267,15 +286,41 @@ else if (StorageService.instance.shouldTraceProbablistically()) shouldTrace = true; Tracing.instance.newSession(getCustomPayload()); } + + // Try getting OpenTelemetry tracing context from custom payload containing W3C trace context + Context context = W3CTraceContextPropagator.getInstance() + .extract(Context.current(), getCustomPayload(), CustomPayloadGetter.instance); + + // Create span if the span is propagated, or cassandra tracing is requested + // Note that probablistic tracing will not create OpenTelemetry tracing + if (Span.fromContext(context).getSpanContext().isRemote() || isTracingRequested()) + { + span = createSpan(queryState.getClientState().getRemoteAddress(), context); + } + else + { + span = Span.getInvalid(); + } + } + else + { + span = Span.getInvalid(); } Response response; - try + try (Scope scope = span.makeCurrent()) { response = execute(queryState, requestTime, shouldTrace); } + catch (Throwable e) + { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); + throw e; + } finally { + span.end(); if (shouldTrace) Tracing.instance.stopSession(); } diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index d40b6d01cbcd..f3bcd6f311ec 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport.messages; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -36,22 +37,31 @@ import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import 
org.apache.cassandra.telemetry.CassandraAttributes; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.LocalizeString; import org.apache.cassandra.utils.MD5Digest; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.ClientAttributes; +import io.opentelemetry.semconv.DbAttributes; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; public class BatchMessage extends Message.Request { - public static final Message.Codec codec = new Message.Codec() + public static final Message.Codec codec = new Message.Codec<>() { public BatchMessage decode(ByteBuf body, ProtocolVersion version) { @@ -225,6 +235,10 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor // (and no value would be really correct, so we prefer passing a clearly wrong one). 
BatchStatement batch = new BatchStatement(batchType, VariableSpecifications.empty(), statements, Attributes.none()); + if (Span.current().getSpanContext().isValid()) + { + Span.current().updateName(String.format("%s %s", type.name(), batch.getQuerySummary())); + } long queryTime = currentTimeMillis(); Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), requestTime); @@ -236,10 +250,31 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ { QueryEvents.instance.notifyBatchFailure(prepared, batchType, queryOrIdList, values, options, state, e); JVMStabilityInspector.inspectThrowable(e); + Span.current().recordException(e); return ErrorMessage.fromException(e); } } + @Override + protected Span createSpan(InetSocketAddress clientAddress, Context context) + { + String consistencyValue = options.getConsistency() != null ? LocalizeString.toLowerCaseLocalized(options.getConsistency().name()) : ""; + String serialConsistencyValue = options.getSerialConsistency() != null ? 
LocalizeString.toLowerCaseLocalized(options.getSerialConsistency().name()) : ""; + return Telemetry.getRequestTracer().spanBuilder(type.name()) // Span name will be updated after successful statement parsing + .setSpanKind(SpanKind.SERVER) + .setParent(context) + .setAttribute(DbAttributes.DB_SYSTEM_NAME, CassandraAttributes.DB_SYSTEM_NAME_CASSANDRA) + .setAttribute(CassandraAttributes.CASSANDRA_QUERY_TYPE, type.name()) + .setAttribute(ClientAttributes.CLIENT_ADDRESS, clientAddress.getAddress().getHostAddress()) + .setAttribute(ClientAttributes.CLIENT_PORT, clientAddress.getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_ADDRESS, FBUtilities.getBroadcastNativeAddressAndPort().getHostAddress(false)) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_PORT, FBUtilities.getBroadcastNativeAddressAndPort().getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_PAGE_SIZE, options.getPageSize()) + .setAttribute(CassandraAttributes.CASSANDRA_CONSISTENCY_LEVEL, consistencyValue) + .setAttribute(CassandraAttributes.CASSANDRA_SERIAL_CONSISTENCY_LEVEL, serialConsistencyValue) + .startSpan(); + } + private void traceQuery(QueryState state) { ImmutableMap.Builder builder = ImmutableMap.builder(); diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 0f6afe0fe123..c678fa02d9a0 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport.messages; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -33,6 +34,8 @@ import org.apache.cassandra.net.ArtificialLatency; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.telemetry.CassandraAttributes; 
+import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Dispatcher; @@ -40,11 +43,18 @@ import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.LocalizeString; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.NoSpamLogger; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.ClientAttributes; +import io.opentelemetry.semconv.DbAttributes; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -52,7 +62,7 @@ public class ExecuteMessage extends Message.Request { private static final NoSpamLogger nospam = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES); - public static final Message.Codec codec = new Message.Codec() + public static final Message.Codec codec = new Message.Codec<>() { public ExecuteMessage decode(ByteBuf body, ProtocolVersion version) { @@ -122,6 +132,26 @@ protected boolean isTraceable() return true; } + @Override + protected Span createSpan(InetSocketAddress clientAddress, Context context) + { + String consistencyValue = options.getConsistency() != null ? LocalizeString.toLowerCaseLocalized(options.getConsistency().name()) : ""; + String serialConsistencyValue = options.getSerialConsistency() != null ? 
LocalizeString.toLowerCaseLocalized(options.getSerialConsistency().name()) : ""; + return Telemetry.getRequestTracer().spanBuilder(type.name()) // Span name will be updated after successful statement parsing + .setSpanKind(SpanKind.SERVER) + .setParent(context) + .setAttribute(DbAttributes.DB_SYSTEM_NAME, CassandraAttributes.DB_SYSTEM_NAME_CASSANDRA) + .setAttribute(CassandraAttributes.CASSANDRA_QUERY_TYPE, type.name()) + .setAttribute(ClientAttributes.CLIENT_ADDRESS, clientAddress.getAddress().getHostAddress()) + .setAttribute(ClientAttributes.CLIENT_PORT, clientAddress.getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_ADDRESS, FBUtilities.getBroadcastNativeAddressAndPort().getHostAddress(false)) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_PORT, FBUtilities.getBroadcastNativeAddressAndPort().getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_PAGE_SIZE, options.getPageSize()) + .setAttribute(CassandraAttributes.CASSANDRA_CONSISTENCY_LEVEL, consistencyValue) + .setAttribute(CassandraAttributes.CASSANDRA_SERIAL_CONSISTENCY_LEVEL, serialConsistencyValue) + .startSpan(); + } + @Override protected boolean isTrackable() { @@ -150,6 +180,12 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ } CQLStatement statement = prepared.statement; + // update span name + Span span = Span.current(); + if (span.getSpanContext().isValid()) + { + span.updateName(String.format("%s %s", type.name(), statement.getQuerySummary())); + } options.prepare(statement.getBindVariables()); if (options.getPageSize() == 0) @@ -167,6 +203,13 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ long requestStartTime = currentTimeMillis(); + if (span.isRecording()) + { + // Use raw CQL statement here to store parameterized query + // Until proper sanitization can be done, bound parameters are not added as attributes + span.setAttribute(DbAttributes.DB_QUERY_TEXT, prepared.rawCQLStatement); + } + 
Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), requestTime); QueryEvents.instance.notifyExecuteSuccess(prepared.statement, prepared.rawCQLStatement, options, state, requestStartTime, response); @@ -207,6 +250,7 @@ else if (options.skipMetadata()) { QueryEvents.instance.notifyExecuteFailure(prepared, options, state, e); JVMStabilityInspector.inspectThrowable(e); + Span.current().recordException(e); return ErrorMessage.fromException(e); } } diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index e92b69b16e00..86ad44aafb67 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport.messages; +import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; @@ -28,15 +29,23 @@ import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.telemetry.CassandraAttributes; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.NoSpamLogger; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.ClientAttributes; +import io.opentelemetry.semconv.DbAttributes; import static 
org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -45,7 +54,7 @@ public class PrepareMessage extends Message.Request private static final Logger logger = LoggerFactory.getLogger(PrepareMessage.class); private static final NoSpamLogger nospam = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES); - public static final Message.Codec codec = new Message.Codec() + public static final Message.Codec codec = new Message.Codec<>() { public PrepareMessage decode(ByteBuf body, ProtocolVersion version) { @@ -115,6 +124,22 @@ protected boolean isTraceable() return true; } + @Override + protected Span createSpan(InetSocketAddress clientAddress, Context context) + { + return Telemetry.getRequestTracer().spanBuilder(type.name()) + .setSpanKind(SpanKind.SERVER) + .setParent(context) + .setAttribute(DbAttributes.DB_SYSTEM_NAME, CassandraAttributes.DB_SYSTEM_NAME_CASSANDRA) + .setAttribute(DbAttributes.DB_QUERY_TEXT, query) + .setAttribute(CassandraAttributes.CASSANDRA_QUERY_TYPE, type.name()) + .setAttribute(ClientAttributes.CLIENT_ADDRESS, clientAddress.getAddress().getHostAddress()) + .setAttribute(ClientAttributes.CLIENT_PORT, clientAddress.getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_ADDRESS, FBUtilities.getBroadcastNativeAddressAndPort().getHostAddress(false)) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_PORT, FBUtilities.getBroadcastNativeAddressAndPort().getPort()) + .startSpan(); + } + @Override protected Message.Response execute(QueryState state, Dispatcher.RequestTime requestTime, boolean traceRequest) { @@ -127,6 +152,14 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ QueryHandler queryHandler = ClientState.getCQLQueryHandler(); long queryTime = currentTimeMillis(); ResultMessage.Prepared response = queryHandler.prepare(query, clientState, getCustomPayload()); + if (Span.current().getSpanContext().isValid()) + { + QueryHandler.Prepared prepared = 
queryHandler.getPrepared(response.statementId); + if (prepared != null && prepared.statement != null) + { + Span.current().updateName(String.format("%s %s", type.name(), prepared.statement.getQuerySummary())); + } + } QueryEvents.instance.notifyPrepareSuccess(() -> queryHandler.getPrepared(response.statementId), query, state, queryTime, response); return response; } @@ -134,6 +167,7 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ { QueryEvents.instance.notifyPrepareFailure(null, query, state, e); JVMStabilityInspector.inspectThrowable(e); + Span.current().recordException(e); return ErrorMessage.fromException(e); } } diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 61f3a9cfbd83..84a5dc51ae58 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.transport.messages; +import java.net.InetSocketAddress; + import com.google.common.collect.ImmutableMap; import org.apache.cassandra.cql3.CQLStatement; @@ -27,15 +29,24 @@ import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.telemetry.CassandraAttributes; +import org.apache.cassandra.telemetry.Telemetry; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.LocalizeString; import io.netty.buffer.ByteBuf; +import 
io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.ClientAttributes; +import io.opentelemetry.semconv.DbAttributes; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -44,7 +55,7 @@ */ public class QueryMessage extends Message.Request { - public static final Message.Codec codec = new Message.Codec() + public static final Message.Codec codec = new Message.Codec<>() { public QueryMessage decode(ByteBuf body, ProtocolVersion version) { @@ -103,6 +114,7 @@ protected boolean isTrackable() protected Message.Response execute(QueryState state, Dispatcher.RequestTime requestTime, boolean traceRequest) { CQLStatement statement = null; + Span span = Span.current(); try { if (options.getPageSize() == 0) @@ -115,11 +127,16 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ QueryHandler queryHandler = ClientState.getCQLQueryHandler(); statement = queryHandler.parse(query, state, options); + // update span name + if (span.getSpanContext().isValid()) + { + span.updateName(String.format("%s %s", type.name(), statement.getQuerySummary())); + } Message.Response response = queryHandler.process(statement, state, options, getCustomPayload(), requestTime); QueryEvents.instance.notifyQuerySuccess(statement, query, options, state, queryStartTime, response); if (options.skipMetadata() && response instanceof ResultMessage.Rows) - ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); + ((ResultMessage.Rows) response).result.metadata.setSkipMetadata(); return response; } @@ -129,10 +146,31 @@ protected Message.Response execute(QueryState state, Dispatcher.RequestTime requ JVMStabilityInspector.inspectThrowable(e); if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException))) logger.error("Unexpected error during query", e); + span.recordException(e); return ErrorMessage.fromException(e); } } + 
@Override + protected Span createSpan(InetSocketAddress clientAddress, Context context) + { + String consistencyValue = options.getConsistency() != null ? LocalizeString.toLowerCaseLocalized(options.getConsistency().name()) : ""; + String serialConsistencyValue = options.getSerialConsistency() != null ? LocalizeString.toLowerCaseLocalized(options.getSerialConsistency().name()) : ""; + return Telemetry.getRequestTracer().spanBuilder(type.name()) // Span name will be updated after successful statement parsing + .setSpanKind(SpanKind.SERVER) + .setParent(context) + .setAttribute(DbAttributes.DB_SYSTEM_NAME, CassandraAttributes.DB_SYSTEM_NAME_CASSANDRA) + .setAttribute(CassandraAttributes.CASSANDRA_QUERY_TYPE, type.name()) + .setAttribute(ClientAttributes.CLIENT_ADDRESS, clientAddress.getAddress().getHostAddress()) + .setAttribute(ClientAttributes.CLIENT_PORT, clientAddress.getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_ADDRESS, FBUtilities.getBroadcastNativeAddressAndPort().getHostAddress(false)) + .setAttribute(CassandraAttributes.CASSANDRA_COORDINATOR_PORT, FBUtilities.getBroadcastNativeAddressAndPort().getPort()) + .setAttribute(CassandraAttributes.CASSANDRA_PAGE_SIZE, options.getPageSize()) + .setAttribute(CassandraAttributes.CASSANDRA_CONSISTENCY_LEVEL, consistencyValue) + .setAttribute(CassandraAttributes.CASSANDRA_SERIAL_CONSISTENCY_LEVEL, serialConsistencyValue) + .startSpan(); + } + private void traceQuery(QueryState state) { ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -150,7 +188,7 @@ private void traceQuery(QueryState state) @Override public String toString() { - return String.format("QUERY %s [pageSize = %d] at consistency %s", + return String.format("QUERY %s [pageSize = %d] at consistency %s", query, options.getPageSize(), options.getConsistency()); } } diff --git a/test/unit/org/apache/cassandra/telemetry/OpenTelemetryTracingTest.java b/test/unit/org/apache/cassandra/telemetry/OpenTelemetryTracingTest.java new file 
mode 100644 index 000000000000..793fa76f9ec3 --- /dev/null +++ b/test/unit/org/apache/cassandra/telemetry/OpenTelemetryTracingTest.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.telemetry; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.QueryTrace; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.service.StorageService; + +import io.opentelemetry.api.OpenTelemetry; +import 
io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.DbAttributes; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Single node OpenTelemetry tracing test. + */ +public class OpenTelemetryTracingTest extends CQLTester +{ + @Rule + public final OpenTelemetryRule otelTesting = OpenTelemetryRule.create(); + + private static final String OTEL_TEST_TABLE_NAME = "otel_test"; + private static final String OTEL_TEST_COUNTER_TABLE_NAME = "otel_test_counter"; + + @BeforeClass + public static void setUpClass() + { + ServerTestUtils.daemonInitialization(); + CQLTester.setUpClass(); + // For counter-mutations, we need to enable coordinator execution + StorageService.instance.setRpcReady(true); + } + + @AfterClass + public static void tearDownClass() + { + CQLTester.disableCoordinatorExecution(); + Telemetry.setOpenTelemetryUnsafe(OpenTelemetry.noop()); + } + + @Before + public void setUp() + { + Telemetry.setOpenTelemetryUnsafe(otelTesting.getOpenTelemetry()); + CQLTester.requireNetwork(); + createTable(KEYSPACE, "CREATE TABLE IF NOT EXISTS %s (key text PRIMARY KEY, value text)", OTEL_TEST_TABLE_NAME); + createTable(KEYSPACE, "CREATE TABLE IF NOT EXISTS %s (key text PRIMARY KEY, count counter)", OTEL_TEST_COUNTER_TABLE_NAME); + } + + /** + * Test Span creation for a single partition read with cassandra tracing enabled. 
+ */ + @Test + public void testSpanForSinglePartitionReadWithTracing() + { + int pageSize = 100; + ConsistencyLevel cl = ConsistencyLevel.LOCAL_QUORUM; + Session session = sessionNet(); + + String query = String.format("SELECT * FROM %s.%s WHERE key = 'a'", KEYSPACE, OTEL_TEST_TABLE_NAME); + Statement simpleStatement = new SimpleStatement(query) + .setConsistencyLevel(cl) + .setFetchSize(pageSize) + .enableTracing(); + ResultSet result = session.execute(simpleStatement); + QueryTrace cassandraTrace = result.getExecutionInfo().getQueryTrace(); + + List spans = otelTesting.getSpans(); + + assertThat(spans).hasSize(2); + + // Spans are not guaranteed to be returned in order. + Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData readSpan = spansByName.get(String.format("READ_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorSpan = spansByName.get(String.format("QUERY SELECT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + + assertThat(readSpan).isNotNull(); + assertThat(coordinatorSpan).isNotNull(); + + assertThat(readSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + assertThat(readSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + + Map<AttributeKey<?>, Object> querySpanAttrs = coordinatorSpan.getAttributes().asMap(); + assertThat(querySpanAttrs).containsEntry(DbAttributes.DB_SYSTEM_NAME, CassandraAttributes.DB_SYSTEM_NAME_CASSANDRA); + assertThat(querySpanAttrs).containsEntry(CassandraAttributes.CASSANDRA_QUERY_TYPE, "QUERY"); + // Page size is a Long attribute type + assertThat(querySpanAttrs).containsEntry(CassandraAttributes.CASSANDRA_PAGE_SIZE, (long) pageSize); + // Consistency level in span attributes is in lowercase + assertThat(querySpanAttrs).containsEntry(CassandraAttributes.CASSANDRA_CONSISTENCY_LEVEL, cl.name().toLowerCase()); + // Attribute should not leak raw query + assertThat(querySpanAttrs).doesNotContainKey(DbAttributes.DB_QUERY_TEXT); + + // Together, spans contain 
the same number of events as Cassandra trace + int eventsCount = spans.stream().mapToInt(SpanData::getTotalRecordedEvents).sum(); + assertThat(eventsCount).isEqualTo(cassandraTrace.getEvents().size()); + } + + /** + * Test Span creation for a partition range read with cassandra tracing enabled. + */ + @Test + public void testSpanForPartitionRangeReadWithTracing() + { + Session session = sessionNet(); + + String query = String.format("SELECT * FROM %s.%s LIMIT 5", KEYSPACE, OTEL_TEST_TABLE_NAME); + Statement simpleStatement = new SimpleStatement(query).enableTracing(); + ResultSet result = session.execute(simpleStatement); + QueryTrace cassandraTrace = result.getExecutionInfo().getQueryTrace(); + + List spans = otelTesting.getSpans(); + + assertThat(spans).hasSize(2); + + // Spans are not guaranteed to be returned in order. + Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData readSpan = spansByName.get(String.format("READ_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorSpan = spansByName.get(String.format("QUERY SELECT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + + assertThat(readSpan).isNotNull(); + assertThat(coordinatorSpan).isNotNull(); + + assertThat(readSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + // coordinator span + // |-> replica span (read) + assertThat(readSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + + int eventsCount = spans.stream().mapToInt(SpanData::getTotalRecordedEvents).sum(); + assertThat(eventsCount).isEqualTo(cassandraTrace.getEvents().size()); + } + + /** + * Test Span creation for mutation with cassandra tracing enabled. 
+ */ + @Test + public void testSpanForMutationWithTracing() + { + Session session = sessionNet(); + + String query = String.format("INSERT INTO %s.%s (key, value) VALUES ('key', 'value')", KEYSPACE, OTEL_TEST_TABLE_NAME); + Statement simpleStatement = new SimpleStatement(query).enableTracing(); + ResultSet result = session.execute(simpleStatement); + QueryTrace cassandraTrace = result.getExecutionInfo().getQueryTrace(); + + List spans = otelTesting.getSpans(); + + assertThat(spans).hasSize(2); + + // Spans are not guaranteed to be returned in order. + Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData mutationSpan = spansByName.get(String.format("MUTATION_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorSpan = spansByName.get(String.format("QUERY INSERT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + + assertThat(mutationSpan).isNotNull(); + assertThat(coordinatorSpan).isNotNull(); + + assertThat(mutationSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + // coordinator span + // |-> replica span (mutation) + assertThat(mutationSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + + int eventsCount = spans.stream().mapToInt(SpanData::getTotalRecordedEvents).sum(); + assertThat(eventsCount).isEqualTo(cassandraTrace.getEvents().size()); + } + + /** + * Test Span creation for counter-mutation with cassandra tracing enabled. 
+ */ + @Test + public void testSpanForCounterMutationWithTracing() + { + Session session = sessionNet(); + + String query = String.format("UPDATE %s.%s SET count += 1 WHERE key = 'a'", KEYSPACE, OTEL_TEST_COUNTER_TABLE_NAME); + Statement simpleStatement = new SimpleStatement(query).enableTracing(); + ResultSet result = session.execute(simpleStatement); + QueryTrace cassandraTrace = result.getExecutionInfo().getQueryTrace(); + + List spans = otelTesting.getSpans(); + + assertThat(spans).hasSize(2); + + // Spans are not guaranteed to be returned in order. + Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData counterMutationSpan = spansByName.get(String.format("COUNTER_MUTATION_REQ %s.%s", KEYSPACE, OTEL_TEST_COUNTER_TABLE_NAME)); + SpanData coordinatorSpan = spansByName.get(String.format("QUERY UPDATE %s.%s", KEYSPACE, OTEL_TEST_COUNTER_TABLE_NAME)); + + assertThat(counterMutationSpan).isNotNull(); + assertThat(coordinatorSpan).isNotNull(); + + assertThat(counterMutationSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + // coordinator span + // |-> replica span (counter mutation) + assertThat(counterMutationSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + + int eventsCount = spans.stream().mapToInt(SpanData::getTotalRecordedEvents).sum(); + assertThat(eventsCount).isEqualTo(cassandraTrace.getEvents().size()); + } + + /** + * Test Span creation for batch mutation with cassandra tracing enabled. 
+ */ + @Test + public void testSpanForLoggedBatchMutationWithTracing() + { + Session session = sessionNet(); + + String query1 = String.format("INSERT INTO %s.%s (key, value) VALUES ('k1', 'v1')", KEYSPACE, OTEL_TEST_TABLE_NAME); + String query2 = String.format("INSERT INTO %s.%s (key, value) VALUES ('k2', 'v2')", KEYSPACE, OTEL_TEST_TABLE_NAME); + BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); + batch.add(new SimpleStatement(query1)); + batch.add(new SimpleStatement(query2)); + batch.enableTracing(); + ResultSet result = session.execute(batch); + QueryTrace cassandraTrace = result.getExecutionInfo().getQueryTrace(); + + List spans = otelTesting.getSpans(); + + // Since this is a logged batch with two partitions, we have: + // 1. coordinator span + // 2. a replica span for batchlog store + // 3. two replica spans for actual mutations + // 4. a replica span for batchlog remove (optional, since this is async) + assertThat(spans.size()).isGreaterThanOrEqualTo(4); + assertThat(spans.size()).isLessThanOrEqualTo(5); + + String batchlogStoreSpanName = "MUTATION_REQ Batchlog store"; + String mutationSpanName = String.format("MUTATION_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME); + String coordinatorSpanName = String.format("BATCH LOGGED BATCH INSERT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME); + String batchlogRemoveSpanName = "MUTATION_REQ Batchlog remove"; + + // Coordinator should be unique + List coordinatorSpans = spans.stream() + .filter(s -> s.getName().equals(coordinatorSpanName)) + .collect(Collectors.toList()); + assertThat(coordinatorSpans).hasSize(1); + SpanData coordinatorSpan = coordinatorSpans.get(0); + + // Batchlog store mutation span should be 1 + List batchlogStoreSpans = spans.stream() + .filter(s -> s.getName().equals(batchlogStoreSpanName)) + .collect(Collectors.toList()); + assertThat(batchlogStoreSpans).hasSize(1); + SpanData batchlogSpan = batchlogStoreSpans.get(0); + 
assertThat(batchlogSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + assertThat(batchlogSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + + // Replica mutation spans can be multiple + List mutationSpans = spans.stream() + .filter(s -> s.getName().equals(mutationSpanName)) + .collect(Collectors.toList()); + assertThat(mutationSpans).hasSize(2); + for (SpanData mutationSpan : mutationSpans) + { + assertThat(mutationSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + assertThat(mutationSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + } + + // if batchlog remove span is present, validate it + List batchlogRemoveSpans = spans.stream() + .filter(s -> s.getName().equals(batchlogRemoveSpanName)) + .collect(Collectors.toList()); + if (!batchlogRemoveSpans.isEmpty()) + { + assertThat(batchlogRemoveSpans).hasSize(1); + SpanData batchlogRemoveSpan = batchlogRemoveSpans.get(0); + assertThat(batchlogRemoveSpan.getTraceId()).isEqualTo(coordinatorSpan.getTraceId()); + // Batchlog remove is submitted from the mutation stage of the last successful mutation + // so should not be from the coordinator span + assertThat(batchlogRemoveSpan.getParentSpanId()).isNotEqualTo(coordinatorSpan.getSpanId()); + } + + int eventsCount = spans.stream().mapToInt(SpanData::getTotalRecordedEvents).sum(); + assertThat(eventsCount).isEqualTo(cassandraTrace.getEvents().size()); + } + + /** + * Test Span creation of query from trace context propagation. 
+ */ + @Test + public void testSpanFromContextPropagation() + { + ConsistencyLevel cl = ConsistencyLevel.LOCAL_QUORUM; + Session session = sessionNet(); + + String clientSpanName = "testSpanFromContextPropagation"; + Span span = otelTesting.getOpenTelemetry() + .getTracer("openTelemetryTracingTest") + .spanBuilder(clientSpanName) + .startSpan(); + try (Scope ignore = span.makeCurrent()) + { + Map payload = createTracingPayload(); + String query = String.format("SELECT * FROM %s.%s WHERE key = 'a'", KEYSPACE, OTEL_TEST_TABLE_NAME); + Statement simpleStatement = new SimpleStatement(query) + .setConsistencyLevel(cl) + .setOutgoingPayload(payload); + session.execute(simpleStatement); + } + finally + { + span.end(); + } + + List spans = otelTesting.getSpans(); + // Should contain: client span, coordinator span and replica span + assertThat(spans).hasSize(3); + + // Spans are not guaranteed to be returned in order. + Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData replicaSpan = spansByName.get(String.format("READ_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorSpan = spansByName.get(String.format("QUERY SELECT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData clientSpan = spansByName.get(clientSpanName); + + assertThat(replicaSpan).isNotNull(); + assertThat(coordinatorSpan).isNotNull(); + assertThat(clientSpan).isNotNull(); + + // They all should have the same trace ID + assertThat(replicaSpan.getTraceId()).isEqualTo(clientSpan.getTraceId()); + assertThat(coordinatorSpan.getTraceId()).isEqualTo(clientSpan.getTraceId()); + // client span + // |-> coordinator span + // |-> replica span + assertThat(coordinatorSpan.getParentSpanId()).isEqualTo(clientSpan.getSpanId()); + assertThat(replicaSpan.getParentSpanId()).isEqualTo(coordinatorSpan.getSpanId()); + } + + /** + * Test Span creation of preparing and executing a query from trace context propagation. 
+ */ + @Test + public void testSpanForPrepareAndExecuteFromContextPropagation() + { + Session session = sessionNet(); + + String clientSpanName = "testSpanForPrepareAndExecuteFromContextPropagation"; + String query = String.format("SELECT * FROM %s.%s WHERE key = ?", KEYSPACE, OTEL_TEST_TABLE_NAME); + Span span = otelTesting.getOpenTelemetry() + .getTracer("openTelemetryTracingTest") + .spanBuilder(clientSpanName) + .startSpan(); + try (Scope ignore = span.makeCurrent()) + { + Map payload = createTracingPayload(); + SimpleStatement simpleStatement = new SimpleStatement(query); + simpleStatement.setOutgoingPayload(payload); + PreparedStatement prepared = session.prepare(simpleStatement); + + BoundStatement bound = prepared.bind().setString(0, "a"); + bound.setOutgoingPayload(payload); + session.execute(bound); + } + finally + { + span.end(); + } + + List spans = otelTesting.getSpans(); + + // Should contain: client span, prepare span, execute span and replica span + assertThat(spans).hasSize(4); + + // Spans are not guaranteed to be returned in order. 
+ Map spansByName = spans.stream().collect(Collectors.toMap(SpanData::getName, Function.identity())); + + SpanData replicaSpan = spansByName.get(String.format("READ_REQ %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorPrepareSpan = spansByName.get(String.format("PREPARE SELECT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData coordinatorExecuteSpan = spansByName.get(String.format("EXECUTE SELECT %s.%s", KEYSPACE, OTEL_TEST_TABLE_NAME)); + SpanData clientSpan = spansByName.get(clientSpanName); + + assertThat(replicaSpan).isNotNull(); + assertThat(coordinatorPrepareSpan).isNotNull(); + assertThat(coordinatorExecuteSpan).isNotNull(); + assertThat(clientSpan).isNotNull(); + + // All spans should have the same trace ID + assertThat(replicaSpan.getTraceId()).isEqualTo(clientSpan.getTraceId()); + assertThat(coordinatorPrepareSpan.getTraceId()).isEqualTo(clientSpan.getTraceId()); + assertThat(coordinatorExecuteSpan.getTraceId()).isEqualTo(clientSpan.getTraceId()); + // client span + // |-> coordinator prepare span + // |-> coordinator execute span + // |-> replica span + assertThat(replicaSpan.getParentSpanId()).isEqualTo(coordinatorExecuteSpan.getSpanId()); + assertThat(coordinatorExecuteSpan.getParentSpanId()).isEqualTo(clientSpan.getSpanId()); + assertThat(coordinatorPrepareSpan.getParentSpanId()).isEqualTo(clientSpan.getSpanId()); + + // Prepare and Execute spans can contain the parameterized query in attributes + Map<AttributeKey<?>, Object> prepareSpanAttrs = coordinatorPrepareSpan.getAttributes().asMap(); + assertThat(prepareSpanAttrs).containsEntry(DbAttributes.DB_QUERY_TEXT, query); + Map<AttributeKey<?>, Object> executeSpanAttrs = coordinatorExecuteSpan.getAttributes().asMap(); + assertThat(executeSpanAttrs).containsEntry(DbAttributes.DB_QUERY_TEXT, query); + } + + private Map createTracingPayload() + { + Map payload = new HashMap<>(); + otelTesting.getOpenTelemetry() + .getPropagators() + .getTextMapPropagator() + .inject(Context.current(), payload, (carrier, key, value) -> + { 
+ if (carrier != null) + carrier.put(key, ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); + }); + return payload; + } +} diff --git a/test/unit/org/apache/cassandra/telemetry/tracing/TraceContextSerializerTest.java b/test/unit/org/apache/cassandra/telemetry/tracing/TraceContextSerializerTest.java new file mode 100644 index 000000000000..d1ca5f36873c --- /dev/null +++ b/test/unit/org/apache/cassandra/telemetry/tracing/TraceContextSerializerTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.telemetry.tracing; + +import org.junit.Test; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TraceContextSerializerTest +{ + private static final int VERSION = MessagingService.current_version; + + @Test + public void testRoundTripPreservesSpanContextAndMarksParentRemote() throws Exception + { + TraceState traceState = TraceState.builder() + .put("tenant", "alpha") + .put("sample-rate", "0.5") + .build(); + SpanContext localSpanContext = SpanContext.create("0123456789abcdef0123456789abcdef", + "0123456789abcdef", + TraceFlags.getSampled(), + traceState); + Context context = Context.root().with(Span.wrap(localSpanContext)); + long expectedSerializedSize = 25 + + TypeSizes.sizeofUnsignedVInt(traceState.size()) + + TypeSizes.sizeof("tenant") + + TypeSizes.sizeof("alpha") + + TypeSizes.sizeof("sample-rate") + + TypeSizes.sizeof("0.5"); + + assertFalse(localSpanContext.isRemote()); + assertEquals(expectedSerializedSize, TraceContextSerializer.serializer.serializedSize(context, VERSION)); + + try (DataOutputBuffer out = new DataOutputBuffer()) + { + TraceContextSerializer.serializer.serialize(context, out, VERSION); + assertEquals(expectedSerializedSize, out.getLength()); + + try (DataInputBuffer in = new DataInputBuffer(out.buffer(), true)) + { + Context deserialized = TraceContextSerializer.serializer.deserialize(in, VERSION); + SpanContext deserializedSpanContext = 
Span.fromContext(deserialized).getSpanContext(); + + assertEquals(localSpanContext.getTraceId(), deserializedSpanContext.getTraceId()); + assertEquals(localSpanContext.getSpanId(), deserializedSpanContext.getSpanId()); + assertEquals(localSpanContext.getTraceFlags(), deserializedSpanContext.getTraceFlags()); + assertEquals(localSpanContext.getTraceState().asMap(), deserializedSpanContext.getTraceState().asMap()); + assertTrue(deserializedSpanContext.isRemote()); + assertEquals(0, in.available()); + } + } + } +}