Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add properties to reduce the amount of data fetched from Prometheus. #23712

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/src/main/sphinx/connector/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ The following configuration properties are available:
with the values as `value1` and `value2`. Escape comma (`,`) or colon(`:`)
characters in a header name or value with a backslash (`\`).
-
* - `prometheus.query.match.string`
- Match string sent as part of the query to the Prometheus server to filter the returned data.
The equivalent catalog session property is `query_match_filter`.
-
* - `prometheus.query.functions`
- Comma-separated list of functions sent to the Prometheus HTTP API as part of the query.
The equivalent catalog session property is `query_functions`.
-
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think using session properties which affect query results is a good idea. We usually don't do that.
Please consider creating a table function instead: https://trino.io/docs/current/develop/table-functions.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @ebyhr , closing this PR, will create a new one with Table Function

:::

## Not exhausting your Trino available heap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HttpHeaders;
import com.google.inject.ConfigurationException;
import com.google.inject.spi.Message;
Expand All @@ -29,11 +30,15 @@
import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public class PrometheusConnectorConfig
{
Expand All @@ -48,6 +53,8 @@ public class PrometheusConnectorConfig
private String password;
private boolean caseInsensitiveNameMatching;
private Map<String, String> additionalHeaders = ImmutableMap.of();
private String matchString;
private Set<String> queryFunctions = ImmutableSet.of();

@NotNull
public URI getPrometheusURI()
Expand Down Expand Up @@ -216,6 +223,34 @@ public PrometheusConnectorConfig setAdditionalHeaders(String httpHeaders)
return this;
}

/**
 * Returns the configured Prometheus {@code match[]} filter string, or
 * {@link Optional#empty()} when {@code prometheus.query.match.string} was not set.
 */
public Optional<String> getMatchString()
{
    if (matchString == null) {
        return Optional.empty();
    }
    return Optional.of(matchString);
}

@Config("prometheus.query.match.string")
@ConfigDescription("match[] filter to be used in Prometheus HTTP API")
public PrometheusConnectorConfig setMatchString(String value)
{
    // Stored verbatim; exposed to callers through getMatchString() as an Optional.
    matchString = value;
    return this;
}

/**
 * Returns the configured Prometheus query functions.
 * The set is immutable and already lower-cased by the setter; empty when unset.
 */
public Set<String> getQueryFunctions()
{
    return this.queryFunctions;
}

@Config("prometheus.query.functions")
@ConfigDescription("Comma separated list of functions to be sent to Prometheus HTTP API as part of query")
public PrometheusConnectorConfig setQueryFunctions(List<String> queryFunctions)
{
    // Normalize to lower case (locale-independent) and de-duplicate while
    // preserving the configured order.
    ImmutableSet.Builder<String> normalized = ImmutableSet.builder();
    for (String function : queryFunctions) {
        normalized.add(function.toLowerCase(ENGLISH));
    }
    this.queryFunctions = normalized.build();
    return this;
}

@PostConstruct
public void checkConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public final class PrometheusSessionProperties
implements SessionPropertiesProvider
{
private static final String QUERY_CHUNK_SIZE_DURATION = "query_chunk_size_duration";
private static final String MAX_QUERY_RANGE_DURATION = "max_query_range_duration";
private static final String MATCH_FILTER = "query_match_filter";
private static final String QUERY_FUNCTIONS = "query_functions";

private final List<PropertyMetadata<?>> sessionProperties;

Expand All @@ -46,6 +60,28 @@ public PrometheusSessionProperties(PrometheusConnectorConfig connectorConfig)
"Width of overall query to Prometheus, will be divided into query_chunk_size_duration queries",
connectorConfig.getMaxQueryRangeDuration(),
false))
.add(stringProperty(
MATCH_FILTER,
"query match filter for Prometheus HTTP API",
connectorConfig.getMatchString().orElse(""),
false))
.add(new PropertyMetadata<>(
QUERY_FUNCTIONS,
"List of functions that can be used in Prometheus queries",
new ArrayType(VARCHAR),
Set.class,
connectorConfig.getQueryFunctions(),
false,
object -> ((Collection<?>) object).stream()
.map(String.class::cast)
.peek(property -> {
if (isNullOrEmpty(property)) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_FUNCTIONS));
}
})
.map(schema -> schema.toLowerCase(ENGLISH))
.collect(toImmutableSet()),
value -> value))
.build();
}

Expand All @@ -64,4 +100,15 @@ public static Duration getMaxQueryRange(ConnectorSession session)
{
return session.getProperty(MAX_QUERY_RANGE_DURATION, Duration.class);
}

/**
 * Returns the session-level Prometheus match filter, or {@link Optional#empty()}
 * when no filter is configured.
 *
 * <p>The property is registered with a default of {@code ""} (see the
 * {@code stringProperty} registration), so a bare {@code ofNullable} would always
 * be present; an empty value is therefore treated as "no filter". Downstream use
 * ({@code matchString.orElse("")}) is unaffected by this change.
 */
public static Optional<String> getMatchFilter(ConnectorSession session)
{
    return Optional.ofNullable(session.getProperty(MATCH_FILTER, String.class))
            .filter(value -> !value.isEmpty());
}

/**
 * Returns the session-level set of Prometheus query functions.
 *
 * <p>The cast is safe because the property is registered with {@code Set.class}
 * and its decoder always produces an immutable {@code Set<String>}.
 * Note: the recognized suppression key is {@code "unchecked"} — the previous
 * {@code "unchecked cast"} value is not a valid key and suppressed nothing.
 */
@SuppressWarnings("unchecked")
public static Set<String> getQueryFunctions(ConnectorSession session)
{
    return (Set<String>) session.getProperty(QUERY_FUNCTIONS, Set.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE;
import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_UNKNOWN_ERROR;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMatchFilter;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMaxQueryRange;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryChunkSize;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryFunctions;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static java.time.Instant.ofEpochMilli;
import static java.util.Objects.requireNonNull;
Expand All @@ -59,7 +62,6 @@ public class PrometheusSplitManager
static final long OFFSET_MILLIS = 1L;
private final PrometheusClient prometheusClient;
private final PrometheusClock prometheusClock;

private final URI prometheusURI;

@Inject
Expand Down Expand Up @@ -88,31 +90,57 @@ public ConnectorSplitSource getSplits(

Duration maxQueryRangeDuration = getMaxQueryRange(session);
Duration queryChunkSizeDuration = getQueryChunkSize(session);

Optional<String> matchString = getMatchFilter(session);
Set<String> queryFunctions = getQueryFunctions(session);
List<ConnectorSplit> splits = generateTimesForSplits(prometheusClock.now(), maxQueryRangeDuration, queryChunkSizeDuration, tableHandle)
.stream()
.map(time -> {
try {
return new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration).toString());
.flatMap(time -> {
if (queryFunctions.isEmpty()) {
try {
return Stream.of(new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration,
matchString,
Optional.empty()).toString()));
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
}
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
else {
return queryFunctions.stream().map(function -> {
try {
return new PrometheusSplit(buildQuery(
prometheusURI,
time,
table.name(),
queryChunkSizeDuration,
matchString,
Optional.of(function)).toString());
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
}
});
}
}).collect(Collectors.toList());
})
.collect(Collectors.toList());
return new FixedSplitSource(splits);
}

// HttpUriBuilder handles URI encode
private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration)
private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration, Optional<String> matchString, Optional<String> functionName)
throws URISyntaxException
{
String matchFilter = matchString.orElse("");
String queryString = functionName.map(s -> "label_replace(" + s + "(" + metricName + matchFilter + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]),\"__function_name__\"," + "\"" + s + "\"" + ",\"\", \".*\")")
.orElseGet(() -> metricName + matchFilter + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]");

return HttpUriBuilder.uriBuilderFrom(baseURI)
.appendPath("api/v1/query")
.addParameter("query", metricName + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]")
.addParameter("query", queryString)
.addParameter("time", time)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HttpHeaders;
import com.google.inject.ConfigurationException;
Expand All @@ -21,6 +22,7 @@

import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
Expand All @@ -46,7 +48,9 @@ public void testDefaults()
.setPassword(null)
.setReadTimeout(new Duration(10, SECONDS))
.setCaseInsensitiveNameMatching(false)
.setAdditionalHeaders(null));
.setAdditionalHeaders(null)
.setMatchString(null)
.setQueryFunctions(ImmutableList.of()));
}

@Test
Expand All @@ -64,6 +68,8 @@ public void testExplicitPropertyMappings()
.put("prometheus.read-timeout", "30s")
.put("prometheus.case-insensitive-name-matching", "true")
.put("prometheus.http.additional-headers", "key\\:1:value\\,1, key\\,2:value\\:2")
.put("prometheus.query.match.string", "{}")
.put("prometheus.query.functions", "max_over_time,min_over_time,count_over_time,sum_over_time")
.buildOrThrow();

URI uri = URI.create("file://test.json");
Expand All @@ -79,6 +85,8 @@ public void testExplicitPropertyMappings()
expected.setReadTimeout(new Duration(30, SECONDS));
expected.setCaseInsensitiveNameMatching(true);
expected.setAdditionalHeaders("key\\:1:value\\,1, key\\,2:value\\:2");
expected.setMatchString("{}");
expected.setQueryFunctions(List.of("max_over_time", "min_over_time", "count_over_time", "sum_over_time"));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.prometheus;

import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
Expand Down Expand Up @@ -112,6 +113,41 @@ public void testDescribeTable()
"('value', 'double', '', '')");
}

@Test
public void testSessionPropertyMatchString()
{
    // Override the match filter for this session only.
    Session sessionWithFilter = Session.builder(getSession())
            .setCatalogSessionProperty("prometheus", "query_match_filter", "{instance=\"bla\"}")
            .build();

    // The override is visible through SHOW SESSION.
    assertQuery(sessionWithFilter,
            "SHOW SESSION like 'prometheus.query_match_filter'",
            "VALUES ('prometheus.query_match_filter', '{instance=\"bla\"}', '', 'varchar', 'query match filter for Prometheus HTTP API')");
    // No record with instance="bla" exists in the test data, so the filtered
    // query must return zero rows.
    assertQuery(sessionWithFilter,
            "SELECT count(*) FROM default.up",
            "VALUES(0)");
}

@Test
public void testSessionPropertyQueryFunctions()
{
    // Configure four aggregation-over-time functions for this session only.
    Session functionsSession = Session.builder(getSession())
            .setCatalogSessionProperty("prometheus", "query_functions", "[\"max_over_time\",\"min_over_time\",\"count_over_time\",\"sum_over_time\"]")
            .build();

    // The override is visible through SHOW SESSION.
    assertQuery(functionsSession,
            "SHOW SESSION like 'prometheus.query_functions'",
            "VALUES ('prometheus.query_functions', '[\"max_over_time\",\"min_over_time\",\"count_over_time\",\"sum_over_time\"]', '[]', 'array(varchar)', 'List of functions that can be used in Prometheus queries')");
    // Each of the four configured functions contributes one result row.
    assertQuery(functionsSession,
            "SELECT count(*) FROM default.up",
            "VALUES(4)");

    MaterializedResult rows = computeActual(functionsSession, "SELECT * FROM default.up");
    assertThat(rows).hasSize(4);
    // The "up" metric value is 1.0 regardless of which function produced the row.
    rows.getMaterializedRows().forEach(row -> assertThat(row.getField(2)).isEqualTo(1.0));
}

// TODO rewrite this test based on query.
@Test
public void testCorrectNumberOfSplitsCreated()
Expand Down