Skip to content

Commit

Permalink
Remove Airlift's event client from verifier
Browse files Browse the repository at this point in the history
This is a preparation for the removal of the event module from Airlift
  • Loading branch information
wendigo committed Nov 6, 2024
1 parent 0cc8303 commit 8f66aef
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 118 deletions.
15 changes: 0 additions & 15 deletions service/trino-verifier/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down Expand Up @@ -53,11 +48,6 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>event</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
Expand Down Expand Up @@ -88,11 +78,6 @@
<artifactId>trino-parser</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.verifier;

import com.google.inject.Inject;
import io.airlift.event.client.AbstractEventClient;
import io.airlift.json.JsonCodec;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
Expand All @@ -26,7 +25,7 @@
import static java.util.Objects.requireNonNull;

public class DatabaseEventClient
extends AbstractEventClient
implements EventConsumer
{
private final VerifierQueryEventDao dao;
private final JsonCodec<List<String>> codec;
Expand All @@ -45,9 +44,8 @@ public void postConstruct()
}

@Override
protected <T> void postEvent(T event)
public void postEvent(VerifierQueryEvent queryEvent)
{
VerifierQueryEvent queryEvent = (VerifierQueryEvent) event;
VerifierQueryEventEntity entity = new VerifierQueryEventEntity(
queryEvent.getSuite(),
Optional.ofNullable(queryEvent.getRunId()),
Expand Down Expand Up @@ -79,4 +77,9 @@ private static OptionalDouble toOptionalDouble(@Nullable Double value)
}
return OptionalDouble.of(value);
}

@Override
public void close()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.verifier;

import java.io.Closeable;

public interface EventConsumer
extends Closeable
{
void postEvent(VerifierQueryEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
package io.trino.verifier;

import com.google.inject.Inject;
import io.airlift.event.client.AbstractEventClient;
import io.airlift.stats.QuantileDigest;
import io.airlift.units.Duration;

import java.io.Closeable;
import java.util.Optional;
import java.util.regex.Pattern;

Expand All @@ -29,8 +27,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

public class HumanReadableEventClient
extends AbstractEventClient
implements Closeable
implements EventConsumer
{
private static final double LARGE_SPEEDUP = 0.5;
private static final double SMALL_SPEEDUP = 1.0;
Expand All @@ -56,10 +53,8 @@ public HumanReadableEventClient(VerifierConfig config)
}

@Override
public <T> void postEvent(T event)
public void postEvent(VerifierQueryEvent queryEvent)
{
VerifierQueryEvent queryEvent = (VerifierQueryEvent) event;

Optional<Double> cpuRatio = getCpuRatio(queryEvent);
if (cpuRatio.isPresent()) {
recordCpuRatio(cpuRatio.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,18 @@
*/
package io.trino.verifier;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.inject.Inject;
import io.airlift.event.client.AbstractEventClient;
import io.airlift.event.client.JsonEventSerializer;
import io.airlift.json.JsonCodec;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;

import static io.trino.plugin.base.util.JsonUtils.jsonFactory;
import static java.nio.charset.Charset.defaultCharset;
import static java.util.Objects.requireNonNull;

public class JsonEventClient
extends AbstractEventClient
implements EventConsumer
{
// TODO we should use JsonEventWriter instead
private final JsonEventSerializer serializer = new JsonEventSerializer(VerifierQueryEvent.class);
private final JsonFactory factory = jsonFactory();
private final JsonCodec<VerifierQueryEvent> serializer = JsonCodec.jsonCodec(VerifierQueryEvent.class);
private final PrintStream out;

@Inject
Expand All @@ -47,16 +36,13 @@ public JsonEventClient(VerifierConfig config)
}

@Override
public <T> void postEvent(T event)
public void postEvent(VerifierQueryEvent event)
{
out.println(serializer.toJson(event));
}

@Override
public void close()
{
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
JsonGenerator generator = factory.createGenerator(buffer, JsonEncoding.UTF8);
serializer.serialize(event, generator);
out.println(buffer.toString(defaultCharset()).trim());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.event.client.EventClient;
import org.jdbi.v3.core.Jdbi;

import java.util.Set;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.event.client.EventBinder.eventBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;

public class TrinoVerifierModule
Expand All @@ -36,29 +34,26 @@ public class TrinoVerifierModule
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(VerifierConfig.class);
eventBinder(binder).bindEventClient(VerifierQueryEvent.class);

Multibinder<String> supportedClients = newSetBinder(binder, String.class, SupportedEventClients.class);
supportedClients.addBinding().toInstance("human-readable");
supportedClients.addBinding().toInstance("file");
supportedClients.addBinding().toInstance("database");
Set<String> eventClientTypes = buildConfigObject(VerifierConfig.class).getEventClients();
bindEventClientClasses(eventClientTypes, binder, newSetBinder(binder, EventClient.class));
bindEventClientClasses(eventClientTypes, binder, newSetBinder(binder, EventConsumer.class));
}

private static void bindEventClientClasses(Set<String> eventClientTypes, Binder binder, Multibinder<EventClient> multibinder)
private static void bindEventClientClasses(Set<String> eventClientTypes, Binder binder, Multibinder<EventConsumer> consumers)
{
for (String eventClientType : eventClientTypes) {
if (eventClientType.equals("human-readable")) {
multibinder.addBinding().to(HumanReadableEventClient.class).in(Scopes.SINGLETON);
}
else if (eventClientType.equals("file")) {
multibinder.addBinding().to(JsonEventClient.class).in(Scopes.SINGLETON);
}
else if (eventClientType.equals("database")) {
jsonCodecBinder(binder).bindListJsonCodec(String.class);
binder.bind(VerifierQueryEventDao.class).toProvider(VerifierQueryEventDaoProvider.class);
multibinder.addBinding().to(DatabaseEventClient.class).in(Scopes.SINGLETON);
switch (eventClientType) {
case "human-readable" ->
consumers.addBinding().to(HumanReadableEventClient.class).in(Scopes.SINGLETON);
case "file" -> consumers.addBinding().to(JsonEventClient.class).in(Scopes.SINGLETON);
case "database" -> {
jsonCodecBinder(binder).bindListJsonCodec(String.class);
binder.bind(VerifierQueryEventDao.class).toProvider(VerifierQueryEventDaoProvider.class);
consumers.addBinding().to(DatabaseEventClient.class).in(Scopes.SINGLETON);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import io.airlift.event.client.EventClient;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.spi.ErrorCode;
import io.trino.spi.TrinoException;
import jakarta.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
Expand Down Expand Up @@ -87,17 +85,17 @@ public class Verifier
private final String skipCorrectnessRegex;
private final boolean simplifiedControlQueriesGenerationEnabled;
private final String simplifiedControlQueriesOutputDirectory;
private final Set<EventClient> eventClients;
private final Set<EventConsumer> eventConsumers;
private final int threadCount;
private final Set<String> allowedQueries;
private final Set<String> bannedQueries;
private final int precision;

public Verifier(PrintStream out, VerifierConfig config, Set<EventClient> eventClients)
public Verifier(PrintStream out, VerifierConfig config, Set<EventConsumer> eventConsumers)
{
requireNonNull(out, "out is null");
requireNonNull(config, "config is null");
this.eventClients = requireNonNull(eventClients, "eventClients is null");
this.eventConsumers = requireNonNull(eventConsumers, "eventConsumers is null");
this.allowedQueries = requireNonNull(config.getAllowedQueries(), "allowedQueries is null");
this.bannedQueries = requireNonNull(config.getBannedQueries(), "bannedQueries is null");
this.runId = config.getRunId();
Expand Down Expand Up @@ -224,10 +222,10 @@ public int run(List<QueryPair> queries)
failed++;
}

for (EventClient eventClient : eventClients) {
eventClient.post(buildEvent(validator));
VerifierQueryEvent queryEvent = buildEvent(validator);
for (EventConsumer eventConsumer : eventConsumers) {
eventConsumer.postEvent(queryEvent);
}

double progress = (((double) total) / totalQueries) * 100;
if (!isQuiet || (progress - lastProgress) > 1) {
log.info("Progress: %s valid, %s failed, %s skipped, %.2f%% done", valid, failed, skipped, progress);
Expand All @@ -238,15 +236,13 @@ public int run(List<QueryPair> queries)
log.info("Results: %s / %s (%s skipped)", valid, failed, skipped);
log.info("");

for (EventClient eventClient : eventClients) {
if (eventClient instanceof Closeable) {
try {
((Closeable) eventClient).close();
}
catch (IOException _) {
}
log.info("");
for (EventConsumer eventConsumer : eventConsumers) {
try {
eventConsumer.close();
}
catch (IOException _) {
}
log.info("");
}

return failed;
Expand Down
Loading

0 comments on commit 8f66aef

Please sign in to comment.