Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Fix bug: Ack and exe threads may read the ShellSpout stdout in same time. #380

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 41 additions & 22 deletions jstorm-core/src/main/java/backtype/storm/spout/ShellSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import clojure.lang.RT;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +48,13 @@ public class ShellSpout implements ISpout {

private TopologyContext _context;

private SpoutMsg _spoutMsg;
private SpoutMsg _ackSpoutMsg;

private SpoutMsg _nextSpoutMsg;

private SpoutMsg _failSpoutMsg;

private boolean isSingleSpoutThread;

private int workerTimeoutMills;
private ScheduledExecutorService heartBeatExecutorService;
Expand All @@ -65,10 +72,14 @@ public void open(Map stormConf, TopologyContext context, SpoutOutputCollector co
_collector = collector;
_context = context;

initSpoutMsg();

workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));

_process = new ShellProcess(_command);

isSingleSpoutThread = JStormServerUtils.isSingleThread(stormConf);

Number subpid = _process.launch(stormConf, context);
LOG.info("Launched subprocess with pid " + subpid);

Expand All @@ -81,30 +92,17 @@ public void close() {
}

public void nextTuple() {
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
_spoutMsg.setCommand("next");
_spoutMsg.setId("");
querySubprocess();
syncQuerySubprocess(_nextSpoutMsg);
}

public void ack(Object msgId) {
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
_spoutMsg.setCommand("ack");
_spoutMsg.setId(msgId);
querySubprocess();
_ackSpoutMsg.setId(msgId);
syncQuerySubprocess(_ackSpoutMsg);
}

public void fail(Object msgId) {
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
_spoutMsg.setCommand("fail");
_spoutMsg.setId(msgId);
querySubprocess();
_failSpoutMsg.setId(msgId);
syncQuerySubprocess(_failSpoutMsg);
}

private void handleMetrics(ShellMsg shellMsg) {
Expand Down Expand Up @@ -135,9 +133,19 @@ private void handleMetrics(ShellMsg shellMsg) {
}
}

private void querySubprocess() {
private void syncQuerySubprocess(SpoutMsg spoutMsg) {
if (isSingleSpoutThread) {
querySubprocess(spoutMsg);
} else {
synchronized (this) {
querySubprocess(spoutMsg);
}
}
}

private void querySubprocess(SpoutMsg spoutMsg) {
try {
_process.writeSpoutMsg(_spoutMsg);
_process.writeSpoutMsg(spoutMsg);

while (true) {
ShellMsg shellMsg = _process.readShellMsg();
Expand Down Expand Up @@ -210,6 +218,18 @@ private void handleError(String msg) {
_collector.reportError(new Exception("Shell Process Exception: " + msg));
}

private void initSpoutMsg() {
_ackSpoutMsg = new SpoutMsg();
_ackSpoutMsg.setCommand("ack");

_failSpoutMsg = new SpoutMsg();
_failSpoutMsg.setCommand("fail");

_nextSpoutMsg = new SpoutMsg();
_nextSpoutMsg.setCommand("next");
_nextSpoutMsg.setId("");
}

@Override
public void activate() {
LOG.info("Start checking heartbeat...");
Expand Down Expand Up @@ -259,5 +279,4 @@ public void run() {
}
}
}

}
10 changes: 1 addition & 9 deletions jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,13 @@ public TaskSendTargets echoToSystemBolt() {
return sendTargets;
}

public boolean isSingleThread(Map conf) {
boolean isOnePending = JStormServerUtils.isOnePending(conf);
if (isOnePending == true) {
return true;
}
return ConfigExtension.isSpoutSingleThread(conf);
}

public BaseExecutors mkExecutor() {
BaseExecutors baseExecutor = null;

if (taskObj instanceof IBolt) {
baseExecutor = new BoltExecutors(this);
} else if (taskObj instanceof ISpout) {
if (isSingleThread(stormConf) == true) {
if (JStormServerUtils.isSingleThread(stormConf)) {
baseExecutor = new SingleThreadSpoutExecutors(this);
} else {
baseExecutor = new MultipleThreadSpoutExecutors(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,8 @@ public static String getHostName(Map conf) {
return hostName;
}

public static boolean isSingleThread(Map conf) {
boolean isOnePending = JStormServerUtils.isOnePending(conf);
return isOnePending || ConfigExtension.isSpoutSingleThread(conf);
}
};