diff --git a/docs/src/main/sphinx/connector/prometheus.md b/docs/src/main/sphinx/connector/prometheus.md index b559672ae7db47..bbad92bf3ab02f 100644 --- a/docs/src/main/sphinx/connector/prometheus.md +++ b/docs/src/main/sphinx/connector/prometheus.md @@ -91,6 +91,14 @@ The following configuration properties are available: with the values as `value1` and `value2`. Escape comma (`,`) or colon(`:`) characters in a header name or value with a backslash (`\`). - +* - `prometheus.query.match.string` + - Match string to send as part of query to Prometheus to filter the data on Prometheus server. + The equivalent catalog session property is `query_match_string`. + - +* - `prometheus.query.functions` + - Comma separated list of functions to be sent to Prometheus HTTP API as part of query. + The equivalent catalog session property is `query_functions`. + - ::: ## Not exhausting your Trino available heap diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusConnectorConfig.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusConnectorConfig.java index d31d7c133fc102..f2960961a61f5f 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusConnectorConfig.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusConnectorConfig.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.net.HttpHeaders; import com.google.inject.ConfigurationException; import com.google.inject.spi.Message; @@ -29,11 +30,15 @@ import java.io.File; import java.net.URI; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; public class PrometheusConnectorConfig { @@ -48,6 +53,8 @@ public class PrometheusConnectorConfig private String password; private boolean caseInsensitiveNameMatching; private Map additionalHeaders = ImmutableMap.of(); + private String matchString; + private Set queryFunctions = ImmutableSet.of(); @NotNull public URI getPrometheusURI() @@ -216,6 +223,34 @@ public PrometheusConnectorConfig setAdditionalHeaders(String httpHeaders) return this; } + public Optional getMatchString() + { + return Optional.ofNullable(matchString); + } + + @Config("prometheus.query.match.string") + @ConfigDescription("match[] filter to be used in Prometheus HTTP API") + public PrometheusConnectorConfig setMatchString(String matchString) + { + this.matchString = matchString; + return this; + } + + public Set getQueryFunctions() + { + return queryFunctions; + } + + @Config("prometheus.query.functions") + @ConfigDescription("Comma separated list of functions to be sent to Prometheus HTTP API as part of query") + public PrometheusConnectorConfig setQueryFunctions(List queryFunctions) + { + this.queryFunctions = queryFunctions.stream() + .map(value -> value.toLowerCase(ENGLISH)) + .collect(toImmutableSet()); + return this; + } + @PostConstruct public void checkConfig() { diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSessionProperties.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSessionProperties.java index 7b6afdbdc0e4b9..e979424cec1db2 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSessionProperties.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSessionProperties.java @@ -17,18 +17,32 @@ import com.google.inject.Inject; import io.airlift.units.Duration; import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; +import java.util.Collection; import java.util.List; +import java.util.Optional; +import java.util.Set; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; +import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; +import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; +import static java.util.Locale.ENGLISH; public final class PrometheusSessionProperties implements SessionPropertiesProvider { private static final String QUERY_CHUNK_SIZE_DURATION = "query_chunk_size_duration"; private static final String MAX_QUERY_RANGE_DURATION = "max_query_range_duration"; + private static final String MATCH_FILTER = "query_match_filter"; + private static final String QUERY_FUNCTIONS = "query_functions"; private final List> sessionProperties; @@ -46,6 +60,28 @@ public PrometheusSessionProperties(PrometheusConnectorConfig connectorConfig) "Width of overall query to Prometheus, will be divided into query_chunk_size_duration queries", connectorConfig.getMaxQueryRangeDuration(), false)) + .add(stringProperty( + MATCH_FILTER, + "query match filter for Prometheus HTTP API", + connectorConfig.getMatchString().orElse(""), + false)) + .add(new PropertyMetadata<>( + QUERY_FUNCTIONS, + "List of functions that can be used in Prometheus queries", + new ArrayType(VARCHAR), + Set.class, + connectorConfig.getQueryFunctions(), + false, + object -> ((Collection) object).stream() + .map(String.class::cast) + .peek(property -> { + if (isNullOrEmpty(property)) { + throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_FUNCTIONS)); + } + }) + .map(schema -> schema.toLowerCase(ENGLISH)) + .collect(toImmutableSet()), + value -> value)) .build(); } @@ -64,4 +100,15 @@ public static Duration getMaxQueryRange(ConnectorSession session) { return session.getProperty(MAX_QUERY_RANGE_DURATION, Duration.class); } + + public static Optional getMatchFilter(ConnectorSession session) + { + return Optional.ofNullable(session.getProperty(MATCH_FILTER, String.class)); + } + + @SuppressWarnings("unchecked cast") + public static Set getQueryFunctions(ConnectorSession session) + { + return (Set) session.getProperty(QUERY_FUNCTIONS, Set.class); + } } diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java index 3c2be8bdc774dc..e7235c24ffe90d 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java @@ -44,11 +44,14 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE; import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_UNKNOWN_ERROR; +import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMatchFilter; import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMaxQueryRange; import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryChunkSize; +import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryFunctions; import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static java.time.Instant.ofEpochMilli; import static java.util.Objects.requireNonNull; @@ -59,7 +62,6 @@ public class PrometheusSplitManager static final long OFFSET_MILLIS = 1L; private final PrometheusClient prometheusClient; private final PrometheusClock prometheusClock; - private final URI prometheusURI; @Inject @@ -88,31 +90,56 @@ public ConnectorSplitSource getSplits( Duration maxQueryRangeDuration = getMaxQueryRange(session); Duration queryChunkSizeDuration = getQueryChunkSize(session); - + Optional matchString = getMatchFilter(session); + Set queryFunctions = getQueryFunctions(session); List splits = generateTimesForSplits(prometheusClock.now(), maxQueryRangeDuration, queryChunkSizeDuration, tableHandle) .stream() - .map(time -> { - try { - return new PrometheusSplit(buildQuery( - prometheusURI, - time, - table.name(), - queryChunkSizeDuration).toString()); + .flatMap(time -> { + if (queryFunctions.isEmpty()) { + try { + return Stream.of(new PrometheusSplit(buildQuery( + prometheusURI, + time, + table.name(), + queryChunkSizeDuration, + matchString.orElse("{}"), + Optional.empty()).toString())); + } + catch (URISyntaxException e) { + throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage()); + } } - catch (URISyntaxException e) { - throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage()); + else { + return queryFunctions.stream().map(function -> { + try { + return new PrometheusSplit(buildQuery( + prometheusURI, + time, + table.name(), + queryChunkSizeDuration, + matchString.orElse("{}"), + Optional.of(function)).toString()); + } + catch (URISyntaxException e) { + throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage()); + } + }); } - }).collect(Collectors.toList()); + }) + .collect(Collectors.toList()); return new FixedSplitSource(splits); } // HttpUriBuilder handles URI encode - private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration) + private static URI buildQuery(URI baseURI, String time, String metricName, Duration queryChunkSizeDuration, String matchString, Optional functionName) throws URISyntaxException { + String queryString = functionName.map(s -> "label_replace(" + s + "(" + metricName + matchString + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]),\"__function_name__\"," + "\"" + s + "\"" + ",\"\", \".*\")") + .orElseGet(() -> metricName + matchString + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]"); + return HttpUriBuilder.uriBuilderFrom(baseURI) .appendPath("api/v1/query") - .addParameter("query", metricName + "[" + queryChunkSizeDuration.roundTo(queryChunkSizeDuration.getUnit()) + Duration.timeUnitToString(queryChunkSizeDuration.getUnit()) + "]") + .addParameter("query", queryString) .addParameter("time", time) .build(); } diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusConnectorConfig.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusConnectorConfig.java index 5a8d2c4e81f3ef..766a032c68325e 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusConnectorConfig.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusConnectorConfig.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.prometheus; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.net.HttpHeaders; import com.google.inject.ConfigurationException; @@ -21,6 +22,7 @@ import java.io.File; import java.net.URI; +import java.util.List; import java.util.Map; import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; @@ -46,7 +48,9 @@ public void testDefaults() .setPassword(null) .setReadTimeout(new Duration(10, SECONDS)) .setCaseInsensitiveNameMatching(false) - .setAdditionalHeaders(null)); + .setAdditionalHeaders(null) + .setMatchString(null) + .setQueryFunctions(ImmutableList.of())); } @Test @@ -64,6 +68,8 @@ public void testExplicitPropertyMappings() .put("prometheus.read-timeout", "30s") .put("prometheus.case-insensitive-name-matching", "true") .put("prometheus.http.additional-headers", "key\\:1:value\\,1, key\\,2:value\\:2") + .put("prometheus.query.match.string", "{}") + .put("prometheus.query.functions", "max_over_time,min_over_time,count_over_time,sum_over_time") .buildOrThrow(); URI uri = URI.create("file://test.json"); @@ -79,6 +85,8 @@ public void testExplicitPropertyMappings() expected.setReadTimeout(new Duration(30, SECONDS)); expected.setCaseInsensitiveNameMatching(true); expected.setAdditionalHeaders("key\\:1:value\\,1, key\\,2:value\\:2"); + expected.setMatchString("{}"); + expected.setQueryFunctions(List.of("max_over_time", "min_over_time", "count_over_time", "sum_over_time")); assertFullMapping(properties, expected); } diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java index 1c04fc6c78a1e1..dfa08017bec591 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java @@ -14,6 +14,7 @@ package io.trino.plugin.prometheus; import io.airlift.units.Duration; +import io.trino.Session; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.Constraint; @@ -112,6 +113,41 @@ public void testDescribeTable() "('value', 'double', '', '')"); } + @Test + public void testSessionPropertyMatchString() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("prometheus", "query_match_filter", "{instance=\"bla\"}") + .build(); + + assertQuery(session, + "SHOW SESSION like 'prometheus.query_match_filter'", + "VALUES ('prometheus.query_match_filter', '{instance=\"bla\"}', '', 'varchar', 'query match filter for Prometheus HTTP API')"); + // as there is no record with instance="bla" in the test data, the query should return no rows + assertQuery(session, + "SELECT count(*) FROM default.up", + "VALUES(0)"); + } + + @Test + public void testSessionPropertyQueryFunctions() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("prometheus", "query_functions", "[\"max_over_time\",\"min_over_time\",\"count_over_time\",\"sum_over_time\"]") + .build(); + + assertQuery(session, + "SHOW SESSION like 'prometheus.query_functions'", + "VALUES ('prometheus.query_functions', '[\"max_over_time\",\"min_over_time\",\"count_over_time\",\"sum_over_time\"]', '[]', 'array(varchar)', 'List of functions that can be used in Prometheus queries')"); + assertQuery(session, + "SELECT count(*) FROM default.up", + "VALUES(4)"); + + MaterializedResult results = computeActual(session, "SELECT * FROM default.up"); + assertThat(results).hasSize(4); + results.getMaterializedRows().forEach(row -> assertThat(row.getField(2)).isEqualTo(1.0)); + } + // TODO rewrite this test based on query. @Test public void testCorrectNumberOfSplitsCreated()