Skip to content

Commit

Permalink
Merge pull request #38 from malakaganga/fix_jmx_connection
Browse files Browse the repository at this point in the history
Add proper pooling support for JedisCluster
  • Loading branch information
malakaganga authored Apr 10, 2023
2 parents 277afb8 + 0a01b75 commit 729fdf0
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<groupId>org.wso2.carbon.connector</groupId>
<artifactId>org.wso2.carbon.connector.redis</artifactId>
<packaging>jar</packaging>
<version>2.6.0</version>
<version>2.7.0</version>
<name>WSO2 Carbon - Mediation Library Connector For Redis</name>
<url>http://wso2.org</url>
<properties>
Expand Down
45 changes: 35 additions & 10 deletions src/main/java/org/wso2/carbon/connector/operations/RedisServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@
public class RedisServer {

private static ConcurrentHashMap<String, JedisPool> jedisPoolMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, JedisCluster> jedisClusterMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, JedisSentinelPool> jedisSentinelPoolMap = new ConcurrentHashMap<>();
private int maxConnections;
private JedisCluster jedisCluster;

private MessageContext messageContext;
private Boolean isClusterEnabled = false;
private Boolean isJmxEnabled = false;
private int soTimeout = RedisConstants.DEFAULT_TIMEOUT;
private int connectionTimeout;
private boolean useSsl = false;
Expand All @@ -64,6 +65,7 @@ public class RedisServer {
private int dbNumber = DEFAULT_DATABASE;
private Lock lock = new ReentrantLock();
private Lock jedisLock = new ReentrantLock();
private Lock jedisClusterLock = new ReentrantLock();

public RedisServer(MessageContext messageContext) {
this.messageContext = messageContext;
Expand All @@ -72,6 +74,10 @@ public RedisServer(MessageContext messageContext) {
if (redisClusterEnabled != null && !redisClusterEnabled.isEmpty()) {
isClusterEnabled = Boolean.parseBoolean(redisClusterEnabled);
}
String isJmxEnabled = (String) messageContext.getProperty(RedisConstants.IS_JMX_ENABLED);
if (isJmxEnabled != null && !isJmxEnabled.isEmpty()) {
this.isJmxEnabled = Boolean.parseBoolean(isJmxEnabled);
}
String soTimeoutProp = (String) messageContext.getProperty(RedisConstants.TIMEOUT);
String connectionTimeoutProp = (String) messageContext.getProperty(RedisConstants.CONNECTION_TIMEOUT);
String cacheKeyProp = (String) messageContext.getProperty(RedisConstants.CACHEKEY);
Expand Down Expand Up @@ -352,27 +358,46 @@ private JedisCluster createJedisCluster() {
jedisClusterNodes.add(new HostAndPort(redisNode[0].trim(), Integer.parseInt(redisNode[1].trim())));
}

return new JedisCluster(jedisClusterNodes, connectionTimeout, soTimeout, maxAttempts,
cacheKey, clientName, new GenericObjectPoolConfig(),
useSsl);
GenericObjectPoolConfig jedisPoolConfig = new GenericObjectPoolConfig<>();
jedisPoolConfig.setJmxEnabled(isJmxEnabled);
jedisPoolConfig.setMaxTotal(maxConnections); //The maximum number of connections that are supported by the pool.
jedisPoolConfig.setMaxIdle(maxConnections); // Is the actual maximum number of connections required by workloads
// (maxTotal = maxIdle)
jedisPoolConfig.setTestOnBorrow(false); //set to default false
jedisPoolConfig.setTestOnReturn(false); //set to default false
jedisPoolConfig.setTestWhileIdle(true);
jedisPoolConfig.setNumTestsPerEvictionRun(3);
jedisPoolConfig.setBlockWhenExhausted(true);

//Use double lock to avoid creating a new Jedis Cluster Connection Pool for each request.
if (jedisClusterMap.get(uniquePoolId) == null) {
jedisClusterLock.lock();
try {
if (jedisClusterMap.get(uniquePoolId) == null) {
JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, connectionTimeout, soTimeout, maxAttempts,
cacheKey, clientName, jedisPoolConfig, useSsl);
jedisClusterMap.put(uniquePoolId, jedisCluster);
}
} finally {
jedisClusterLock.unlock();
}
}
return jedisClusterMap.get(uniquePoolId);
}

public Jedis getJedis() {
return createJedis();
}

public JedisCluster getJedisCluster() {
this.jedisCluster = createJedisCluster();
return jedisCluster;
return createJedisCluster();
}

/**
* Close the datasources objects associated Jedis and JedisCluster instances.
* Close the datasources objects associated with JedisCluster instances.
*/
public void close() {
if (jedisCluster != null) {
jedisCluster.close();
}
//No need to close the JedisCluster instance as it is handled by the JedisClusterConnectionPool.
}

public Boolean isClusterEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class RedisConstants {
public static final String MAX_ATTEMPTS = "maxAttempts";
public static final String WEIGHT = "weight";
public static final String MAX_CONNECTIONS = "maxConnections";
public static final String IS_JMX_ENABLED = "jmxEnabled";
public static final String CONNECTION_POOL_ID = "redisConnectionPoolId";
public static final String INTERNAL_POOL_ID_SEPARATOR = "INTERNAL_POOL_ID_";
public static final String ARTIFACT_NAME = "ARTIFACT_NAME";
Expand Down
9 changes: 8 additions & 1 deletion src/main/resources/config/init.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
<parameter name="clusterNodes" description="comma separated list of the cluster nodes (host:port)"/>
<parameter name="clientName" description="name of the client"/>
<parameter name="maxAttempts" description="the number of retries"/>

<parameter name="jmxEnabled" description="a flag to enable jmx"/>

<!-- Redis Sentinel specific parameters-->
<parameter name="sentinelPassword" description="sentinel password"/>
<parameter name="sentinelUser" description="sentinel user name"/>
Expand Down Expand Up @@ -100,6 +101,12 @@
<property name="redisClusterEnabled" expression="$func:redisClusterEnabled"/>
</else>
</filter>
<filter xpath="$func:jmxEnabled = '' or not(string($func:jmxEnabled))">
<then/>
<else>
<property name="jmxEnabled" expression="$func:jmxEnabled"/>
</else>
</filter>
<filter xpath="$func:clusterNodes = '' or not(string($func:clusterNodes))">
<then/>
<else>
Expand Down

0 comments on commit 729fdf0

Please sign in to comment.