Skip to content

Commit

Permalink
Extend DistributedQueryRunner in Vertica builder
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Oct 31, 2024
1 parent 58caa0b commit 8818f05
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 96 deletions.
6 changes: 6 additions & 0 deletions plugin/trino-vertica/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected QueryRunner createQueryRunner()
// Use the latest image to avoid "Must be superuser to run export_statistics"
verticaServer = closeAfterClass(new TestingVerticaServer(LATEST_IMAGE));
return VerticaQueryRunner.builder(verticaServer)
.addConnectorProperties(ImmutableMap.of("statistics.enabled", "true"))
.addConnectorProperty("statistics.enabled", "true")
.setTables(ImmutableList.of(TpchTable.ORDERS, TpchTable.REGION, TpchTable.NATION))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
package io.trino.plugin.vertica;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.trino.Session;
import io.trino.plugin.jmx.JmxPlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.security.Identity;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.airlift.testing.Closeables.closeAllSuppress;
Expand All @@ -41,124 +42,96 @@ private VerticaQueryRunner() {}
public static final String NON_GRANTED_USER = "bob";
public static final String TPCH_SCHEMA = "tpch";

private static DistributedQueryRunner createVerticaQueryRunner(
TestingVerticaServer server,
Map<String, String> extraProperties,
Map<String, String> connectorProperties,
Iterable<TpchTable<?>> tables)
throws Exception
{
DistributedQueryRunner queryRunner = null;
try {
DistributedQueryRunner.Builder<?> builder = DistributedQueryRunner.builder(createSession(GRANTED_USER, "vertica"));
extraProperties.forEach(builder::addExtraProperty);
queryRunner = builder.build();

queryRunner.installPlugin(new JmxPlugin());
queryRunner.createCatalog("jmx", "jmx");

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog(TPCH_SCHEMA, TPCH_SCHEMA);

// Create two users, one of which will have access to the TPCH database/schema
executeAsAdmin(server, "CREATE SCHEMA IF NOT EXISTS tpch");
executeAsAdmin(server, "CREATE ROLE " + GRANTED_USER);
executeAsAdmin(server, "CREATE ROLE " + NON_GRANTED_USER);
executeAsAdmin(server, "GRANT ALL PRIVILEGES ON DATABASE tpch TO " + GRANTED_USER);
executeAsAdmin(server, "GRANT ALL PRIVILEGES ON SCHEMA tpch TO " + GRANTED_USER);

// Allow the user to set the roles
executeAsAdmin(server, "GRANT " + GRANTED_USER + " TO " + server.getUsername());
executeAsAdmin(server, "GRANT " + NON_GRANTED_USER + " TO " + server.getUsername());

queryRunner.installPlugin(new VerticaPlugin());
queryRunner.createCatalog("vertica", "vertica", connectorProperties);

copyTpchTables(queryRunner, TPCH_SCHEMA, TINY_SCHEMA_NAME, createSession(GRANTED_USER, "vertica"), tables);

// Revoke all access to the database for the server's user if impersonation is enabled
// This will allow the impersonation to work as intended for testing as Vertica roles add to the user's existing permissions
// Running queries with the NON_GRANTED_USER user/role will succeed because the user in the JDBC connection has access to the tables
if (Boolean.parseBoolean(connectorProperties.getOrDefault("vertica.impersonation.enabled", "false"))) {
executeAsAdmin(server, "REVOKE ALL ON SCHEMA tpch FROM " + server.getUsername());
executeAsAdmin(server, "REVOKE ALL ON DATABASE tpch FROM " + server.getUsername());
}

return queryRunner;
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;
}
}

public static Session createSession(String user, String catalogName)
{
return testSessionBuilder()
.setCatalog(catalogName)
.setSchema(TPCH_SCHEMA)
.setIdentity(Identity.ofUser(user))
.build();
}

public static Builder builder(TestingVerticaServer server)
{
return new Builder(server);
return new Builder(server)
.addConnectorProperty("connection-url", requireNonNull(server.getJdbcUrl(), "jdbcUrl is null"))
.addConnectorProperty("connection-user", requireNonNull(server.getUsername(), "user is null"))
.addConnectorProperty("connection-password", requireNonNull(server.getPassword(), "password is null"));
}

public static class Builder
public static final class Builder
extends DistributedQueryRunner.Builder<Builder>
{
private final TestingVerticaServer server;
private Iterable<TpchTable<?>> tables = ImmutableList.of();
private Map<String, String> connectorProperties;
private Map<String, String> extraProperties;
private List<TpchTable<?>> tables = ImmutableList.of();
private final Map<String, String> connectorProperties = new HashMap<>();

public Builder(TestingVerticaServer server)
private Builder(TestingVerticaServer server)
{
super(testSessionBuilder()
.setCatalog("vertica")
.setSchema(TPCH_SCHEMA)
.build());
this.server = requireNonNull(server, "server is null");
connectorProperties = ImmutableMap.<String, String>builder()
.put("connection-url", requireNonNull(server.getJdbcUrl(), "jdbcUrl is null"))
.put("connection-user", requireNonNull(server.getUsername(), "user is null"))
.put("connection-password", requireNonNull(server.getPassword(), "password is null"))
.buildOrThrow();
extraProperties = ImmutableMap.of();
}

public Builder addConnectorProperties(Map<String, String> properties)
@CanIgnoreReturnValue
public Builder addConnectorProperty(String key, String value)
{
connectorProperties = updateProperties(connectorProperties, properties);
return this;
}

public Builder addExtraProperties(Map<String, String> properties)
{
extraProperties = updateProperties(extraProperties, properties);
connectorProperties.put(key, value);
return this;
}

@CanIgnoreReturnValue
public Builder setTables(Iterable<TpchTable<?>> tables)
{
this.tables = ImmutableList.copyOf(requireNonNull(tables, "tables is null"));
return this;
}

public QueryRunner build()
@Override
public DistributedQueryRunner build()
throws Exception
{
return createVerticaQueryRunner(
server,
extraProperties,
connectorProperties,
tables);
DistributedQueryRunner queryRunner = super.build();
try {
queryRunner.installPlugin(new JmxPlugin());
queryRunner.createCatalog("jmx", "jmx");

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog(TPCH_SCHEMA, TPCH_SCHEMA);

// Create two users, one of which will have access to the TPCH database/schema
executeAsAdmin(server, "CREATE SCHEMA IF NOT EXISTS tpch");
executeAsAdmin(server, "CREATE ROLE " + GRANTED_USER);
executeAsAdmin(server, "CREATE ROLE " + NON_GRANTED_USER);
executeAsAdmin(server, "GRANT ALL PRIVILEGES ON DATABASE tpch TO " + GRANTED_USER);
executeAsAdmin(server, "GRANT ALL PRIVILEGES ON SCHEMA tpch TO " + GRANTED_USER);

// Allow the user to set the roles
executeAsAdmin(server, "GRANT " + GRANTED_USER + " TO " + server.getUsername());
executeAsAdmin(server, "GRANT " + NON_GRANTED_USER + " TO " + server.getUsername());

queryRunner.installPlugin(new VerticaPlugin());
queryRunner.createCatalog("vertica", "vertica", connectorProperties);

copyTpchTables(queryRunner, TPCH_SCHEMA, TINY_SCHEMA_NAME, createSession(GRANTED_USER, "vertica"), tables);

// Revoke all access to the database for the server's user if impersonation is enabled
// This will allow the impersonation to work as intended for testing as Vertica roles add to the user's existing permissions
// Running queries with the NON_GRANTED_USER user/role will succeed because the user in the JDBC connection has access to the tables
if (Boolean.parseBoolean(connectorProperties.getOrDefault("vertica.impersonation.enabled", "false"))) {
executeAsAdmin(server, "REVOKE ALL ON SCHEMA tpch FROM " + server.getUsername());
executeAsAdmin(server, "REVOKE ALL ON DATABASE tpch FROM " + server.getUsername());
}

return queryRunner;
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;
}
}
}

private static Map<String, String> updateProperties(Map<String, String> properties, Map<String, String> update)
public static Session createSession(String user, String catalogName)
{
return ImmutableMap.<String, String>builder()
.putAll(requireNonNull(properties, "properties is null"))
.putAll(requireNonNull(update, "update is null"))
.buildOrThrow();
return testSessionBuilder()
.setCatalog(catalogName)
.setSchema(TPCH_SCHEMA)
.setIdentity(Identity.ofUser(user))
.build();
}

private static void executeAsAdmin(TestingVerticaServer server, String sql)
Expand All @@ -171,8 +144,8 @@ public static void main(String[] args)
{
Logging.initialize();

DistributedQueryRunner queryRunner = (DistributedQueryRunner) builder(new TestingVerticaServer())
.addExtraProperties(ImmutableMap.of("http-server.http.port", "8080"))
DistributedQueryRunner queryRunner = builder(new TestingVerticaServer())
.addCoordinatorProperty("http-server.http.port", "8080")
.setTables(TpchTable.getTables())
.build();

Expand Down

0 comments on commit 8818f05

Please sign in to comment.