Skip to content

Commit

Permalink
Merge changes from 7.1.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Oct 10, 2017
1 parent 7f43cad commit e2ae55e
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pentaho-kettle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.github.zhicwu</groupId>
<artifactId>pdi-cluster</artifactId>
<version>7.1.0.0-SNAPSHOT</version>
<version>7.1.0.5-SNAPSHOT</version>
</parent>
<artifactId>pentaho-kettle</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ protected String appendExtraOptions(String url, Map<String, String> extraOptions
urlBuilder.append(optionSeparator);
}

urlBuilder.append(parameter).append(valueSeparator).append(value);
urlBuilder.append(environmentSubstitute(parameter)).append(valueSeparator).append(environmentSubstitute(value));
first = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -44,9 +44,10 @@ public class LogChannelFileWriter {

private AtomicBoolean active;
private KettleException exception;
private int lastBufferLineNr;
protected OutputStream logFileOutputStream;

private LogChannelFileWriterBuffer buffer;

/**
* Create a new log channel file writer
*
Expand All @@ -63,7 +64,6 @@ public LogChannelFileWriter(String logChannelId, FileObject logFile, boolean app
this.pollingInterval = pollingInterval;

active = new AtomicBoolean(false);
lastBufferLineNr = KettleLogStore.getLastBufferLineNr();

// it's basic move to create the directory *before* creating log file
try {
Expand All @@ -80,6 +80,9 @@ public LogChannelFileWriter(String logChannelId, FileObject logFile, boolean app
} catch (IOException e) {
throw new KettleException("There was an error while trying to open file '" + logFile + "' for writing", e);
}

this.buffer = new LogChannelFileWriterBuffer( this.logChannelId );
LoggingRegistry.getInstance().registerLogChannelFileWriterBuffer( this.buffer );
}

/**
Expand Down Expand Up @@ -133,10 +136,8 @@ public void run() {

public synchronized void flush() {
try {
int last = KettleLogStore.getLastBufferLineNr();
StringBuffer buffer = KettleLogStore.getAppender().getBuffer(logChannelId, false, lastBufferLineNr, last);
StringBuffer buffer = this.buffer.getBuffer();
logFileOutputStream.write(buffer.toString().getBytes());
lastBufferLineNr = last;
logFileOutputStream.flush();
} catch (Exception e) {
exception = new KettleException("There was an error logging to file '" + logFile + "'", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -34,6 +34,7 @@ public class LoggingRegistry {
private static final int DEFAULT_MAX_SIZE = 10000;

private final Map<String, LoggingObjectInterface> map;
private Map<String, LogChannelFileWriterBuffer> fileWriterBuffers;
private final Map<String, List<String>> childrenMap;
private final int maxSize;
private final Object syncObject = new Object();
Expand All @@ -42,6 +43,7 @@ public class LoggingRegistry {

private LoggingRegistry() {
this.map = new ConcurrentHashMap<String, LoggingObjectInterface>();
this.fileWriterBuffers = new ConcurrentHashMap<>();
this.childrenMap = new ConcurrentHashMap<String, List<String>>();

this.lastModificationTime = new Date();
Expand Down Expand Up @@ -131,9 +133,12 @@ public int compare(LoggingObjectInterface o1, LoggingObjectInterface o2) {
}
});
int cutCount = this.maxSize < 1000 ? this.maxSize : 1000;
List<String> channelsNotToRemove = getLogChannelFileWriterBufferIds();
for (int i = 0; i < cutCount; i++) {
LoggingObjectInterface toRemove = all.get(i);
this.map.remove(toRemove.getLogChannelId());
if (!channelsNotToRemove.contains(toRemove.getLogChannelId())) {
this.map.remove(toRemove.getLogChannelId());
}
}
removeOrphans();
}
Expand Down Expand Up @@ -252,4 +257,39 @@ public void removeOrphans() {
// Remove all orphaned children
this.childrenMap.keySet().retainAll(this.map.keySet());
}

/**
 * Registers the given file-writer buffer so log lines for its channel are
 * retained for file output (and protected from registry eviction).
 *
 * @param fileWriterBuffer buffer to register; keyed by its log channel id
 */
public void registerLogChannelFileWriterBuffer(LogChannelFileWriterBuffer fileWriterBuffer) {
  String channelId = fileWriterBuffer.getLogChannelId();
  this.fileWriterBuffers.put(channelId, fileWriterBuffer);
}

/**
 * Resolves the file-writer buffer responsible for the given log channel.
 * A registered buffer matches when {@code id} appears among the children of
 * the buffer's own channel. NOTE(review): this presumably relies on
 * getLogChannelChildren(bufferId) including bufferId itself — confirm
 * against that method's implementation.
 *
 * @param id log channel id to look up
 * @return the matching buffer, or {@code null} when none is registered
 */
public LogChannelFileWriterBuffer getLogChannelFileWriterBuffer(String id) {
  for (Map.Entry<String, LogChannelFileWriterBuffer> entry : this.fileWriterBuffers.entrySet()) {
    if (getLogChannelChildren(entry.getKey()).contains(id)) {
      return entry.getValue();
    }
  }
  return null;
}

/**
 * Collects every log channel id currently covered by a registered
 * file-writer buffer: each buffer's own channel id plus all of its child
 * channel ids. Used to shield these channels from registry eviction.
 *
 * @return ids of all channels whose log lines must be kept
 */
protected List<String> getLogChannelFileWriterBufferIds() {
  List<String> channelIds = new ArrayList<>();
  for (String bufferId : this.fileWriterBuffers.keySet()) {
    // Children of a buffered channel are buffered too.
    channelIds.addAll(getLogChannelChildren(bufferId));
  }
  channelIds.addAll(this.fileWriterBuffers.keySet());
  return channelIds;
}

/**
 * Unregisters every file-writer buffer whose channel id is a child of the
 * given log channel (e.g. when a job/transformation finishes and its log
 * file writer is no longer needed).
 *
 * @param id log channel id whose associated buffers should be removed
 */
public void removeLogChannelFileWriterBuffer(String id) {
  // removeIf on the key-set view removes the mappings; safe on ConcurrentHashMap.
  this.fileWriterBuffers.keySet().removeIf(bufferId -> getLogChannelChildren(id).contains(bufferId));
}
}
3 changes: 3 additions & 0 deletions pentaho-kettle/src/main/java/org/pentaho/di/job/JobMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,9 @@ public void loadXML(Node jobnode, String fname, Repository rep, IMetaStore metaS
// Set the filename here so it can be used in variables for ALL aspects of the job FIX: PDI-8890
if (null == rep) {
setFilename(fname);
} else {
// Set the repository here so it can be used in variables for ALL aspects of the job FIX: PDI-16441
setRepository(rep);
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.pentaho.metastore.api.IMetaStore;
import org.w3c.dom.Node;

import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;

Expand Down Expand Up @@ -984,6 +985,32 @@ public Result execute(Result result, int nr) throws KettleException {
}
}

// PDI-14781
// Write log from carte to file
if (setLogfile && jobStatus != null) {
String logFromCarte = jobStatus.getLoggingString();
if (!Utils.isEmpty(logFromCarte)) {
FileObject logfile = logChannelFileWriter.getLogFile();
OutputStream logFileOutputStream = null;
try {
logFileOutputStream = KettleVFS.getOutputStream(logfile, setAppendLogfile);
logFileOutputStream.write(logFromCarte.getBytes());
logFileOutputStream.flush();
} catch (Exception e) {
logError("There was an error logging to file '" + logfile + "'", e);
} finally {
try {
if (logFileOutputStream != null) {
logFileOutputStream.close();
logFileOutputStream = null;
}
} catch (Exception e) {
logError("There was an error closing log file file '" + logfile + "'", e);
}
}
}
}

if (!waitingToFinish) {
// Since the job was posted successfully, the result is true...
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2778,6 +2778,9 @@ public void loadXML(Node transnode, String fname, IMetaStore metaStore, Reposito
// Set the filename here so it can be used in variables for ALL aspects of the transformation FIX: PDI-8890
if (null == rep) {
setFilename(fname);
} else {
// Set the repository here so it can be used in variables for ALL aspects of the job FIX: PDI-16441
setRepository(rep);
}

// Read all the database connections from the repository to make sure that we don't overwrite any there by
Expand Down Expand Up @@ -3112,6 +3115,10 @@ public void loadXML(Node transnode, String fname, IMetaStore metaStore, Reposito
for (int i = 0; i < nrSlaveServers; i++) {
Node slaveServerNode = XMLHandler.getSubNodeByNr(slaveServersNode, SlaveServer.XML_TAG, i);
SlaveServer slaveServer = new SlaveServer(slaveServerNode);
if (slaveServer.getName() == null) {
log.logError(BaseMessages.getString(PKG, "TransMeta.Log.WarningWhileCreationSlaveServer", slaveServer.getName()));
continue;
}
slaveServer.shareVariablesWith(this);

// Check if the object exists and if it's a shared object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -27,6 +27,7 @@
import org.pentaho.di.core.Const;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleRowException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.injection.Injection;
Expand Down Expand Up @@ -297,7 +298,7 @@ public void getFields(RowMetaInterface r, String name, RowMetaInterface[] info,
boolean found = false;
for (int i = 0; i < info.length && !found; i++) {
if (info[i] != null) {
r.mergeRowMeta(info[i]);
r.mergeRowMeta(info[i], name);
found = true;
}
}
Expand Down Expand Up @@ -338,6 +339,36 @@ public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepM
PKG, "MergeRowsMeta.CheckResult.OneSourceStepMissing"), stepMeta);
remarks.add(cr);
}

RowMetaInterface referenceRowMeta = null;
RowMetaInterface compareRowMeta = null;
try {
referenceRowMeta = transMeta.getPrevStepFields(referenceStream.getStepname());
compareRowMeta = transMeta.getPrevStepFields(compareStream.getStepname());
} catch (KettleStepException kse) {
new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(
PKG, "MergeRowsMeta.CheckResult.ErrorGettingPrevStepFields"), stepMeta);
}
if (referenceRowMeta != null && compareRowMeta != null) {
boolean rowsMatch = false;
try {
MergeRows.checkInputLayoutValid(referenceRowMeta, compareRowMeta);
rowsMatch = true;
} catch (KettleRowException kre) {
rowsMatch = false;
}
if (rowsMatch) {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(
PKG, "MergeRowsMeta.CheckResult.RowDefinitionMatch"), stepMeta);
remarks.add(cr);
} else {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(
PKG, "MergeRowsMeta.CheckResult.RowDefinitionNotMatch"), stepMeta);
remarks.add(cr);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -209,7 +209,7 @@ public void cookClasses() {
if (def.isTransformClass()) {
cookedTransformClass = (Class<TransformClassBase>) cookedClass;
}
} catch (Exception e) {
} catch (Throwable e) {
CompileException exception = new CompileException(e.getMessage(), null);
exception.setStackTrace(new StackTraceElement[]{});
cookErrors.add(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -202,6 +202,8 @@ public void run() {

// Let's remove this from the job map...
//
String id = jobMap.getJob(entry).getLogChannelId();
LoggingRegistry.getInstance().removeLogChannelFileWriterBuffer(id);
jobMap.removeJob(entry);

// Remove the logging information from the log registry & central log store
Expand Down
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion pentaho-platform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.github.zhicwu</groupId>
<artifactId>pdi-cluster</artifactId>
<version>7.1.0.0-SNAPSHOT</version>
<version>7.1.0.5-SNAPSHOT</version>
</parent>
<artifactId>pentaho-platform</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the GNU General Public License for more details.
*
*
* Copyright 2006 - 2016 Pentaho Corporation. All rights reserved.
* Copyright 2006 - 2017 Pentaho Corporation. All rights reserved.
*/

package org.pentaho.platform.engine.services.connection.datasource.dbcp;
Expand Down Expand Up @@ -192,7 +192,7 @@ public static PoolingDataSource setupPooledDataSource(IDatabaseConnection databa
poolingDataSource = new PoolingDataSource();
if (dialect instanceof IDriverLocator) {
if (!((IDriverLocator) dialect).initialize(driverClass)) {
throw new RuntimeException(Messages.getInstance()
throw new DriverNotInitializedException(Messages.getInstance()
.getErrorString("PooledDatasourceHelper.ERROR_0009_UNABLE_TO_POOL_DATASOURCE_CANT_INITIALIZE",
databaseConnection.getName(), driverClass));
}
Expand Down Expand Up @@ -412,7 +412,7 @@ private static void initDriverClass(DriverManagerDataSource driverManagerDataSou
String databaseConnectionName) throws DBDatasourceServiceException {
if (dialect instanceof IDriverLocator) {
if (!((IDriverLocator) dialect).initialize(driverClassName)) {
throw new RuntimeException(Messages.getInstance()
throw new DriverNotInitializedException(Messages.getInstance()
.getErrorString("PooledDatasourceHelper.ERROR_0009_UNABLE_TO_POOL_DATASOURCE_CANT_INITIALIZE",
databaseConnectionName, driverClassName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,7 @@ public void triggerNow(String jobId) throws SchedulerException {
} else if (trigger instanceof CronTrigger) {
((CronTrigger) trigger).setPreviousFireTime(new Date());
}
if (trigger.getStartTime() != null && trigger.getStartTime().before(new Date())) {
Date newStartTime = trigger.getFireTimeAfter(new Date());
if (newStartTime != null) {
trigger.setStartTime(newStartTime);
}
}

// force the trigger to be updated with the previous fire time
scheduler.rescheduleJob(jobId, jobKey.getUserName(), trigger);
}
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.zhicwu</groupId>
<artifactId>pdi-cluster</artifactId>
<version>7.1.0.0-SNAPSHOT</version>
<version>7.1.0.5-SNAPSHOT</version>
<packaging>pom</packaging>
<name>PDI Cluster</name>
<description>Instructions and workarounds for building a cluster using Pentaho BA server and Kettle.</description>
Expand All @@ -17,7 +17,7 @@
</modules>

<properties>
<pentaho-ce.version>7.1.0.0-12</pentaho-ce.version>
<pentaho-ce.version>7.1.0.5-67</pentaho-ce.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.basedir>${project.basedir}</main.basedir>
<compiler-plugin.version>3.5.1</compiler-plugin.version>
Expand Down Expand Up @@ -47,7 +47,7 @@
<repositories>
<repository>
<id>pentaho-repo</id>
<url>http://repo.pentaho.org/artifactory/repo/</url>
<url>http://nexus.pentaho.org/content/groups/omni</url>
</repository>
</repositories>

Expand Down

0 comments on commit e2ae55e

Please sign in to comment.