Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5410] Improvement(iceberg-rest): Make IT IcebergRestKerberosHiveCatalogIT work in deploy mode. #5423

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions iceberg/iceberg-rest-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ dependencies {
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.hadoop2.common)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
public abstract class IcebergRESTServiceBaseIT {

public static final Logger LOG = LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class);
private SparkSession sparkSession;
protected SparkSession sparkSession;
protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY;
private IcebergRESTServerManager icebergRESTServerManager;

Expand Down Expand Up @@ -121,14 +121,14 @@ private void registerIcebergCatalogConfig() {
LOG.info("Iceberg REST service config registered, {}", StringUtils.join(icebergConfigs));
}

private int getServerPort() {
protected int getServerPort() {
    // Resolve the HTTP port the embedded Iceberg REST server is listening on by
    // reading the Jetty section of the REST server's configuration. Widened from
    // private to protected so subclasses (e.g. the Kerberos IT) can build the REST URI.
    JettyServerConfig jettyServerConfig =
        JettyServerConfig.fromConfig(
            icebergRESTServerManager.getServerConfig(), IcebergConfig.ICEBERG_CONFIG_PREFIX);
    return jettyServerConfig.getHttpPort();
  }

private void initSparkEnv() {
protected void initSparkEnv() {
int port = getServerPort();
LOG.info("Iceberg REST server port:{}", port);
String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/", port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private void purgeNameSpace(String namespace) {
sql("DROP database " + namespace);
}

private void purgeAllIcebergTestNamespaces() {
protected void purgeAllIcebergTestNamespaces() {
List<Object[]> databases =
sql(String.format("SHOW DATABASES like '%s*'", ICEBERG_REST_NS_PREFIX));
Set<String> databasesString = convertToStringSet(databases, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,24 @@
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.io.FileUtils;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosClient;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.integration.test.util.ITUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.condition.EnabledIf;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

@Tag("gravitino-docker-test")
@TestInstance(Lifecycle.PER_CLASS)
@EnabledIf("isEmbedded")
public class IcebergRestKerberosHiveCatalogIT extends IcebergRESTHiveCatalogIT {

private static final String HIVE_METASTORE_CLIENT_PRINCIPAL = "cli@HADOOPKRB";
Expand Down Expand Up @@ -149,12 +152,50 @@ Map<String, String> getCatalogConfig() {
return configMap;
}

private static boolean isEmbedded() {
String mode =
System.getProperty(ITUtils.TEST_MODE) == null
? ITUtils.EMBEDDED_TEST_MODE
: System.getProperty(ITUtils.TEST_MODE);
protected void initSparkEnv() {
int port = getServerPort();
LOG.info("Iceberg REST server port:{}", port);
String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/", port);
SparkConf sparkConf =
new SparkConf()
.set(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.rest.type", "rest")
.set("spark.sql.catalog.rest.uri", icebergRESTUri)
.set("spark.locality.wait.node", "0");

if (supportsCredentialVending()) {
sparkConf.set(
"spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation", "vended-credentials");
}

// Login kerberos and use the users to execute the spark job.
try {
Configuration configuration = new Configuration();
configuration.set("hadoop.security.authentication", "kerberos");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please set all configurations in spark config.

Copy link
Contributor Author

@yuqi1129 yuqi1129 Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is only for Kerberos login, it's not for spark.

KerberosClient kerberosClient =
new KerberosClient(
ImmutableMap.of(
"authentication.kerberos.principal",
HIVE_METASTORE_CLIENT_PRINCIPAL,
"authentication.kerberos.keytab-uri",
tempDir + HIVE_METASTORE_CLIENT_KEYTAB),
configuration);

kerberosClient.login(tempDir + HIVE_METASTORE_CLIENT_KEYTAB);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid the normal user couldn't do kerberosClient.login in spark-sql or spark-shell, may be you could try add spark.kerberos.principal and spark.kerberos.keytab to spark config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let me optimized it.

} catch (Exception e) {
throw new RuntimeException(e);
}

System.setProperty("spark.hadoop.hadoop.security.authentication", "kerberos");
sparkSession = SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate();
}

return Objects.equals(mode, ITUtils.EMBEDDED_TEST_MODE);
@AfterAll
// Tear down test state: drop every Iceberg test namespace created during the run,
// then reset the Hadoop UserGroupInformation so the Kerberos login performed in
// initSparkEnv does not leak into subsequent test classes in the same JVM.
void cleanup() {
purgeAllIcebergTestNamespaces();
UserGroupInformation.reset();
}
}
Loading