Skip to content

Commit

Permalink
convert shortform to full JMXServiceURL in TargetPostHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
aali309 committed Feb 15, 2024
1 parent d429b2d commit 00cb5a5
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 198 deletions.
15 changes: 1 addition & 14 deletions src/main/java/io/cryostat/configuration/CredentialsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Optional;
import java.util.Set;

import javax.management.remote.JMXServiceURL;
import javax.script.ScriptException;

import io.cryostat.core.log.Logger;
Expand All @@ -51,7 +50,6 @@
import dagger.Lazy;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.openjdk.jmc.rjmx.ConnectionToolkit;

public class CredentialsManager
extends AbstractEventEmitter<CredentialsManager.CredentialsEvent, String> {
Expand Down Expand Up @@ -166,25 +164,14 @@ public int removeCredentials(String matchExpression) throws MatchExpressionValid
return -1;
}

public JMXServiceURL createServiceURL(String host, int port) throws MalformedURLException {
return ConnectionToolkit.createServiceURL(host, port);
}

public Credentials getCredentialsByTargetId(String targetId) throws ScriptException {
try {
for (ServiceRef service : this.platformClient.listDiscoverableServices()) {
URI uri = service.getServiceUri();
boolean match = false;
boolean isJmx = URIUtil.isJmxUrl(uri);
boolean isShortForm = targetId.matches("localhost:\\d+");

if (isJmx) {
match = Objects.equals(uri.toString(), targetId);
} else if (isShortForm) {
String[] parts = targetId.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);
targetId = ConnectionToolkit.createServiceURL(host, port).toString();
} else {
URI in = new URI(targetId);
match = Objects.equals(uri, in);
Expand All @@ -196,7 +183,7 @@ public Credentials getCredentialsByTargetId(String targetId) throws ScriptExcept
}
}
return null;
} catch (URISyntaxException | MalformedURLException use) {
} catch (URISyntaxException use) {
throw new IllegalStateException(use);
}
}
Expand Down
164 changes: 75 additions & 89 deletions src/main/java/io/cryostat/discovery/DiscoveryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
private ScheduledFuture<?> pluginPruneTask;
private ScheduledFuture<?> targetRetryTask;

private final Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> nonConnectableTargets =
new ConcurrentHashMap<>();
private final Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> nonConnectableTargets = new ConcurrentHashMap<>();

public static final String DISCOVERY_STARTUP_ADDRESS = "discovery-startup";

Expand Down Expand Up @@ -138,24 +137,21 @@ public void start(Promise<Void> future) throws Exception {
.onSuccess(ar -> future.complete())
.onFailure(t -> future.fail((Throwable) t))
.eventually(
m ->
getVertx()
.eventBus()
.send(
DISCOVERY_STARTUP_ADDRESS,
"Discovery storage deployed"));
m -> getVertx()
.eventBus()
.send(
DISCOVERY_STARTUP_ADDRESS,
"Discovery storage deployed"));
});

this.pluginPruneTask =
scheduler.scheduleAtFixedRate(
this::pingPrune,
pingPeriod.toMillis(),
pingPeriod.toMillis(),
TimeUnit.MILLISECONDS);
this.pluginPruneTask = scheduler.scheduleAtFixedRate(
this::pingPrune,
pingPeriod.toMillis(),
pingPeriod.toMillis(),
TimeUnit.MILLISECONDS);
// TODO make this configurable
this.targetRetryTask =
scheduler.scheduleAtFixedRate(
this::checkNonConnectedTargetJvmIds, 2, 2, TimeUnit.SECONDS);
this.targetRetryTask = scheduler.scheduleAtFixedRate(
this::checkNonConnectedTargetJvmIds, 2, 2, TimeUnit.SECONDS);
this.credentialsManager
.get()
.addListener(
Expand All @@ -166,12 +162,11 @@ public void start(Promise<Void> future) throws Exception {
entry -> {
try {
ServiceRef target = entry.getKey().getTarget();
boolean credentialsApply =
matchExpressionEvaluator
.get()
.applies(
event.getPayload(),
target);
boolean credentialsApply = matchExpressionEvaluator
.get()
.applies(
event.getPayload(),
target);
return credentialsApply && testJvmId(target);
} catch (ScriptException e) {
logger.error(e);
Expand All @@ -192,10 +187,9 @@ public void start(Promise<Void> future) throws Exception {
switch (tde.getEventKind()) {
case MODIFIED:
testNonConnectedTargets(
entry ->
Objects.equals(
tde.getServiceRef(),
entry.getKey().getTarget()));
entry -> Objects.equals(
tde.getServiceRef(),
entry.getKey().getTarget()));
break;
case LOST:
var it = nonConnectableTargets.entrySet().iterator();
Expand All @@ -220,17 +214,15 @@ private void checkNonConnectedTargetJvmIds() {
ConnectionAttemptRecord attemptRecord = nonConnectableTargets.get(entry);
// TODO make this configurable, use an exponential backoff, have a
// maximum retry policy, etc.
long nextAttempt =
(attemptRecord.attemptCount * attemptRecord.attemptCount)
+ attemptRecord.lastAttemptTimestamp;
long nextAttempt = (attemptRecord.attemptCount * attemptRecord.attemptCount)
+ attemptRecord.lastAttemptTimestamp;
attemptRecord.attemptCount++;
long now = clock.now().getEpochSecond();
if (now < nextAttempt) {
return false;
}
long elapsed =
attemptRecord.lastAttemptTimestamp
- attemptRecord.firstAttemptTimestamp;
long elapsed = attemptRecord.lastAttemptTimestamp
- attemptRecord.firstAttemptTimestamp;
if (elapsed > ConnectionAttemptRecord.MAX_ATTEMPT_INTERVAL) {
return false;
}
Expand All @@ -250,8 +242,7 @@ private boolean testJvmId(ServiceRef serviceRef) {
}

private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicate) {
Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> copy =
new HashMap<>(nonConnectableTargets);
Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> copy = new HashMap<>(nonConnectableTargets);
for (var entry : copy.entrySet()) {
executor.submit(
() -> {
Expand All @@ -260,8 +251,7 @@ private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicat
nonConnectableTargets.remove(entry.getKey());
UUID id = entry.getKey().getValue();
PluginInfo plugin = getById(id).orElseThrow();
EnvironmentNode original =
gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
EnvironmentNode original = gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
update(id, original.getChildren());
}
} catch (JsonSyntaxException e) {
Expand All @@ -282,50 +272,47 @@ public void stop() {
}

private CompletableFuture<Void> pingPrune() {
List<CompletableFuture<Void>> futures =
dao.getAll().stream()
.map(
plugin -> {
UUID key = plugin.getId();
URI uri = plugin.getCallback();
return ping(HttpMethod.POST, uri)
.<Void>handle(
(v, t) -> {
if (t != null || !Boolean.TRUE.equals(v)) {
if (t != null) {
logger.warn(
ExceptionUtils
.getStackTrace(t));
}
removePlugin(key, uri);
}
return null;
});
})
.toList();
List<CompletableFuture<Void>> futures = dao.getAll().stream()
.map(
plugin -> {
UUID key = plugin.getId();
URI uri = plugin.getCallback();
return ping(HttpMethod.POST, uri)
.<Void>handle(
(v, t) -> {
if (t != null || !Boolean.TRUE.equals(v)) {
if (t != null) {
logger.warn(
ExceptionUtils
.getStackTrace(t));
}
removePlugin(key, uri);
}
return null;
});
})
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

private CompletableFuture<Boolean> ping(HttpMethod mtd, URI uri) {
if (Objects.equals(uri, NO_CALLBACK)) {
return CompletableFuture.completedFuture(true);
}
HttpRequest<Buffer> req =
http.request(mtd, uri.getPort(), uri.getHost(), uri.getPath())
.ssl("https".equals(uri.getScheme()))
.timeout(1_000)
.followRedirects(true);
HttpRequest<Buffer> req = http.request(mtd, uri.getPort(), uri.getHost(), uri.getPath())
.ssl("https".equals(uri.getScheme()))
.timeout(1_000)
.followRedirects(true);
Optional<StoredCredentials> opt = getStoredCredentials(uri);
if (opt.isPresent()) {
StoredCredentials credentials = opt.get();
logger.info(
"Using stored credentials id:{} referenced in ping callback userinfo",
credentials.getId());
req =
req.authentication(
new UsernamePasswordCredentials(
credentials.getCredentials().getUsername(),
credentials.getCredentials().getPassword()));
req = req.authentication(
new UsernamePasswordCredentials(
credentials.getCredentials().getUsername(),
credentials.getCredentials().getPassword()));
}
final HttpRequest<Buffer> freq = req;
CompletableFuture<Boolean> result = new CompletableFuture<>();
Expand Down Expand Up @@ -365,8 +352,7 @@ private Optional<StoredCredentials> getStoredCredentials(URI uri) {
if (StringUtils.isNotBlank(userInfo) && userInfo.contains(":")) {
String[] parts = userInfo.split(":");
if ("storedcredentials".equals(parts[0])) {
Optional<StoredCredentials> opt =
credentialsManager.get().getById(Integer.parseInt(parts[1]));
Optional<StoredCredentials> opt = credentialsManager.get().getById(Integer.parseInt(parts[1]));
if (opt.isEmpty()) {
logger.warn("Could not find stored credentials with id:{} !", parts[1]);
}
Expand Down Expand Up @@ -397,20 +383,22 @@ public UUID register(String realm, URI callback) throws RegistrationException {
if (!cf.get()) {
throw new Exception("callback ping failure");
}
// FIXME it's not great to perform this action as two separate database calls, but we
// want to have the ID embedded within the node object. The ID is generated by the
// database when we create the plugin registration record, and the node object is
// FIXME it's not great to perform this action as two separate database calls,
// but we
// want to have the ID embedded within the node object. The ID is generated by
// the
// database when we create the plugin registration record, and the node object
// is
// serialized into a column of that record.
EnvironmentNode initial = new EnvironmentNode(realm, BaseNodeType.REALM);
UUID id = dao.save(realm, callback, initial).getId();
EnvironmentNode update =
new EnvironmentNode(
realm,
initial.getNodeType(),
mergeLabels(
initial.getLabels(),
Map.of(AnnotationKey.REALM.name(), id.toString())),
initial.getChildren());
EnvironmentNode update = new EnvironmentNode(
realm,
initial.getNodeType(),
mergeLabels(
initial.getLabels(),
Map.of(AnnotationKey.REALM.name(), id.toString())),
initial.getChildren());
PluginInfo updated = dao.update(id, update);
logger.trace("Discovery Registration: \"{}\" [{}]", realm, id);
return updated.getId();
Expand Down Expand Up @@ -463,8 +451,7 @@ private List<AbstractNode> modifyChildrenWithJvmIds(

public List<? extends AbstractNode> update(
UUID id, Collection<? extends AbstractNode> children) {
var updatedChildren =
modifyChildrenWithJvmIds(id, Objects.requireNonNull(children, "children"));
var updatedChildren = modifyChildrenWithJvmIds(id, Objects.requireNonNull(children, "children"));

PluginInfo plugin = dao.get(id).orElseThrow(() -> new NotFoundException(id));

Expand Down Expand Up @@ -500,12 +487,11 @@ public PluginInfo deregister(UUID id) {
}

public EnvironmentNode getDiscoveryTree() {
List<EnvironmentNode> realms =
dao.getAll().stream()
.map(PluginInfo::getSubtree)
.map(s -> gson.fromJson(s, EnvironmentNode.class))
.sorted((s1, s2) -> s1.compareTo(s2))
.toList();
List<EnvironmentNode> realms = dao.getAll().stream()
.map(PluginInfo::getSubtree)
.map(s -> gson.fromJson(s, EnvironmentNode.class))
.sorted((s1, s2) -> s1.compareTo(s2))
.toList();
return new EnvironmentNode(
"Universe", BaseNodeType.UNIVERSE, Collections.emptyMap(), realms);
}
Expand Down
Loading

0 comments on commit 00cb5a5

Please sign in to comment.