Skip to content

Commit

Permalink
Enhance server cache and inject unique request id for each run
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jan 11, 2017
1 parent d4ae669 commit ac34140
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@
import com.google.common.cache.CacheBuilder;
import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.parameters.NamedParams;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectRevision;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.www.GetCacheStatusServlet;
import org.pentaho.di.www.SlaveServerJobStatus;
import org.pentaho.di.www.SlaveServerTransStatus;
import org.pentaho.di.www.WebResult;

import javax.servlet.http.HttpServletRequest;
import java.net.URLEncoder;
import java.util.AbstractMap;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -47,6 +56,9 @@ public final class ServerCache {
= Integer.parseInt(System.getProperty("KETTLE_RESOURCE_EXPIRATION_MINUTE", "1800"));
public static final String PARAM_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER");

static final String KEY_ETL_CACHE_ID = System.getProperty("KETTLE_CACHE_ID_KEY", "CACHE_ID");
static final String KEY_ETL_REQUEST_ID = System.getProperty("KETTLE_REQUEST_ID_KEY", "REQUEST_ID");

// On master node, it's for name -> revision + md5; on slave server, it's name -> md5
private static final Cache<String, String> resourceCache = CacheBuilder.newBuilder()
.maximumSize(RESOURCE_CACHE_SIZE)
Expand Down Expand Up @@ -97,6 +109,55 @@ private static String buildResourceName(AbstractMeta meta, Map<String, String> p
return sb.append('@').append(host).append(':').append(port).toString();
}

/**
 * Builds the extra HTTP request parameters sent along with a slave-server call:
 * the cache id (resource name) and, when available, the unique request id.
 *
 * <p>Lookup order for the request id: {@code params} first, then {@code vars} —
 * so a value found in {@code vars} intentionally overrides one from {@code params}.</p>
 *
 * @param resourceName cache identity of the resource; ignored when null or empty
 * @param params       named parameter values to search for the request id; may be null
 * @param vars         variable values to search for the request id; may be null, wins over params
 * @return a mutable map of request parameters; never null, possibly empty
 */
public static Map<String, String> buildRequestParameters(String resourceName,
                                                         Map<String, String> params,
                                                         Map<String, String> vars) {
    Map<String, String> map = new HashMap<String, String>();

    if (!Strings.isNullOrEmpty(resourceName)) {
        map.put(KEY_ETL_CACHE_ID, resourceName);
    }

    if (params != null) {
        String requestId = params.get(KEY_ETL_REQUEST_ID);
        if (!Strings.isNullOrEmpty(requestId)) {
            map.put(KEY_ETL_REQUEST_ID, requestId);
        }
    }

    // applied after params on purpose: a variable-level request id takes precedence
    if (vars != null) {
        String requestId = vars.get(KEY_ETL_REQUEST_ID);
        if (!Strings.isNullOrEmpty(requestId)) {
            map.put(KEY_ETL_REQUEST_ID, requestId);
        }
    }

    // FIX: this is diagnostic output, not an error — log it at debug level and
    // only when debug logging is enabled, instead of unconditionally via logError
    if (LogChannel.GENERAL.isDebug()) {
        LogChannel.GENERAL.logDebug("Request parameters: " + map);
    }

    return map;
}

/**
 * Reads the cache id and request id from the HTTP headers of the given request,
 * injects the request id into the given named parameters (so the executing
 * job/transformation can see it), and registers the carte object id under the
 * cache id so subsequent submissions of the same resource can be short-circuited.
 *
 * @param request       incoming servlet request; may be null, in which case nothing is read
 * @param params        named parameters of the job/transformation to receive the request id;
 *                      may be null, in which case no parameter is set
 * @param carteObjectId unique id Carte assigned to this execution; ignored when null or empty
 */
public static void updateParametersAndCache(HttpServletRequest request, NamedParams params, String carteObjectId) {
    String cacheId = request == null ? null : request.getHeader(KEY_ETL_CACHE_ID);
    String requestId = request == null ? null : request.getHeader(KEY_ETL_REQUEST_ID);

    // FIX: diagnostic output was logged via logError and misspelled "requetId";
    // log at debug level, only when enabled, with the typo corrected
    if (LogChannel.GENERAL.isDebug()) {
        LogChannel.GENERAL.logDebug(
            "cacheId=" + cacheId + ", requestId=" + requestId + ", carteId=" + carteObjectId);
    }

    // FIX: guard against a null params to avoid an NPE when a request id is present
    if (params != null && !Strings.isNullOrEmpty(requestId)) {
        try {
            params.setParameterValue(KEY_ETL_REQUEST_ID, requestId);
        } catch (UnknownParamException e) {
            // FIX: don't swallow silently — the parameter simply isn't declared on this
            // job/transformation, which is harmless, but leave a trace for debugging
            LogChannel.GENERAL.logDebug(
                "Ignored request id: parameter " + KEY_ETL_REQUEST_ID + " is not declared", e);
        }
    }

    // remember which carte object id is serving this cached resource
    if (!Strings.isNullOrEmpty(cacheId) && !Strings.isNullOrEmpty(carteObjectId)) {
        cacheIdentity(cacheId, carteObjectId);
    }
}

/**
* Retrieve a unique id generated for the given resource if it's been cached.
*
Expand All @@ -107,10 +168,26 @@ public static String getCachedIdentity(String resourceName) {
return RESOURCE_CACHE_DISABLED ? null : resourceCache.getIfPresent(resourceName);
}

public static String getCachedIdentity(AbstractMeta meta, Map<String, String> params, SlaveServer server) {
public static Map.Entry<String, String> getCachedEntry(
AbstractMeta meta, Map<String, String> params, SlaveServer server) {
String resourceName = buildResourceName(meta, params, server);
String identity = getCachedIdentity(resourceName);

if (Strings.isNullOrEmpty(identity)) {
// don't give up so quick as this might be cached on slave server
try {
String reply =
server.execService(GetCacheStatusServlet.CONTEXT_PATH + "/?name="
+ URLEncoder.encode(resourceName, "UTF-8"));
WebResult webResult = WebResult.fromXMLString(reply);
if (webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
identity = webResult.getId();
}
} catch (Exception e) {
// ignore as this is usually a network issue
}
}

// let's see if the slave server still got this
if (!Strings.isNullOrEmpty(identity)) {
try {
Expand Down Expand Up @@ -142,7 +219,7 @@ public static String getCachedIdentity(AbstractMeta meta, Map<String, String> pa
}
}

return identity;
return new AbstractMap.SimpleImmutableEntry<String, String>(resourceName, identity);
}

/**
Expand Down
8 changes: 6 additions & 2 deletions pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,9 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio
// Align logging levels between execution configuration and remote server
slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel());

String carteObjectId = ServerCache.getCachedIdentity(jobMeta, executionConfiguration.getParams(), slaveServer);
Map.Entry<String, String> entry
= ServerCache.getCachedEntry(jobMeta, executionConfiguration.getParams(), slaveServer);
String carteObjectId = entry.getValue();

FileObject tempFile = null;
try {
Expand Down Expand Up @@ -1782,7 +1784,9 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio
//
String reply =
slaveServer.execService(StartJobServlet.CONTEXT_PATH + "/?name=" + URLEncoder.encode(jobMeta.getName(),
"UTF-8") + "&xml=Y&id=" + carteObjectId);
"UTF-8") + "&xml=Y&id=" + carteObjectId,
ServerCache.buildRequestParameters(entry.getKey(),
executionConfiguration.getParams(), executionConfiguration.getVariables()));
WebResult webResult = WebResult.fromXMLString(reply);
if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
ServerCache.invalidate(jobMeta, executionConfiguration.getParams(), slaveServer);
Expand Down
8 changes: 6 additions & 2 deletions pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java
Original file line number Diff line number Diff line change
Expand Up @@ -4109,7 +4109,9 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig
throw new KettleException("The transformation needs a name to uniquely identify it by on the remote server.");
}

String carteObjectId = ServerCache.getCachedIdentity(transMeta, executionConfiguration.getParams(), slaveServer);
Map.Entry<String, String> entry
= ServerCache.getCachedEntry(transMeta, executionConfiguration.getParams(), slaveServer);
String carteObjectId = entry.getValue();

FileObject tempFile = null;
try {
Expand Down Expand Up @@ -4196,7 +4198,9 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig
//
reply =
slaveServer.execService(StartExecutionTransServlet.CONTEXT_PATH + "/?name=" + URLEncoder.encode(transMeta
.getName(), "UTF-8") + "&xml=Y&id=" + carteObjectId);
.getName(), "UTF-8") + "&xml=Y&id=" + carteObjectId,
ServerCache.buildRequestParameters(entry.getKey(),
executionConfiguration.getParams(), executionConfiguration.getVariables()));
webResult = WebResult.fromXMLString(reply);

if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

public class CarteSingleton {

private static Class<?> PKG = Carte.class; // for i18n purposes, needed by Translator2!!

private static SlaveServerConfig slaveServerConfig;
Expand Down Expand Up @@ -173,8 +172,8 @@ public void run() {

// Remove the logging information from the log registry & central log store
//
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
KettleLogStore.discardLines(logChannelId, false);
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);

// transformationMap.deallocateServerSocketPorts(entry);

Expand Down Expand Up @@ -206,8 +205,8 @@ public void run() {

// Remove the logging information from the log registry & central log store
//
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
KettleLogStore.discardLines(logChannelId, false);
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);

log.logMinimal("Cleaned up job "
+ entry.getName() + " with id " + entry.getId() + " from " + logDate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.owasp.esapi.ESAPI;
import org.owasp.esapi.Encoder;
import org.pentaho.di.cluster.ServerCache;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.xml.XMLHandler;
Expand Down Expand Up @@ -194,6 +195,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro

if (trans != null) {
if (trans.isReadyToStart()) {
ServerCache.updateParametersAndCache(request, trans, id);
startThreads(trans);

if (useXML) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.owasp.esapi.ESAPI;
import org.owasp.esapi.Encoder;
import org.pentaho.di.cluster.ServerCache;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
Expand Down Expand Up @@ -242,6 +243,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
}
}

ServerCache.updateParametersAndCache(request, job, id);
runJob(job);

String message = BaseMessages.getString(PKG, "StartJobServlet.Log.JobStarted", jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.quartz.Scheduler;

import java.util.Map;
import java.util.UUID;

import static org.pentaho.platform.scheduler2.quartz.QuartzSchedulerHelper.*;

Expand Down Expand Up @@ -55,6 +56,7 @@ void applyRule(Phase phase, Scheduler scheduler, JobDetail jobDetail) throws Job
&& etlScript != null) {
jobParams.put(KEY_ETL_JOB_ID, jobKey.toString());
jobParams.put(KEY_ETL_TRACE_ID, lineAgeId);
jobParams.put(KEY_ETL_REQUEST_ID, UUID.randomUUID().toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum Phase {
static final String KEY_ETL_SCRIPT = System.getProperty("KETTLE_JOB_NAME_KEY", "ETL_SCRIPT");
static final String KEY_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER");
static final String KEY_ETL_TRACE_ID = System.getProperty("KETTLE_TRACE_ID_KEY", "UNIQUE_ID");
static final String KEY_ETL_REQUEST_ID = System.getProperty("KETTLE_REQUEST_ID_KEY", "REQUEST_ID");

static final int KETTLE_JOB_KILLER_MAX_WAIT
= Integer.parseInt(System.getProperty("KETTLE_JOB_KILLER_WAIT_SEC", "8000"));
Expand Down

0 comments on commit ac34140

Please sign in to comment.