diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31b5e32c2ddb..6df46b70040c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5570,6 +5570,7 @@ public static enum ConfVars { + ",fs.s3a.access.key" + ",fs.s3a.secret.key" + ",fs.s3a.proxy.password" + + ",iceberg.mr.vended.storage.credentials" + ",dfs.adls.oauth2.credential" + ",fs.adl.oauth2.credential" + ",fs.azure.account.oauth2.client.secret" diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java index 424f8e10c350..a4824597cc6b 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java @@ -39,6 +39,16 @@ public class IcebergCatalogProperties { public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg"; public static final String NO_CATALOG_TYPE = "no catalog"; + /** + * Iceberg REST catalog property for the {@code X-Iceberg-Access-Delegation} HTTP header (via + * {@code header.*} keys recognized by {@link org.apache.iceberg.rest.RESTUtil#configHeaders}). Value is a + * comma-separated list of {@link RestAccessDelegationMode} tokens, for example + * {@link RestAccessDelegationMode#VENDED_CREDENTIALS}. + * + *

<p>Hive configuration: {@code iceberg.catalog.<catalog_name>.header.X-Iceberg-Access-Delegation} + */ + public static final String REST_ACCESS_DELEGATION_HEADER_PROPERTY = "header.X-Iceberg-Access-Delegation"; + private IcebergCatalogProperties() { } @@ -104,6 +114,19 @@ public static String catalogPropertyConfigKey(String catalogName, String catalog return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty); } + /** + * Returns true when the catalog is configured to request REST vended storage credentials via + * {@link #REST_ACCESS_DELEGATION_HEADER_PROPERTY}. + */ + public static boolean requestsVendedCredentials(Configuration conf, String catalogName) { + if (conf == null || StringUtils.isEmpty(catalogName)) { + return false; + } + String headerValue = + conf.get(catalogPropertyConfigKey(catalogName, REST_ACCESS_DELEGATION_HEADER_PROPERTY)); + return RestAccessDelegationMode.headerRequestsVendedCredentials(headerValue); + } + /** * Return the catalog type based on the catalog name. *

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/RestAccessDelegationMode.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/RestAccessDelegationMode.java new file mode 100644 index 000000000000..9b6ffc63ed88 --- /dev/null +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/RestAccessDelegationMode.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + +/** + * Values for the Iceberg REST catalog {@code X-Iceberg-Access-Delegation} request header. The header + * accepts a comma-separated list of these modes; configure via + * {@link IcebergCatalogProperties#REST_ACCESS_DELEGATION_HEADER_PROPERTY}. + * + * @see REST catalog spec + */ +public enum RestAccessDelegationMode { + VENDED_CREDENTIALS("vended-credentials"), + REMOTE_SIGNING("remote-signing"); + + private final String modeName; + + RestAccessDelegationMode(String modeName) { + this.modeName = modeName; + } + + /** Spec-defined header token for this delegation mode. 
*/ + public String modeName() { + return modeName; + } + + /** Comma-separated list suitable for {@link IcebergCatalogProperties#REST_ACCESS_DELEGATION_HEADER_PROPERTY}. */ + public static String toHeaderValue(RestAccessDelegationMode... modes) { + return Arrays.stream(modes).map(RestAccessDelegationMode::modeName).collect(Collectors.joining(",")); + } + + /** Parses a single mode name (case-insensitive); throws if unknown. */ + public static RestAccessDelegationMode fromModeName(String modeName) { + for (RestAccessDelegationMode mode : values()) { + if (mode.modeName.equalsIgnoreCase(modeName.trim())) { + return mode; + } + } + throw new IllegalArgumentException( + String.format( + "Unknown REST access delegation mode: %s. Valid values are: %s", + modeName, + Arrays.stream(values()).map(RestAccessDelegationMode::modeName).collect(Collectors.joining(", ")))); + } + + /** Returns true if the {@code X-Iceberg-Access-Delegation} header value includes vended credentials. */ + public static boolean headerRequestsVendedCredentials(String headerValue) { + if (StringUtils.isBlank(headerValue)) { + return false; + } + for (String token : headerValue.split(",")) { + if (VENDED_CREDENTIALS.modeName.equalsIgnoreCase(token.trim())) { + return true; + } + } + return false; + } +} diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestRestAccessDelegationMode.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestRestAccessDelegationMode.java new file mode 100644 index 000000000000..7c51ea66d033 --- /dev/null +++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestRestAccessDelegationMode.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestRestAccessDelegationMode { + + @Test + public void vendedCredentialsModeName() { + assertThat(RestAccessDelegationMode.VENDED_CREDENTIALS.modeName()).isEqualTo("vended-credentials"); + } + + @Test + public void toHeaderValueJoinsModes() { + assertThat( + RestAccessDelegationMode.toHeaderValue( + RestAccessDelegationMode.VENDED_CREDENTIALS, RestAccessDelegationMode.REMOTE_SIGNING)) + .isEqualTo("vended-credentials,remote-signing"); + } + + @Test + public void fromModeNameParsesCaseInsensitive() { + assertThat(RestAccessDelegationMode.fromModeName("VENDED-CREDENTIALS")) + .isEqualTo(RestAccessDelegationMode.VENDED_CREDENTIALS); + } + + @Test + public void fromModeNameRejectsUnknown() { + assertThatThrownBy(() -> RestAccessDelegationMode.fromModeName("unknown")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("unknown"); + } + + @Test + public void headerRequestsVendedCredentials() { + assertThat(RestAccessDelegationMode.headerRequestsVendedCredentials(null)).isFalse(); + assertThat(RestAccessDelegationMode.headerRequestsVendedCredentials("remote-signing")).isFalse(); + 
assertThat(RestAccessDelegationMode.headerRequestsVendedCredentials("vended-credentials")).isTrue(); + assertThat(RestAccessDelegationMode.headerRequestsVendedCredentials("VENDED-CREDENTIALS,remote-signing")) + .isTrue(); + } + + @Test + public void requestsVendedCredentialsFromConfiguration() { + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + assertThat(IcebergCatalogProperties.requestsVendedCredentials(conf, "ice01")).isFalse(); + + conf.set( + "iceberg.catalog.ice01.header.X-Iceberg-Access-Delegation", + RestAccessDelegationMode.VENDED_CREDENTIALS.modeName()); + assertThat(IcebergCatalogProperties.requestsVendedCredentials(conf, "ice01")).isTrue(); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index 34ba9f2404ca..5ce7ba4c4700 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -80,6 +80,12 @@ private InputFormatConfig() { public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog."; + /** + * Base64-serialized list of {@link org.apache.iceberg.io.StorageCredential} for Tez/LLAP executors. + * Stored in {@code TableDesc#jobSecrets} (HIVE-20651), not in job properties. 
+ */ + public static final String VENDED_STORAGE_CREDENTIALS = "iceberg.mr.vended.storage.credentials"; + public static final String SORT_ORDER = "sort.order"; public static final String SORT_COLUMNS = "sort.columns"; public static final String ZORDER = "ZORDER"; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 9fabe10488b1..1d743cba2dfb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -312,6 +312,7 @@ private static Multimap collectOutputs(List // fall back to getting the serialized table from the config .orElseGet(() -> HiveTableUtil.deserializeTable(jobContext.getJobConf(), output)); if (table != null) { + IcebergVendedCredentialUtil.applyFromJobConf(jobContext.getJobConf(), table); String catalogName = catalogName(jobContext.getJobConf(), output); outputs.put(new OutputTable(catalogName, output, table), jobContext); } else { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 65852f1a8553..2b953f17d2cb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -284,6 +284,21 @@ public HiveAuthorizationProvider getAuthorizationProvider() { return null; } + @Override + public void configureInputJobCredentials(TableDesc tableDesc, Map secrets) { + if (!IcebergVendedCredentialUtil.requestsVendedCredentials(conf, tableDesc.getProperties())) { + return; + } + try { + Table table = + 
IcebergVendedCredentialUtil.tableWithVendedCredentials(conf, tableDesc.getProperties()); + String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); + IcebergVendedCredentialUtil.propagateToJob(conf, table, catalogName, null, secrets); + } catch (NoSuchTableException ex) { + // Table may not exist yet for CTAS; credentials will not be available. + } + } + @Override public void configureInputJobProperties(TableDesc tableDesc, Map map) { overlayTableProperties(conf, tableDesc, map); @@ -335,32 +350,10 @@ public void commitJob(JobContext originalContext) { @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { setCommonJobConf(jobConf); - if (tableDesc != null && tableDesc.getProperties() != null && - tableDesc.getProperties().get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName()) != null) { - String tableName = tableDesc.getTableName(); - String opKey = InputFormatConfig.OPERATION_TYPE_PREFIX + tableName; - // set operation type into job conf too - jobConf.set(opKey, tableDesc.getProperties().getProperty(opKey)); - Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR), - "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'"); - if (HiveCustomStorageHandlerUtils.getWriteOperation(tableDesc.getProperties()::getProperty, tableName) != null) { - HiveCustomStorageHandlerUtils.setWriteOperation(jobConf, tableName, - Operation.valueOf(tableDesc.getProperties().getProperty( - HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName))); - } - boolean isMergeTaskEnabled = Boolean.parseBoolean(tableDesc.getProperty( - HiveCustomStorageHandlerUtils.MERGE_TASK_ENABLED + tableName)); - if (isMergeTaskEnabled) { - HiveCustomStorageHandlerUtils.setMergeTaskEnabled(jobConf, tableName, true); - } - String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); - tables = (tables == null) ? 
tableName : tables + TABLE_NAME_SEPARATOR + tableName; - jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); - - String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); - if (catalogName != null) { - jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName); - } + configureOutputTableJobConf(tableDesc, jobConf); + if (IcebergVendedCredentialUtil.requestsVendedCredentials(conf, tableDesc.getProperties())) { + IcebergVendedCredentialUtil.refreshVendedCredentialsIfMissing(conf, tableDesc, jobConf); + IcebergVendedCredentialUtil.applyJobSecretsToJobConf(tableDesc, jobConf); } try { if (!jobConf.getBoolean(ConfVars.HIVE_IN_TEST_IDE.varname, false)) { @@ -373,6 +366,37 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { } } + private static void configureOutputTableJobConf(TableDesc tableDesc, JobConf jobConf) { + if (tableDesc == null || tableDesc.getProperties() == null || + tableDesc.getProperties().get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName()) == null) { + return; + } + String tableName = tableDesc.getTableName(); + String opKey = InputFormatConfig.OPERATION_TYPE_PREFIX + tableName; + // set operation type into job conf too + jobConf.set(opKey, tableDesc.getProperties().getProperty(opKey)); + Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR), + "Can not handle table " + tableName + ". 
Its name contains '" + TABLE_NAME_SEPARATOR + "'"); + if (HiveCustomStorageHandlerUtils.getWriteOperation(tableDesc.getProperties()::getProperty, tableName) != null) { + HiveCustomStorageHandlerUtils.setWriteOperation(jobConf, tableName, + Operation.valueOf(tableDesc.getProperties().getProperty( + HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName))); + } + boolean isMergeTaskEnabled = Boolean.parseBoolean(tableDesc.getProperty( + HiveCustomStorageHandlerUtils.MERGE_TASK_ENABLED + tableName)); + if (isMergeTaskEnabled) { + HiveCustomStorageHandlerUtils.setMergeTaskEnabled(jobConf, tableName, true); + } + String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES); + tables = (tables == null) ? tableName : tables + TABLE_NAME_SEPARATOR + tableName; + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables); + + String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME); + if (catalogName != null) { + jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName); + } + } + @Override public boolean directInsert() { return true; @@ -1667,7 +1691,11 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD PartitionSpec spec; String bytes; try { - Table table = IcebergTableUtil.getTable(configuration, props); + boolean vendedCredentials = + IcebergVendedCredentialUtil.requestsVendedCredentials(configuration, props); + Table table = vendedCredentials ? 
+ IcebergVendedCredentialUtil.tableWithVendedCredentials(configuration, props) : + IcebergTableUtil.getTable(configuration, props); location = table.location(); // set table format-version and write-mode information from tableDesc bytes = HiveTableUtil.serializeTable(table, configuration, props, @@ -1677,6 +1705,11 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD schema = table.schema(); spec = table.spec(); + String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); + if (vendedCredentials) { + IcebergVendedCredentialUtil.propagateToJob(configuration, table, catalogName, map, null); + } + // For intra-txn read-after-write: if the table has in-memory metadata with no metadata file // (i.e. uncommitted changes from a prior statement in the same txn), write the metadata to // a file so the Tez side can reconstruct the table with the updated state. diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java index 69580e358727..9096ce88f716 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java @@ -244,6 +244,7 @@ public static Table deserializeTable(Configuration config, String name) { table = readTableObjectFromFile(location, config); } checkAndSetIoConfig(config, table); + IcebergVendedCredentialUtil.applyFromJobConf(config, table); // For intra-txn read-after-write: if a metadata file was written for uncommitted in-txn state, // reconstruct a BaseTable from it so the Tez side sees changes from prior statements. 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergVendedCredentialUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergVendedCredentialUtil.java new file mode 100644 index 000000000000..56016bab4f19 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergVendedCredentialUtil.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.net.URI; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.mapred.JobConf; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hive.IcebergCatalogProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.StorageCredential; +import org.apache.iceberg.io.SupportsStorageCredentials; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.SerializationUtil; + +/** + * Propagates vended storage credentials from an Iceberg {@link Table}'s {@link FileIO} to Hive job + * configuration so Tez/LLAP executors can access object storage without static catalog keys. + */ +public final class IcebergVendedCredentialUtil { + + // Iceberg S3FileIO config keys (string literals avoid iceberg-aws on compile classpath). + public static final String ACCESS_KEY_ID = "s3.access-key-id"; + public static final String SECRET_ACCESS_KEY = "s3.secret-access-key"; + public static final String SESSION_TOKEN = "s3.session-token"; + public static final String ENDPOINT = "s3.endpoint"; + public static final String PATH_STYLE_ACCESS = "s3.path-style-access"; + + private static final ImmutableSet SECRET_ICEBERG_KEYS = ImmutableSet.of( + ACCESS_KEY_ID, SECRET_ACCESS_KEY, SESSION_TOKEN); + + private IcebergVendedCredentialUtil() { + } + + /** + * Copies vended credentials from the table FileIO into Hive job configuration. + * + *

<p>Follows the HIVE-20651 split used by {@code JdbcStorageHandler}: sensitive values (including + * the serialized {@link StorageCredential} list) go to {@code jobSecrets}; non-secret config such + * as endpoint and path-style access go to {@code jobProperties}. + * + * @param conf session conf used to preserve host-side endpoint overrides + * @param table loaded Iceberg table + * @param catalogName Hive catalog name ({@link InputFormatConfig#CATALOG_NAME}) + * @param jobProperties Tez/MR non-secret job properties; may be {@code null} + * @param jobSecrets sensitive keys and serialized credentials; may be {@code null} + */ + public static void propagateToJob(Configuration conf, Table table, String catalogName, + Map jobProperties, Map jobSecrets) { + + List credentials = + withConfigurationOverrides(conf, catalogName, extractCredentials(table)); + + if (credentials.isEmpty()) { + return; + } + + if (jobSecrets != null) { + jobSecrets.put( + InputFormatConfig.VENDED_STORAGE_CREDENTIALS, + SerializationUtil.serializeToBase64(Lists.newArrayList(credentials))); + } + + for (StorageCredential credential : credentials) { + addCredentialEntries(conf, catalogName, credential, jobProperties, jobSecrets); + } + } + + private static void addCredentialEntries(Configuration conf, String catalogName, + StorageCredential credential, Map jobProperties, Map jobSecrets) { + + String bucket = bucketFromPrefix(credential.prefix()); + for (Map.Entry entry : credential.config().entrySet()) { + addCredentialEntry( + conf, catalogName, bucket, entry.getKey(), entry.getValue(), jobProperties, jobSecrets); + } + } + + private static void addCredentialEntry(Configuration conf, String catalogName, String bucket, String icebergKey, + String value, Map jobProperties, Map jobSecrets) { + + if (StringUtils.isBlank(value)) { + return; + } + String resolvedValue = resolveCredentialValue(conf, catalogName, icebergKey, value); + + if (jobProperties != null && !isSecretKey(icebergKey)) { + 
addNonSecretCredentialEntry(catalogName, bucket, icebergKey, resolvedValue, jobProperties); + } + + if (jobSecrets != null && isSecretKey(icebergKey)) { + addSecretCredentialEntry(bucket, icebergKey, resolvedValue, jobSecrets); + } + } + + private static void addNonSecretCredentialEntry(String catalogName, String bucket, String icebergKey, String value, + Map jobProperties) { + + if (catalogName != null) { + String catalogConfigKey = + IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, icebergKey); + jobProperties.putIfAbsent(catalogConfigKey, value); + } + + if (bucket != null) { + String s3aKey = toS3aBucketProperty(bucket, icebergKey); + if (s3aKey != null) { + jobProperties.putIfAbsent(s3aKey, value); + } + } + } + + /** Writes Hadoop S3A per-bucket keys only; Iceberg secrets are carried in the serialized blob. */ + private static void addSecretCredentialEntry(String bucket, String icebergKey, String value, + Map jobSecrets) { + if (bucket != null) { + String s3aSecretKey = toS3aBucketProperty(bucket, icebergKey); + if (s3aSecretKey != null) { + jobSecrets.put(s3aSecretKey, value); + } + } + } + + /** + * Applies vended credentials to the table FileIO, merging session/catalog conf overrides (e.g. S3 endpoint). + * Used on executors after deserialization and on HS2 commit when the table is taken from query state. 
+ */ + public static void applyFromJobConf(Configuration conf, Table table) { + if (table == null || conf == null) { + return; + } + + String catalogName = conf.get(InputFormatConfig.CATALOG_NAME); + if (shouldSkipApplyFromJobConf(conf, catalogName)) { + return; + } + + FileIO io = table.io(); + if (!(io instanceof SupportsStorageCredentials credentialIo)) { + return; + } + + List credentials = resolveCredentialsForApply(conf, table, credentialIo); + if (credentials != null && !credentials.isEmpty()) { + credentialIo.setCredentials(withConfigurationOverrides(conf, catalogName, credentials)); + } + } + + private static boolean shouldSkipApplyFromJobConf(Configuration conf, String catalogName) { + return StringUtils.isBlank(conf.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)) && + !IcebergCatalogProperties.requestsVendedCredentials(conf, catalogName); + } + + private static List resolveCredentialsForApply( + Configuration conf, Table table, SupportsStorageCredentials credentialIo) { + List credentials = credentialIo.credentials(); + if (credentials == null || credentials.isEmpty()) { + String serialized = conf.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS); + if (StringUtils.isNotBlank(serialized)) { + @SuppressWarnings("unchecked") + List deserialized = + SerializationUtil.deserializeFromBase64(serialized); + credentials = deserialized; + } + } + + if (credentials == null || credentials.isEmpty()) { + credentials = extractCredentials(table); + } + return credentials; + } + + /** + * Returns true when the table catalog is configured for REST vended storage credentials. 
+ */ + static boolean requestsVendedCredentials(Configuration configuration, Properties properties) { + if (properties == null) { + return false; + } + return IcebergCatalogProperties.requestsVendedCredentials( + configuration, properties.getProperty(InputFormatConfig.CATALOG_NAME)); + } + + /** + * Loads a table and, if needed, bypasses the query-level cache so REST vended credentials are present on the FileIO. + * When vended credentials are not requested for the catalog, returns the cached table without an extra load. + */ + static Table tableWithVendedCredentials(Configuration configuration, Properties properties) { + Table table = IcebergTableUtil.getTable(configuration, properties); + if (requestsVendedCredentials(configuration, properties) && extractCredentials(table).isEmpty()) { + table = IcebergTableUtil.getTable(configuration, properties, true); + } + return table; + } + + /** + * Reloads vended credentials at job launch when compile-time propagation missed them. + * Non-secret config is written to {@code jobConf}; secrets are merged into {@code TableDesc#getJobSecrets()} + * for {@link org.apache.hadoop.hive.ql.plan.PlanUtils#configureJobConf} (HIVE-20651). 
+ */ + static void refreshVendedCredentialsIfMissing(Configuration configuration, TableDesc tableDesc, JobConf jobConf) { + if (tableDesc == null || tableDesc.getProperties() == null || + hasSerializedCredentials(tableDesc.getJobSecrets()) || + StringUtils.isNotBlank(jobConf.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS))) { + return; + } + + Properties props = tableDesc.getProperties(); + if (!requestsVendedCredentials(configuration, props)) { + return; + } + + String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); + if (catalogName == null) { + return; + } + + try { + Table table = tableWithVendedCredentials(configuration, props); + Map jobProps = new LinkedHashMap<>(); + Map secrets = new LinkedHashMap<>(); + propagateToJob(configuration, table, catalogName, jobProps, secrets); + jobProps.forEach(jobConf::set); + mergeJobSecrets(tableDesc, secrets); + } catch (NoSuchTableException ex) { + // Table may not exist yet for CTAS; credentials will not be available. + } + } + + /** Copies {@code jobSecrets} onto {@code jobConf} so the Tez driver can use S3A before tasks start. 
*/ + static void applyJobSecretsToJobConf(TableDesc tableDesc, JobConf jobConf) { + Map secrets = tableDesc.getJobSecrets(); + if (secrets != null) { + secrets.forEach(jobConf::set); + } + } + + private static boolean hasSerializedCredentials(Map jobSecrets) { + return jobSecrets != null && + StringUtils.isNotBlank(jobSecrets.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)); + } + + private static void mergeJobSecrets(TableDesc tableDesc, Map secrets) { + if (secrets.isEmpty()) { + return; + } + Map existing = tableDesc.getJobSecrets(); + if (existing == null) { + tableDesc.setJobSecrets(new LinkedHashMap<>(secrets)); + } else { + secrets.forEach(existing::putIfAbsent); + } + } + + private static boolean isSecretKey(String icebergKey) { + return SECRET_ICEBERG_KEYS.contains(icebergKey); + } + + static List extractCredentials(Table table) { + if (table == null) { + return List.of(); + } + FileIO io = table.io(); + if (io instanceof SupportsStorageCredentials credentialIo) { + List credentials = credentialIo.credentials(); + if (credentials != null && !credentials.isEmpty()) { + return credentials; + } + } + return credentialsFromFileIoProperties(table, io); + } + + private static List credentialsFromFileIoProperties(Table table, FileIO io) { + Map props = io.properties(); + if (props == null || StringUtils.isBlank(props.get(ACCESS_KEY_ID)) || + StringUtils.isBlank(props.get(SECRET_ACCESS_KEY))) { + return List.of(); + } + Map config = new LinkedHashMap<>(); + putIfPresent(config, props, ACCESS_KEY_ID); + putIfPresent(config, props, SECRET_ACCESS_KEY); + putIfPresent(config, props, SESSION_TOKEN); + putIfPresent(config, props, ENDPOINT); + putIfPresent(config, props, PATH_STYLE_ACCESS); + putIfPresent(config, props, "client.region"); + return List.of(StorageCredential.create(credentialPrefix(table), config)); + } + + private static void putIfPresent(Map target, Map source, String key) { + if (source.containsKey(key) && StringUtils.isNotBlank(source.get(key))) { + 
target.put(key, source.get(key)); + } + } + + private static String credentialPrefix(Table table) { + String location = table.location(); + if (StringUtils.isBlank(location)) { + return ""; + } + String bucket = bucketFromPrefix(location); + if (bucket != null) { + return "s3://" + bucket + "/"; + } + return location.endsWith("/") ? location : location + "/"; + } + + private static List withConfigurationOverrides( + Configuration conf, String catalogName, List credentials) { + + if (credentials.isEmpty() || conf == null || catalogName == null) { + return credentials; + } + + List updated = Lists.newArrayListWithCapacity(credentials.size()); + for (StorageCredential credential : credentials) { + Map config = new LinkedHashMap<>(credential.config()); + applyCatalogConfigOverrides(conf, catalogName, config); + updated.add(StorageCredential.create(credential.prefix(), config)); + } + + return updated; + } + + private static void applyCatalogConfigOverrides( + Configuration conf, String catalogName, Map config) { + for (String icebergKey : List.of(ENDPOINT, PATH_STYLE_ACCESS)) { + String override = + conf.get(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, icebergKey)); + if (StringUtils.isNotBlank(override)) { + config.put(icebergKey, override); + } + } + } + + private static String resolveCredentialValue( + Configuration conf, String catalogName, String icebergKey, String vendedValue) { + if (conf == null || catalogName == null) { + return vendedValue; + } + String override = + conf.get(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, icebergKey)); + return StringUtils.isNotBlank(override) ? override : vendedValue; + } + + private static String bucketFromPrefix(String prefix) { + if (StringUtils.isBlank(prefix)) { + return null; + } + try { + URI uri = new URI(prefix.endsWith("/") ? 
prefix : prefix + "/"); + if ("s3".equalsIgnoreCase(uri.getScheme()) || "s3a".equalsIgnoreCase(uri.getScheme())) { + return uri.getHost(); + } + } catch (Exception ignored) { + // fall through + } + if (prefix.startsWith("s3://") || prefix.startsWith("s3a://")) { + String withoutScheme = prefix.substring(prefix.indexOf("://") + 3); + int slash = withoutScheme.indexOf('/'); + return slash >= 0 ? withoutScheme.substring(0, slash) : withoutScheme; + } + return null; + } + + private static String toS3aBucketProperty(String bucket, String icebergKey) { + String bucketPrefix = "fs.s3a.bucket." + bucket + "."; + return switch (icebergKey) { + case ACCESS_KEY_ID -> bucketPrefix + "access.key"; + case SECRET_ACCESS_KEY -> bucketPrefix + "secret.key"; + case SESSION_TOKEN -> bucketPrefix + "session.token"; + case ENDPOINT -> bucketPrefix + "endpoint"; + case PATH_STYLE_ACCESS -> bucketPrefix + "path.style.access"; + case "client.region" -> bucketPrefix + "endpoint.region"; + default -> null; + }; + } +} diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergVendedCredentialUtil.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergVendedCredentialUtil.java new file mode 100644 index 000000000000..97ae261ada0d --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergVendedCredentialUtil.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.StorageCredential; +import org.apache.iceberg.io.SupportsStorageCredentials; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializationUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestIcebergVendedCredentialUtil { + + @Test + public void requestsVendedCredentialsRequiresDelegationHeader() { + Configuration conf = new Configuration(); + Properties props = new java.util.Properties(); + props.setProperty(InputFormatConfig.CATALOG_NAME, "ice01"); + + assertThat(IcebergVendedCredentialUtil.requestsVendedCredentials(conf, props)).isFalse(); + + conf.set( + "iceberg.catalog.ice01.header.X-Iceberg-Access-Delegation", + "vended-credentials"); + assertThat(IcebergVendedCredentialUtil.requestsVendedCredentials(conf, props)).isTrue(); + } + + @Test + public void propagateToJobMapsIcebergAndS3aProperties() { + Configuration conf = new Configuration(); + conf.set("iceberg.catalog.ice01." 
+ IcebergVendedCredentialUtil.ENDPOINT, "http://host:9000"); + + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.setCredentials( + List.of( + StorageCredential.create( + "s3://my-bucket/", + Map.of( + IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret", + IcebergVendedCredentialUtil.ENDPOINT, "http://minio:9000", + IcebergVendedCredentialUtil.PATH_STYLE_ACCESS, "true")))); + + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + Map jobProps = Maps.newHashMap(); + Map jobSecrets = Maps.newHashMap(); + + IcebergVendedCredentialUtil.propagateToJob(conf, table, "ice01", jobProps, jobSecrets); + + assertThat(jobProps.get("iceberg.catalog.ice01." + IcebergVendedCredentialUtil.ENDPOINT)) + .isEqualTo("http://host:9000"); + assertThat(jobProps.get("fs.s3a.bucket.my-bucket.endpoint")).isEqualTo("http://host:9000"); + assertThat(jobProps.get("fs.s3a.bucket.my-bucket.path.style.access")).isEqualTo("true"); + assertThat(jobProps.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)).isNull(); + assertThat(jobProps.get("fs.s3a.bucket.my-bucket.access.key")).isNull(); + assertThat(jobProps.get("iceberg.catalog.ice01." + IcebergVendedCredentialUtil.ACCESS_KEY_ID)).isNull(); + assertThat(jobSecrets.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)).isNotBlank(); + assertThat(jobSecrets.get("iceberg.catalog.ice01." + IcebergVendedCredentialUtil.ACCESS_KEY_ID)).isNull(); + assertThat(jobSecrets.get("iceberg.catalog.ice01." 
+ IcebergVendedCredentialUtil.SECRET_ACCESS_KEY)).isNull(); + assertThat(jobSecrets.get("fs.s3a.bucket.my-bucket.access.key")).isEqualTo("access"); + assertThat(jobSecrets.get("fs.s3a.bucket.my-bucket.secret.key")).isEqualTo("secret"); + + List serialized = + SerializationUtil.deserializeFromBase64( + jobSecrets.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)); + assertThat(serialized.getFirst().config().get(IcebergVendedCredentialUtil.ENDPOINT)) + .isEqualTo("http://host:9000"); + } + + @Test + public void propagateNonSecretsOnlyWhenJobSecretsNull() { + Configuration conf = new Configuration(); + conf.set("iceberg.catalog.ice01." + IcebergVendedCredentialUtil.ENDPOINT, "http://host:9000"); + + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.setCredentials( + List.of( + StorageCredential.create( + "s3://my-bucket/", + Map.of( + IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret", + IcebergVendedCredentialUtil.ENDPOINT, "http://minio:9000", + IcebergVendedCredentialUtil.PATH_STYLE_ACCESS, "true")))); + + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + Map jobProps = Maps.newHashMap(); + + IcebergVendedCredentialUtil.propagateToJob(conf, table, "ice01", jobProps, null); + + assertThat(jobProps.get("fs.s3a.bucket.my-bucket.endpoint")).isEqualTo("http://host:9000"); + assertThat(jobProps.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)).isNull(); + assertThat(jobProps.get("iceberg.catalog.ice01." 
+ IcebergVendedCredentialUtil.ACCESS_KEY_ID)).isNull(); + } + + @Test + public void extractCredentialsFromFileIoPropertiesWhenCredentialListEmpty() { + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.initialize( + Map.of( + IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret", + IcebergVendedCredentialUtil.ENDPOINT, "http://minio:9000")); + Schema schema = new Schema(Types.NestedField.required(1, "x", Types.IntegerType.get())); + TableMetadata metadata = + TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), "s3://my-bucket/warehouse/t", Map.of()); + Table table = new BaseTable(new StaticTableOperations(metadata, fileIO), "db.t"); + + assertThat(IcebergVendedCredentialUtil.extractCredentials(table)).hasSize(1); + assertThat( + IcebergVendedCredentialUtil.extractCredentials(table) + .getFirst() + .config() + .get(IcebergVendedCredentialUtil.ACCESS_KEY_ID)) + .isEqualTo("access"); + } + + @Test + public void propagateSecretsOnlyToJobSecrets() { + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.setCredentials( + List.of( + StorageCredential.create( + "s3://my-bucket/", + Map.of(IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret")))); + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + Map jobSecrets = Maps.newHashMap(); + + IcebergVendedCredentialUtil.propagateToJob(new Configuration(), table, "ice01", null, jobSecrets); + + assertThat(jobSecrets.get(InputFormatConfig.VENDED_STORAGE_CREDENTIALS)).isNotBlank(); + assertThat(jobSecrets.get("iceberg.catalog.ice01." 
+ IcebergVendedCredentialUtil.ACCESS_KEY_ID)).isNull(); + assertThat(jobSecrets.get("fs.s3a.bucket.my-bucket.access.key")).isEqualTo("access"); + assertThat(jobSecrets.get("fs.s3a.bucket.my-bucket.secret.key")).isEqualTo("secret"); + } + + @Test + public void applyFromJobConfSkipsWhenVendedCredentialsNotRequested() { + CredentialFileIO fileIO = new CredentialFileIO(); + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + + Configuration conf = new Configuration(); + conf.set("iceberg.catalog", "ice01"); + + IcebergVendedCredentialUtil.applyFromJobConf(conf, table); + + assertThat(fileIO.credentials()).isEmpty(); + } + + @Test + public void applyFromJobConfOverridesEndpointOnExistingCredentials() { + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.setCredentials( + List.of( + StorageCredential.create( + "s3://my-bucket/", + Map.of( + IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret", + IcebergVendedCredentialUtil.ENDPOINT, "http://minio:9000")))); + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + + Configuration conf = new Configuration(); + conf.set("iceberg.catalog", "ice01"); + conf.set( + "iceberg.catalog.ice01.header.X-Iceberg-Access-Delegation", + "vended-credentials"); + conf.set("iceberg.catalog.ice01." 
+ IcebergVendedCredentialUtil.ENDPOINT, "http://host:9000"); + + IcebergVendedCredentialUtil.applyFromJobConf(conf, table); + + assertThat( + ((SupportsStorageCredentials) table.io()) + .credentials() + .getFirst() + .config() + .get(IcebergVendedCredentialUtil.ENDPOINT)) + .isEqualTo("http://host:9000"); + } + + @Test + public void applyFromJobConfRestoresCredentialsOnExecutor() { + CredentialFileIO fileIO = new CredentialFileIO(); + fileIO.setCredentials( + List.of( + StorageCredential.create( + "s3://my-bucket/", + Map.of(IcebergVendedCredentialUtil.ACCESS_KEY_ID, "access", + IcebergVendedCredentialUtil.SECRET_ACCESS_KEY, "secret")))); + Table table = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", fileIO), "db.t"); + + Map jobProps = Maps.newHashMap(); + Map jobSecrets = Maps.newHashMap(); + IcebergVendedCredentialUtil.propagateToJob( + new Configuration(), table, "ice01", jobProps, jobSecrets); + + Configuration taskConf = new Configuration(); + jobProps.forEach(taskConf::set); + jobSecrets.forEach(taskConf::set); + + CredentialFileIO executorIo = new CredentialFileIO(); + Table executorTable = + new BaseTable(new StaticTableOperations("s3://my-bucket/t", executorIo), "db.t"); + IcebergVendedCredentialUtil.applyFromJobConf(taskConf, executorTable); + + assertThat(((SupportsStorageCredentials) executorTable.io()).credentials()).hasSize(1); + assertThat( + ((SupportsStorageCredentials) executorTable.io()) + .credentials() + .getFirst() + .config() + .get(IcebergVendedCredentialUtil.ACCESS_KEY_ID)) + .isEqualTo("access"); + } + + private static final class CredentialFileIO implements FileIO, SupportsStorageCredentials { + private List credentials = List.of(); + private Map properties = Map.of(); + + @Override + public void initialize(Map props) { + this.properties = props; + } + + @Override + public org.apache.iceberg.io.InputFile newInputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public 
org.apache.iceberg.io.OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + } + + @Override + public Map properties() { + return properties; + } + + @Override + public void close() { + } + + @Override + public List credentials() { + return credentials; + } + + @Override + public void setCredentials(List creds) { + this.credentials = creds; + } + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q index 81982ca44d98..209260e4b1fc 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q @@ -3,6 +3,10 @@ --! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ -- Mask random uuid --! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/ +-- Mask Iceberg metadata file name (sequence id + UUID) in show create table +--! qt:replace:/('metadata_location'=')[^']+(')/$1#Masked#$2/ +-- Mask metadata_location path in describe formatted ('|' delimiter: regex contains s3:// so '/' breaks QTestReplaceHandler). +--! qt:replace:|(metadata_location)(\s+)(s3://\S+)|$1$2#Masked#| -- Mask random uuid --! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ -- Mask a random snapshot id @@ -26,9 +30,11 @@ set hive.stats.autogather=false; set metastore.client.impl=org.apache.iceberg.hive.client.HiveRESTCatalogClient; set metastore.catalog.default=ice01; set iceberg.catalog.ice01.type=rest; +-- IcebergCatalogProperties.REST_ACCESS_DELEGATION_HEADER_PROPERTY + RestAccessDelegationMode.VENDED_CREDENTIALS +set iceberg.catalog.ice01.header.X-Iceberg-Access-Delegation=vended-credentials; ---! This config is set in the driver setup (see TestIcebergRESTCatalogLlapLocalCliDriver.java) ---! conf.set('iceberg.catalog.ice01.uri', ); +--! 
REST URI, OAuth, MinIO + Gravitino S3 warehouse / credential vending, and host S3A are set in +--! TestIcebergRESTCatalogGravitinoLlapLocalCliDriver. create database ice_rest; use ice_rest; @@ -66,11 +72,6 @@ show create table ice_orc2; insert into ice_orc2 partition (company_id=100) VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), ('fn3','ln3', 3, 30); ---! In CI, Testcontainers' .withFileSystemBind() is not able to bind the same host path to the same container path, ---! so as a workaround, the .metadata.json files from container are manually synced in a daemon process, ---! since the sync can take some time, need to wait for it to happen after the insert operation. -! sleep 20; - describe formatted ice_orc2; select * from ice_orc2; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out index 8bba659e8fd1..f394a4d7218a 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out @@ -79,7 +79,7 @@ STORED BY WITH SERDEPROPERTIES ( 'serialization.format'='1') LOCATION -#### A masked pattern was here #### + 's3://iceberg-vend/warehouse/ice_rest/ice_orc2' TBLPROPERTIES ( 'bucketing_version'='2', 'current-schema'='{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"first_name","required":false,"type":"string"},{"id":2,"name":"last_name","required":false,"type":"string"},{"id":3,"name":"dept_id","required":false,"type":"long"},{"id":4,"name":"team_id","required":false,"type":"long"},{"id":5,"name":"company_id","required":false,"type":"long"}]}', @@ -87,7 +87,7 @@ TBLPROPERTIES ( 'format-version'='2', 'iceberg.catalog'='ice01', 'iceberg.orc.files.only'='true', -#### A masked pattern was here #### + 'metadata_location'='#Masked#', 'name'='ice_rest.ice_orc2', 
'parquet.compression'='zstd', 'serialization.format'='1', @@ -143,7 +143,7 @@ Table Parameters: format-version 2 iceberg.catalog ice01 iceberg.orc.files.only true -#### A masked pattern was here #### + metadata_location #Masked# name ice_rest.ice_orc2 numFiles 1 numRows 3 diff --git a/itests/qtest-iceberg/pom.xml b/itests/qtest-iceberg/pom.xml index 3dc736007f4e..580dcfe6f6ec 100644 --- a/itests/qtest-iceberg/pom.xml +++ b/itests/qtest-iceberg/pom.xml @@ -37,6 +37,12 @@ metrics-core ${dropwizard.version} + + io.minio + minio + 8.5.11 + test + @@ -140,6 +146,11 @@ junit test + + org.assertj + assertj-core + test + com.sun.jersey jersey-servlet @@ -156,6 +167,11 @@ hadoop-archives test + + org.apache.hadoop + hadoop-aws + test + org.apache.hadoop hadoop-common @@ -271,6 +287,10 @@ hadoop-mapreduce-client-core test + + org.apache.hive + hive-iceberg-handler + org.apache.hive hive-iceberg-handler @@ -364,6 +384,12 @@ hbase-mapreduce test + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + test + org.apache.tez tez-tests @@ -496,6 +522,12 @@ + + software.amazon.awssdk + bundle + ${aws-java-sdk.version} + test + org.testcontainers testcontainers diff --git a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java index 89545f83a074..3ed5075c1e06 100644 --- a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java +++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hive.cli; -import com.github.dockerjava.api.command.CopyArchiveFromContainerCmd; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import 
io.minio.BucketExistsArgs; +import io.minio.MakeBucketArgs; +import io.minio.MinioClient; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.cli.control.CliAdapter; @@ -29,7 +29,9 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.hive.IcebergCatalogProperties; +import org.apache.iceberg.hive.RestAccessDelegationMode; import org.apache.iceberg.hive.client.HiveRESTCatalogClient; +import org.apache.iceberg.mr.hive.IcebergVendedCredentialUtil; import org.apache.iceberg.rest.extension.OAuth2AuthorizationServer; import org.junit.After; import org.junit.Before; @@ -42,13 +44,13 @@ import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.containers.wait.strategy.WaitAllStrategy; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; -import org.testcontainers.containers.GenericContainer; import java.io.File; import java.io.IOException; @@ -59,28 +61,45 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +/** + * LLAP {@link CliAdapter} qtests for Hive against the Gravitino Iceberg REST server image + * ({@link #GRAVITINO_IMAGE}), with OAuth2 on the catalog HTTP API and an Iceberg warehouse on + * MinIO using Gravitino {@code s3-secret-key} credential vending (see + * {@link #GRAVITINO_S3_CONF_TEMPLATE}). + * + *

Table metadata and data live under {@code s3://} in {@link #S3_BUCKET}. The host temp directory + * {@link #warehouseDir} holds the rendered Gravitino server configuration, which is copied into the container via + * {@code .withCopyFileToContainer}; the H2 driver JAR is copied from {@code target/test-dependencies} instead.

+ */ @RunWith(Parameterized.class) public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver { private static final CliAdapter CLI_ADAPTER = new CliConfigs.TestIcebergRESTCatalogGravitinoLlapLocalCliDriver().getCliAdapter(); - + private static final Logger LOG = LoggerFactory.getLogger(TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.class); - + private static final String CATALOG_NAME = "ice01"; private static final long GRAVITINO_STARTUP_TIMEOUT_MINUTES = 5L; private static final int GRAVITINO_HTTP_PORT = 9001; - private static final String GRAVITINO_CONF_FILE_TEMPLATE = "gravitino-h2-test-template.conf"; + /** + * Classpath resource (under {@code itests/qtest-iceberg/src/test/resources/}) for the Gravitino Iceberg REST + * server: JDBC catalog backend, MinIO {@code s3://} warehouse, {@code s3-secret-key} credential vending, OAuth2, + * and {@code http://minio:9000} for container-to-container access to MinIO. + */ + private static final String GRAVITINO_S3_CONF_TEMPLATE = "gravitino-s3-vended-oauth-template.conf"; private static final String GRAVITINO_ROOT_DIR = "/root/gravitino-iceberg-rest-server"; private static final String GRAVITINO_STARTUP_SCRIPT = GRAVITINO_ROOT_DIR + "/bin/start-iceberg-rest-server.sh"; private static final String GRAVITINO_H2_LIB = GRAVITINO_ROOT_DIR + "/libs/h2-driver.jar"; private static final String GRAVITINO_CONF_FILE = GRAVITINO_ROOT_DIR + "/conf/gravitino-iceberg-rest-server.conf"; - private static final DockerImageName GRAVITINO_IMAGE = - DockerImageName.parse("apache/gravitino-iceberg-rest:1.0.0"); + private static final DockerImageName GRAVITINO_IMAGE = DockerImageName.parse("apache/gravitino-iceberg-rest:1.0.0"); + + private static final String S3_BUCKET = "iceberg-vend"; + private static final String MINIO_ACCESS_KEY = "minioadmin"; + private static final String MINIO_SECRET_KEY = "minioadmin"; + private static final int MINIO_API_PORT = 9000; + private static final DockerImageName MINIO_IMAGE = 
DockerImageName.parse("minio/minio:RELEASE.2024-09-22T00-33-43Z"); private static final String OAUTH2_SERVER_ICEBERG_CLIENT_ID = "iceberg-client"; private static final String OAUTH2_SERVER_ICEBERG_CLIENT_SECRET = "iceberg-client-secret"; @@ -89,8 +108,8 @@ public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver { private final File qfile; private GenericContainer gravitinoContainer; + private GenericContainer minioContainer; private Path warehouseDir; - private final ScheduledExecutorService fileSyncExecutor = Executors.newSingleThreadScheduledExecutor(); private OAuth2AuthorizationServer oAuth2AuthorizationServer; @Parameters(name = "{0}") @@ -110,14 +129,15 @@ public TestIcebergRESTCatalogGravitinoLlapLocalCliDriver(String name, File qfile } @Before - public void setup() throws IOException { + public void setup() throws Exception { Network dockerNetwork = Network.newNetwork(); - + startOAuth2AuthorizationServer(dockerNetwork); createWarehouseDir(); + startMinio(dockerNetwork); + ensureMinioBucket(); prepareGravitinoConfig(); startGravitinoContainer(dockerNetwork); - fileSyncExecutor.scheduleAtFixedRate(this::syncWarehouseDir, 0, 5, TimeUnit.SECONDS); String host = gravitinoContainer.getHost(); Integer port = gravitinoContainer.getMappedPort(GRAVITINO_HTTP_PORT); @@ -133,67 +153,110 @@ public void setup() throws IOException { conf.set(restCatalogPrefix + "uri", restCatalogUri); conf.set(restCatalogPrefix + "type", CatalogUtil.ICEBERG_CATALOG_TYPE_REST); - // OAUTH2 Configs + // OAUTH2 configs for the Iceberg REST client (catalog HTTP API) conf.set(restCatalogPrefix + "rest.auth.type", "oauth2"); conf.set(restCatalogPrefix + "oauth2-server-uri", oAuth2AuthorizationServer.getTokenEndpoint()); conf.set(restCatalogPrefix + "credential", oAuth2AuthorizationServer.getClientCredential()); + conf.set( + restCatalogPrefix + IcebergCatalogProperties.REST_ACCESS_DELEGATION_HEADER_PROPERTY, + RestAccessDelegationMode.VENDED_CREDENTIALS.modeName()); + + // 
Host-published MinIO endpoint for S3A / S3FileIO (credentials come from Gravitino vending via + // {@link org.apache.iceberg.mr.hive.IcebergVendedCredentialUtil} on Tez/LLAP tasks). + applyHostS3FilesystemSettings(conf); + applyIcebergS3ClientEndpointOverride(conf, restCatalogPrefix); } @After - public void teardown() throws IOException { + public void teardown() throws Exception { if (gravitinoContainer != null) { gravitinoContainer.stop(); } - + + if (minioContainer != null) { + minioContainer.stop(); + } + if (oAuth2AuthorizationServer != null) { oAuth2AuthorizationServer.stop(); } - fileSyncExecutor.shutdownNow(); - FileUtils.deleteDirectory(warehouseDir.toFile()); + if (warehouseDir != null) { + FileUtils.deleteDirectory(warehouseDir.toFile()); + } + } + + /** + * Iceberg {@code S3FileIO} host endpoint override only (no static keys). Gravitino vends credentials with + * {@code http://minio:9000}; the host must reach the published MinIO port instead. + */ + private void applyIcebergS3ClientEndpointOverride(Configuration conf, String restCatalogPrefix) { + String host = minioContainer.getHost(); + int port = minioContainer.getMappedPort(MINIO_API_PORT); + @SuppressWarnings("HttpUrlsUsage") + String icebergS3Endpoint = String.format("http://%s:%d", host, port); + conf.set(restCatalogPrefix + IcebergVendedCredentialUtil.ENDPOINT, icebergS3Endpoint); + conf.set(restCatalogPrefix + IcebergVendedCredentialUtil.PATH_STYLE_ACCESS, "true"); + conf.set(restCatalogPrefix + "client.region", "us-east-1"); + } + + /** + * Registers S3A for {@code s3://} paths on the published MinIO endpoint. Access keys are supplied per job via + * {@link org.apache.iceberg.mr.hive.IcebergVendedCredentialUtil}, not in session configuration. 
+ */ + private void applyHostS3FilesystemSettings(Configuration conf) { + String minioHost = minioContainer.getHost(); + int minioPort = minioContainer.getMappedPort(MINIO_API_PORT); + @SuppressWarnings("HttpUrlsUsage") + String endpoint = String.format("http://%s:%d", minioHost, minioPort); + conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + conf.set("fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A"); + conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + String bucketPrefix = "fs.s3a.bucket." + S3_BUCKET + "."; + conf.set(bucketPrefix + "endpoint", endpoint); + conf.setBoolean(bucketPrefix + "path.style.access", true); + conf.setBoolean(bucketPrefix + "connection.ssl.enabled", false); } /** - * Starts a Gravitino container with the Iceberg REST server configured for testing. + * Starts a Gravitino container with the Iceberg REST server configured for this test. * *

This method configures the container to: *

* - *

Note: The {@code @SuppressWarnings("resource")} annotation is applied because - * IntelliJ and some compilers flag {@link org.testcontainers.containers.GenericContainer} - * as a resource that should be managed with try-with-resources. In this test setup, - * the container lifecycle is managed explicitly: it is started here and stopped in - * {@code @After} (via {@code gravitinoContainer.stop()}). Using try-with-resources - * would not work in this context, since the container must remain running across - * multiple test methods rather than being confined to a single block scope.

+ *

Note: the {@code @SuppressWarnings("resource")} annotation is applied because IntelliJ and some compilers + * flag {@link GenericContainer} as a resource that should be used with try-with-resources. Here the container + * lifecycle is explicit: it is started in this method and stopped in {@link #teardown()} via + * {@code gravitinoContainer.stop()}.

*/ @SuppressWarnings("resource") private void startGravitinoContainer(Network dockerNetwork) { gravitinoContainer = new GenericContainer<>(GRAVITINO_IMAGE) .withExposedPorts(GRAVITINO_HTTP_PORT) - // Update entrypoint to create the warehouse directory before starting the server + // Bootstrap dir for the server script; warehouse is s3:// on MinIO (see template) .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("bash", "-c", - String.format("mkdir -p %s && exec %s", warehouseDir.toString(), GRAVITINO_STARTUP_SCRIPT))) - // Mount gravitino configuration file + "mkdir -p /tmp/gravitino-bootstrap && exec " + GRAVITINO_STARTUP_SCRIPT)) + // Mount Gravitino configuration file (rendered under warehouseDir on the host) .withCopyFileToContainer( - MountableFile.forHostPath(Paths.get(warehouseDir.toString(), GRAVITINO_CONF_FILE_TEMPLATE)), - GRAVITINO_CONF_FILE - ) + MountableFile.forHostPath(Paths.get(warehouseDir.toString(), GRAVITINO_S3_CONF_TEMPLATE)), + GRAVITINO_CONF_FILE) // Mount the H2 driver JAR into the server's lib directory .withCopyFileToContainer( MountableFile.forHostPath( - Paths.get("target", "test-dependencies", "h2-driver.jar").toAbsolutePath() - ), - GRAVITINO_H2_LIB - ) - // Use the same Docker network as the OAuth2 server so they can communicate + Paths.get("target", "test-dependencies", "h2-driver.jar").toAbsolutePath()), + GRAVITINO_H2_LIB) + // Same Docker network as OAuth2 and MinIO (Gravitino uses http://minio:9000 in config) .withNetwork(dockerNetwork) // Wait for the server to be fully started .waitingFor( @@ -201,123 +264,100 @@ private void startGravitinoContainer(Network dockerNetwork) { .withStrategy(Wait.forLogMessage(".*GravitinoIcebergRESTServer is running.*\\n", 1) .withStartupTimeout(Duration.ofMinutes(GRAVITINO_STARTUP_TIMEOUT_MINUTES))) .withStrategy(Wait.forListeningPort() - .withStartupTimeout(Duration.ofMinutes(GRAVITINO_STARTUP_TIMEOUT_MINUTES))) - ) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory - 
.getLogger(TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.class))); + .withStartupTimeout(Duration.ofMinutes(GRAVITINO_STARTUP_TIMEOUT_MINUTES)))) + .withLogConsumer(new Slf4jLogConsumer(LOG)); gravitinoContainer.start(); } /** - * Starts a background daemon that continuously synchronizes the Iceberg warehouse - * directory from the running Gravitino container to the host file system. - * - *

In CI environments, Testcontainers' {@code .withFileSystemBind()} cannot reliably - * bind the same host path to the same path inside the container, especially when - * using remote Docker hosts or Docker-in-Docker setups. This causes the container's - * writes (e.g., Iceberg metadata files like {@code .metadata.json}) to be invisible - * on the host.

- * - *

This method works around that limitation by repeatedly copying new files from - * the container's warehouse directory to the corresponding host directory. Existing - * files on the host are preserved, and only files that do not yet exist are copied. - * The sync runs every 1 second while the container is running.

- * - *

Each archive copy from the container is extracted using a {@link TarArchiveInputStream}, - * and directories are created as needed. Files that already exist on the host are skipped - * to avoid overwriting container data.

+ * MinIO for the Iceberg warehouse. {@code .withNetworkAliases("minio")} matches + * {@code gravitino.iceberg-rest.s3-endpoint = http://minio:9000} inside the Gravitino container. */ - private void syncWarehouseDir() { - if (gravitinoContainer.isRunning()) { - try (CopyArchiveFromContainerCmd copyArchiveFromContainerCmd = - gravitinoContainer - .getDockerClient() - .copyArchiveFromContainerCmd(gravitinoContainer.getContainerId(), warehouseDir.toString()); - InputStream tarStream = copyArchiveFromContainerCmd.exec(); - TarArchiveInputStream tis = new TarArchiveInputStream(tarStream)) { - - TarArchiveEntry entry; - while ((entry = tis.getNextEntry()) != null) { - // Skip directories because we only want to copy metadata files from the container. - if (entry.isDirectory()) { - continue; - } - - /* - * Tar entry names include a container-specific top-level folder, e.g.: - * iceberg-test-1759245909247/iceberg_warehouse/ice_rest/.../metadata.json - * - * Strip the first part so the relative path inside the warehouse is preserved - * when mapping to the host warehouseDir. 
- */ - - String[] parts = entry.getName().split("/", 2); - if (parts.length < 2) { - continue; // defensive guard - } - - Path relativePath = Paths.get(parts[1]); - Path outputPath = warehouseDir.resolve(relativePath); - - // Skip if already present on host to avoid overwriting - if (Files.exists(outputPath)) { - continue; - } - - Files.createDirectories(outputPath.getParent()); - Files.copy(tis, outputPath); - } - - } catch (Exception e) { - LOG.error("Warehouse folder sync failed: {}", e.getMessage()); - } + @SuppressWarnings("resource") + private void startMinio(Network dockerNetwork) { + minioContainer = new GenericContainer<>(MINIO_IMAGE) + .withNetwork(dockerNetwork) + .withNetworkAliases("minio") + .withExposedPorts(MINIO_API_PORT) + .withEnv("MINIO_ROOT_USER", MINIO_ACCESS_KEY) + .withEnv("MINIO_ROOT_PASSWORD", MINIO_SECRET_KEY) + .withCommand("server", "/data") + .waitingFor(Wait.forListeningPort()); + + minioContainer.start(); + } + + /** Creates {@link #S3_BUCKET} if missing so Gravitino and Hive can use {@code s3://} paths. */ + private void ensureMinioBucket() throws Exception { + MinioClient client = MinioClient.builder() + .endpoint(minioContainer.getHost(), minioContainer.getMappedPort(MINIO_API_PORT), false) + .credentials(MINIO_ACCESS_KEY, MINIO_SECRET_KEY) + .build(); + if (!client.bucketExists(BucketExistsArgs.builder().bucket(S3_BUCKET).build())) { + client.makeBucket(MakeBucketArgs.builder().bucket(S3_BUCKET).build()); } } - + + /** Keycloak-backed OAuth2 used by Gravitino REST authentication and by the Hive REST client. */ private void startOAuth2AuthorizationServer(Network dockerNetwork) { oAuth2AuthorizationServer = new OAuth2AuthorizationServer(dockerNetwork, false); oAuth2AuthorizationServer.start(); } + /** + * Host directory used to write the rendered Gravitino config (see {@link #prepareGravitinoConfig}) and as the + * source path for {@code .withCopyFileToContainer} in {@link #startGravitinoContainer}. 
This is not the Iceberg + * warehouse root; the warehouse is {@code s3://}{@link #S3_BUCKET}{@code /...} on MinIO. + */ private void createWarehouseDir() { try { warehouseDir = Paths.get("/tmp", "iceberg-test-" + System.currentTimeMillis()).toAbsolutePath(); Files.createDirectories(warehouseDir); } catch (Exception e) { - throw new RuntimeException("Failed to create the Iceberg warehouse directory", e); + throw new RuntimeException("Failed to create temp directory for Gravitino config staging", e); } } + /** + * Reads {@link #GRAVITINO_S3_CONF_TEMPLATE} from the classpath, substitutes bucket / MinIO / OAuth placeholders, + * and writes the result under {@link #warehouseDir} for copying into the Gravitino container. + */ private void prepareGravitinoConfig() throws IOException { String content; try (InputStream in = TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.class.getClassLoader() - .getResourceAsStream(GRAVITINO_CONF_FILE_TEMPLATE)) { + .getResourceAsStream(GRAVITINO_S3_CONF_TEMPLATE)) { if (in == null) { - throw new IOException("Resource not found: " + GRAVITINO_CONF_FILE_TEMPLATE); + throw new IOException("Resource not found: " + GRAVITINO_S3_CONF_TEMPLATE); } content = new String(in.readAllBytes(), StandardCharsets.UTF_8); } String updatedContent = content - .replace("/WAREHOUSE_DIR", warehouseDir.toString()) + .replace("S3_BUCKET", S3_BUCKET) + .replace("MINIO_ACCESS_KEY", MINIO_ACCESS_KEY) + .replace("MINIO_SECRET_KEY", MINIO_SECRET_KEY) .replace("OAUTH2_SERVER_URI", oAuth2AuthorizationServer.getIssuer()) .replace("OAUTH2_JWKS_URI", getJwksUri()) .replace("OAUTH2_CLIENT_ID", OAUTH2_SERVER_ICEBERG_CLIENT_ID) .replace("OAUTH2_CLIENT_SECRET", OAUTH2_SERVER_ICEBERG_CLIENT_SECRET) .replace("HTTP_PORT", String.valueOf(GRAVITINO_HTTP_PORT)); - Path configFile = warehouseDir.resolve(GRAVITINO_CONF_FILE_TEMPLATE); + Path configFile = warehouseDir.resolve(GRAVITINO_S3_CONF_TEMPLATE); Files.writeString(configFile, updatedContent); } + /** + * JWKS URL reachable 
from inside the Gravitino container: host/port in the issuer are rewritten to the + * Keycloak container hostname and its internal HTTP port. + */ private String getJwksUri() { String reachableHost = oAuth2AuthorizationServer.getKeycloackContainerDockerInternalHostName(); int internalPort = 8080; // Keycloak container's internal port return oAuth2AuthorizationServer.getIssuer() .replace("localhost", reachableHost) .replace("127.0.0.1", reachableHost) - // replace issuer's mapped port with keyclock container's internal port + // Replace issuer's mapped host port with Keycloak's internal port on the Docker network .replaceFirst(":[0-9]+", ":" + internalPort); } diff --git a/itests/qtest-iceberg/src/test/resources/gravitino-s3-vended-oauth-template.conf b/itests/qtest-iceberg/src/test/resources/gravitino-s3-vended-oauth-template.conf new file mode 100644 index 000000000000..2edd6ae005ac --- /dev/null +++ b/itests/qtest-iceberg/src/test/resources/gravitino-s3-vended-oauth-template.conf @@ -0,0 +1,38 @@ +gravitino.iceberg-rest.httpPort = HTTP_PORT + +# --- Iceberg REST Catalog Backend (JDBC for catalog metadata) --- +gravitino.iceberg-rest.catalog-backend = jdbc +gravitino.iceberg-rest.uri = jdbc:h2:file:/tmp/gravitino_h2_db;AUTO_SERVER=TRUE +gravitino.iceberg-rest.jdbc-driver = org.h2.Driver +gravitino.iceberg-rest.jdbc-user = sa +gravitino.iceberg-rest.jdbc-password = "" +gravitino.iceberg-rest.jdbc-initialize = true + +# --- Warehouse on MinIO (S3) + credential vending (s3-secret-key) --- +# See https://gravitino.apache.org/docs/latest/security/credential-vending +gravitino.iceberg-rest.warehouse = s3://S3_BUCKET/warehouse +gravitino.iceberg-rest.io-impl = org.apache.iceberg.aws.s3.S3FileIO +gravitino.iceberg-rest.credential-providers = s3-secret-key +gravitino.iceberg-rest.s3-access-key-id = MINIO_ACCESS_KEY +gravitino.iceberg-rest.s3-secret-access-key = MINIO_SECRET_KEY +# Docker network only (Gravitino container -> MinIO). 
Host-side Hive/Tez/Iceberg use +# iceberg.catalog.ice01.s3.endpoint (no static keys); credentials are vended per table load. +gravitino.iceberg-rest.s3-endpoint = http://minio:9000 +gravitino.iceberg-rest.s3-path-style-access = true +gravitino.iceberg-rest.s3-region = us-east-1 + +# --- OAuth2 Authentication (catalog HTTP API) --- +gravitino.authenticators = oauth + +gravitino.authenticator.oauth.serverUri = OAUTH2_SERVER_URI +gravitino.authenticator.oauth.tokenPath = /protocol/openid-connect/token +gravitino.authenticator.oauth.clientId = OAUTH2_CLIENT_ID +gravitino.authenticator.oauth.scope = openid catalog +gravitino.authenticator.oauth.clientSecret = OAUTH2_CLIENT_SECRET + +gravitino.authenticator.oauth.tokenValidatorClass = org.apache.gravitino.server.authentication.JwksTokenValidator +gravitino.authenticator.oauth.jwksUri = OAUTH2_JWKS_URI/protocol/openid-connect/certs +gravitino.authenticator.oauth.provider = default +gravitino.authenticator.oauth.principalFields = sub +gravitino.authenticator.oauth.allowSkewSecs = 60 +gravitino.authenticator.oauth.serviceAudience = hive-metastore diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index fa45fbbef703..4cac2ba58332 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -891,6 +891,14 @@ private static void configureJobPropertiesForStorageHandler(boolean input, "using configureTableJobProperties",e); storageHandler.configureTableJobProperties(tableDesc, jobProperties); } + + try { + storageHandler.configureInputJobCredentials( + tableDesc, + jobSecrets); + } catch (AbstractMethodError e) { + LOG.info("configureInputJobSecrets not found"); + } } // Job properties are only relevant for non-native tables, so // for native tables, leave it null to avoid cluttering up