Skip to content

Commit

Permalink
Add properties to reduce the amount of data fetching from prometheus.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitkharb committed Oct 8, 2024
1 parent daf0d70 commit ced471c
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 15 deletions.
8 changes: 8 additions & 0 deletions docs/src/main/sphinx/connector/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ The following configuration properties are available:
with the values as `value1` and `value2`. Escape comma (`,`) or colon(`:`)
characters in a header name or value with a backslash (`\`).
-
* - `prometheus.query.match.string`
- Match string appended to the metric name in queries sent to Prometheus, to filter the data on the Prometheus server.
The equivalent catalog session property is `query_match_filter`.
-
* - `prometheus.query.functions`
- Comma-separated list of functions to be sent to the Prometheus HTTP API as part of the query.
The equivalent catalog session property is `query_functions`.
-
:::

## Not exhausting your Trino available heap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HttpHeaders;
import com.google.inject.ConfigurationException;
import com.google.inject.spi.Message;
Expand All @@ -29,11 +30,15 @@
import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public class PrometheusConnectorConfig
{
Expand All @@ -48,6 +53,8 @@ public class PrometheusConnectorConfig
private String password;
private boolean caseInsensitiveNameMatching;
private Map<String, String> additionalHeaders = ImmutableMap.of();
private String matchString;
private Set<String> queryFunctions = ImmutableSet.of();

@NotNull
public URI getPrometheusURI()
Expand Down Expand Up @@ -216,6 +223,34 @@ public PrometheusConnectorConfig setAdditionalHeaders(String httpHeaders)
return this;
}

/**
 * Returns the match string configured through {@code prometheus.query.match.string}.
 *
 * @return the configured match string, or an empty {@link Optional} when the field is {@code null}
 */
public Optional<String> getMatchString()
{
return Optional.ofNullable(matchString);
}

/**
 * Stores the match string bound to the {@code prometheus.query.match.string} config property.
 * The value is kept as given; no validation or normalization is applied here.
 *
 * @param matchString match string to store; {@code null} leaves the property unset
 * @return this config instance, for call chaining
 */
@Config("prometheus.query.match.string")
@ConfigDescription("match[] filter to be used in Prometheus HTTP API")
public PrometheusConnectorConfig setMatchString(String matchString)
{
this.matchString = matchString;
return this;
}

/**
 * Returns the query functions configured through {@code prometheus.query.functions}.
 *
 * @return the stored set of function names; an empty set unless the setter has been called
 */
public Set<String> getQueryFunctions()
{
return queryFunctions;
}

/**
 * Stores the function names bound to the {@code prometheus.query.functions} config property.
 * Each name is lower-cased with the {@code ENGLISH} locale and the result is kept as an
 * immutable set, so names that differ only by case collapse into a single entry.
 *
 * @param queryFunctions function names to store; must not be {@code null} or contain {@code null}
 * @return this config instance, for call chaining
 */
@Config("prometheus.query.functions")
@ConfigDescription("Comma separated list of functions to be sent to Prometheus HTTP API as part of query")
public PrometheusConnectorConfig setQueryFunctions(List<String> queryFunctions)
{
    Set<String> normalizedFunctions = queryFunctions.stream()
            .map(functionName -> functionName.toLowerCase(ENGLISH))
            .collect(toImmutableSet());
    this.queryFunctions = normalizedFunctions;
    return this;
}

@PostConstruct
public void checkConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public final class PrometheusSessionProperties
implements SessionPropertiesProvider
{
private static final String QUERY_CHUNK_SIZE_DURATION = "query_chunk_size_duration";
private static final String MAX_QUERY_RANGE_DURATION = "max_query_range_duration";
private static final String MATCH_FILTER = "query_match_filter";
private static final String QUERY_FUNCTIONS = "query_functions";

private final List<PropertyMetadata<?>> sessionProperties;

Expand All @@ -46,6 +60,28 @@ public PrometheusSessionProperties(PrometheusConnectorConfig connectorConfig)
"Width of overall query to Prometheus, will be divided into query_chunk_size_duration queries",
connectorConfig.getMaxQueryRangeDuration(),
false))
.add(stringProperty(
MATCH_FILTER,
"query match filter for Prometheus HTTP API",
connectorConfig.getMatchString().orElse(""),
false))
.add(new PropertyMetadata<>(
QUERY_FUNCTIONS,
"List of functions that can be used in Prometheus queries",
new ArrayType(VARCHAR),
Set.class,
connectorConfig.getQueryFunctions(),
false,
object -> ((Collection<?>) object).stream()
.map(String.class::cast)
.peek(property -> {
if (isNullOrEmpty(property)) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_FUNCTIONS));
}
})
.map(schema -> schema.toLowerCase(ENGLISH))
.collect(toImmutableSet()),
value -> value))
.build();
}

Expand All @@ -64,4 +100,15 @@ public static Duration getMaxQueryRange(ConnectorSession session)
{
return session.getProperty(MAX_QUERY_RANGE_DURATION, Duration.class);
}

/**
 * Reads the {@code query_match_filter} catalog session property.
 *
 * @param session connector session to read the property from
 * @return the property value wrapped in an {@link Optional}; empty only when the session
 *         yields {@code null} for the property
 */
public static Optional<String> getMatchFilter(ConnectorSession session)
{
    String matchFilter = session.getProperty(MATCH_FILTER, String.class);
    return Optional.ofNullable(matchFilter);
}

/**
 * Reads the {@code query_functions} catalog session property.
 *
 * @param session connector session to read the property from
 * @return the function names held by the session property
 */
// The property is registered with Set.class as its Java type and its decoder collects strings
// into a set, so narrowing the raw Set to Set<String> is safe. "unchecked" is the warning key
// javac recognizes; "unchecked cast" is not a known key and suppresses nothing.
@SuppressWarnings("unchecked")
public static Set<String> getQueryFunctions(ConnectorSession session)
{
    return (Set<String>) session.getProperty(QUERY_FUNCTIONS, Set.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE;
import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_UNKNOWN_ERROR;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMatchFilter;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMaxQueryRange;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryChunkSize;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryFunctions;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static java.time.Instant.ofEpochMilli;
import static java.util.Objects.requireNonNull;
Expand All @@ -59,7 +62,6 @@ public class PrometheusSplitManager
static final long OFFSET_MILLIS = 1L;
private final PrometheusClient prometheusClient;
private final PrometheusClock prometheusClock;

private final URI prometheusURI;

@Inject
Expand Down Expand Up @@ -88,31 +90,56 @@ public ConnectorSplitSource getSplits(

Duration maxQueryRangeDuration = getMaxQueryRange(session);
Duration queryChunkSizeDuration = getQueryChunkSize(session);

Optional<String> matchString = getMatchFilter(session);
Set<String> queryFunctions = getQueryFunctions(session);
List<ConnectorSplit> splits = generateTimesForSplits(prometheusClock.now(), maxQueryRangeDuration, queryChunkSizeDuration, tableHandle)
.stream()
.map(time -> {
try {
return new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration).toString());
.flatMap(time -> {
if (queryFunctions.isEmpty()) {
try {
return Stream.of(new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration,
matchString.orElse("{}"),
Optional.empty()).toString()));
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
}
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
else {
return queryFunctions.stream().map(function -> {
try {
return new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration,
matchString.orElse("{}"),
Optional.of(function)).toString());
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
}
});
}
}).collect(Collectors.toList());
})
.collect(Collectors.toList());
return new FixedSplitSource(splits);
}

// Builds the URI of a Prometheus "api/v1/query" request for a single point in time.
// The "query" parameter is a range selector: the metric name, immediately followed by the
// match string, followed by "[<range>]" where the range is derived from queryChunkSizeDuration.
// When a function name is supplied, the selector is wrapped in that function and then in
// label_replace(...) with "__function_name__" as the destination label and the function name
// as the replacement (presumably so result rows can be told apart per function; confirm
// against the PromQL label_replace documentation).
// HttpUriBuilder handles URI encode
private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration)
private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration, String matchString, Optional<String> functionName)
throws URISyntaxException
{
// Range width: the chunk duration rounded in its own unit, followed by that unit's suffix.
// With a function:    label_replace(<fn>(<metric><match>[<range>]),"__function_name__","<fn>","", ".*")
// Without a function: <metric><match>[<range>]
String queryString = functionName.map(s -> "label_replace(" + s + "(" + metricName + matchString + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]),\"__function_name__\"," + "\"" + s + "\"" + ",\"\", \".*\")")
.orElseGet(() -> metricName + matchString + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]");

return HttpUriBuilder.uriBuilderFrom(baseURI)
.appendPath("api/v1/query")
.addParameter("query", metricName + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]")
.addParameter("query", queryString)
.addParameter("time", time)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HttpHeaders;
import com.google.inject.ConfigurationException;
Expand All @@ -21,6 +22,7 @@

import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
Expand All @@ -46,7 +48,9 @@ public void testDefaults()
.setPassword(null)
.setReadTimeout(new Duration(10, SECONDS))
.setCaseInsensitiveNameMatching(false)
.setAdditionalHeaders(null));
.setAdditionalHeaders(null)
.setMatchString(null)
.setQueryFunctions(ImmutableList.of()));
}

@Test
Expand All @@ -64,6 +68,8 @@ public void testExplicitPropertyMappings()
.put("prometheus.read-timeout", "30s")
.put("prometheus.case-insensitive-name-matching", "true")
.put("prometheus.http.additional-headers", "key\\:1:value\\,1, key\\,2:value\\:2")
.put("prometheus.query.match.string", "{}")
.put("prometheus.query.functions", "max_over_time,min_over_time,count_over_time,sum_over_time")
.buildOrThrow();

URI uri = URI.create("file://test.json");
Expand All @@ -79,6 +85,8 @@ public void testExplicitPropertyMappings()
expected.setReadTimeout(new Duration(30, SECONDS));
expected.setCaseInsensitiveNameMatching(true);
expected.setAdditionalHeaders("key\\:1:value\\,1, key\\,2:value\\:2");
expected.setMatchString("{}");
expected.setQueryFunctions(List.of("max_over_time", "min_over_time", "count_over_time", "sum_over_time"));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.prometheus;

import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
Expand Down Expand Up @@ -112,6 +113,41 @@ public void testDescribeTable()
"('value', 'double', '', '')");
}

/**
 * Overrides the {@code query_match_filter} catalog session property and checks that the
 * override is reported by SHOW SESSION and is applied to a query against {@code default.up}.
 */
@Test
public void testSessionPropertyMatchString()
{
    String matchFilter = "{instance=\"bla\"}";
    Session session = Session.builder(getSession())
            .setCatalogSessionProperty("prometheus", "query_match_filter", matchFilter)
            .build();

    // the overridden value must be reported next to the (empty) default
    String expectedProperty = "VALUES ('prometheus.query_match_filter', '" + matchFilter + "', '', 'varchar', 'query match filter for Prometheus HTTP API')";
    assertQuery(session, "SHOW SESSION like 'prometheus.query_match_filter'", expectedProperty);

    // as there is no record with instance="bla" in the test data, the query should return no rows
    assertQuery(session, "SELECT count(*) FROM default.up", "VALUES(0)");
}

/**
 * Overrides the {@code query_functions} catalog session property with four function names and
 * checks that the override is reported by SHOW SESSION and that {@code default.up} then
 * yields four rows, each with value 1.0 in the third column.
 */
@Test
public void testSessionPropertyQueryFunctions()
{
    String functions = "[\"max_over_time\",\"min_over_time\",\"count_over_time\",\"sum_over_time\"]";
    Session session = Session.builder(getSession())
            .setCatalogSessionProperty("prometheus", "query_functions", functions)
            .build();

    // the overridden value must be reported next to the default empty array
    String expectedProperty = "VALUES ('prometheus.query_functions', '" + functions + "', '[]', 'array(varchar)', 'List of functions that can be used in Prometheus queries')";
    assertQuery(session, "SHOW SESSION like 'prometheus.query_functions'", expectedProperty);

    // four functions are configured and four rows are expected (presumably one per function)
    assertQuery(session, "SELECT count(*) FROM default.up", "VALUES(4)");

    MaterializedResult allRows = computeActual(session, "SELECT * FROM default.up");
    assertThat(allRows).hasSize(4);
    allRows.getMaterializedRows()
            .forEach(materializedRow -> assertThat(materializedRow.getField(2)).isEqualTo(1.0));
}

// TODO rewrite this test based on query.
@Test
public void testCorrectNumberOfSplitsCreated()
Expand Down

0 comments on commit ced471c

Please sign in to comment.