Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] fix deadlock in concurrent connection disconnection #302

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rtt/base/ChannelElement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace RTT { namespace base {
}

/** Writes a new sample on this connection. \a sample is the sample to
* write.
* write.
*
* @returns false if an error occured that requires the channel to be invalidated. In no ways it indicates that the sample has been received by the other side of the channel.
*/
Expand Down Expand Up @@ -243,6 +243,12 @@ namespace RTT { namespace base {
MultipleInputsChannelElementBase::removeInput(input);
}

virtual void removedInputs(Inputs const& inputs)
{
if (find(inputs.begin(), inputs.end(), last) != inputs.end())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing namespace qualifier: std::find?

last = 0;
}

private:
ChannelElement<T> *last;
};
Expand Down
21 changes: 17 additions & 4 deletions rtt/base/ChannelElementBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,11 @@ namespace RTT { namespace base {
}

/**
* This function may be used to identify,
* This function may be used to identify,
* if the current element uses a network
* transport, to send the data to the next
* Element in the logical chain.
*
*
* @return true if a network transport is used.
* */
virtual bool isRemoteElement() const;
Expand All @@ -264,7 +264,7 @@ namespace RTT { namespace base {
* E.g: In the local case output->getLocalURI()
* In the remote case the URI of the remote
* channel element.
*
*
* @return URI of the next element.
* */
virtual std::string getRemoteURI() const;
Expand All @@ -278,7 +278,7 @@ namespace RTT { namespace base {

/**
* Returns the class name of this
* element. This is primary useful
* element. This is primary useful
* for special case handling in the
* connection tracking.
* @return The name of the class of the ChannelElement
Expand Down Expand Up @@ -372,6 +372,8 @@ namespace RTT { namespace base {
bool signalFrom(ChannelElementBase *caller);

protected:
bool disconnectSingleInputChannel(ChannelElementBase::shared_ptr const& channel, bool forward);

/**
* Sets the new input channel element of this element or adds a channel to the inputs list.
* @param input the previous element in chain.
Expand All @@ -384,6 +386,8 @@ namespace RTT { namespace base {
* @param input the element to be removed
*/
virtual void removeInput(ChannelElementBase::shared_ptr const& input);

virtual void removedInputs(Inputs const& inputs) = 0;
};

/**
Expand Down Expand Up @@ -435,6 +439,15 @@ namespace RTT { namespace base {
virtual bool disconnect(ChannelElementBase::shared_ptr const& channel, bool forward = false);

protected:
/**
* Disconnects a single channel
*
* \ref disconnect's channel argument may be null, which is meant to indicate that
* all channels must be disconnected. This method handles the non-null case for
* clarity.
*/
bool disconnectSingleOutputChannel(ChannelElementBase::shared_ptr const& channel, bool forward);

/**
* Sets the new output channel element of this element or adds a channel to the outputs list.
* @param output the next element in chain.
Expand Down
155 changes: 93 additions & 62 deletions rtt/base/ChannelInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,45 +272,59 @@ void MultipleInputsChannelElementBase::clear()
bool MultipleInputsChannelElementBase::disconnect(ChannelElementBase::shared_ptr const& channel, bool forward)
{
if (channel) {
bool was_last = false;
return disconnectSingleInputChannel(channel, forward);
} else if (!forward) {
Inputs removedInputs;
{
// Remove the channel from the inputs list
RTT::os::MutexLock lock(inputs_lock);
Inputs::iterator found = std::find(inputs.begin(), inputs.end(), channel);
if (found == inputs.end()) {
return false;
}
ChannelElementBase::shared_ptr input = *found;

if (!forward) {
if (!input->disconnect(this, forward)) {
return false;
}
}

removeInput(input.get()); // invalidates input
was_last = inputs.empty();
removedInputs.splice(removedInputs.end(), inputs);
this->removedInputs(removedInputs);
}

// If the removed input was the last channel and forward is true, disconnect output side, too.
if (was_last && forward) {
return disconnect(0, true);
for (Inputs::iterator it = removedInputs.begin(); it != removedInputs.end(); ++it) {
(*it)->disconnect(this, false);
}
}

return true;
return ChannelElementBase::disconnect(channel, forward);
}

} else if (!forward) {
// Disconnect and remove all inputs
bool MultipleInputsChannelElementBase::disconnectSingleInputChannel(ChannelElementBase::shared_ptr const& channel, bool forward)
{
// Remove the channel from the outputs list
bool was_last = false;

// Must remove first the item from the output list before we attempt the
// disconnection, or another thread might try to remove it as well
Inputs removedInputs;
{
RTT::os::MutexLock lock(inputs_lock);
for (Inputs::iterator it = inputs.begin(); it != inputs.end(); ) {
const ChannelElementBase::shared_ptr &input = *it++;
input->disconnect(this, false);
removeInput(input.get()); // invalidates input
Inputs::const_iterator found = std::find(inputs.begin(), inputs.end(), channel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently the iterator needs to be non-const for std::list::splice for compatibility with C++98.

if (found == inputs.end())
return false;

removedInputs.splice(removedInputs.end(), inputs, found);
this->removedInputs(removedInputs);
was_last = inputs.empty();
}

if (!forward) {
if (!channel->disconnect(this, false)) {
{
// Disconnection failed, re-add to the output list
RTT::os::MutexLock lock(inputs_lock);
inputs.splice(inputs.end(), removedInputs);
}
return false;
}
assert(inputs.empty());
}

return ChannelElementBase::disconnect(channel, forward);
// If the removed output was the last channel, disconnect input side, too.
if (was_last && forward) {
return disconnect(0, true);
}

return true;
}

bool MultipleInputsChannelElementBase::signalFrom(ChannelElementBase *)
Expand Down Expand Up @@ -374,58 +388,75 @@ bool MultipleOutputsChannelElementBase::channelReady(ChannelElementBase::shared_
bool MultipleOutputsChannelElementBase::disconnect(ChannelElementBase::shared_ptr const& channel, bool forward)
{
if (channel) {
// Remove the channel from the outputs list
bool was_last = false;
return disconnectSingleOutputChannel(channel, forward);
}

if (forward) {
// Disconnect and remove all outputs
Outputs outputs;
{
RTT::os::MutexLock lock(outputs_lock);
Outputs::iterator found = std::find(outputs.begin(), outputs.end(), channel);
if (found == outputs.end()) {
return false;
}
const Output &output = *found;
outputs.splice(outputs.end(), this->outputs);
}
for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ++it) {
it->channel->disconnect(this, true);
}
}

if (forward) {
if (!output.channel->disconnect(this, forward)) {
return false;
}
}
return ChannelElementBase::disconnect(channel, forward);
}

removeOutput(output.channel.get()); // invalidates output
was_last = outputs.empty();
}
bool MultipleOutputsChannelElementBase::disconnectSingleOutputChannel(ChannelElementBase::shared_ptr const& channel, bool forward)
{
// Remove the channel from the outputs list
bool was_last = false;

// If the removed output was the last channel, disconnect input side, too.
if (was_last && !forward) {
return disconnect(0, false);
}
// Must remove first the item from the output list before we attempt the
// disconnection, or another thread might try to remove it as well
Outputs removedOutput;
{
RTT::os::MutexLock lock(outputs_lock);
Outputs::const_iterator found = std::find(outputs.begin(), outputs.end(), channel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently the iterator needs to be non-const for std::list::splice for compatibility with C++98.

if (found == outputs.end())
return false;

return true;
removedOutput.splice(removedOutput.end(), outputs, found);
was_last = outputs.empty();
}

if (forward) {
// Disconnect and remove all outputs
RTT::os::MutexLock lock(outputs_lock);
for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ) {
const Output &output = *it++;
output.channel->disconnect(this, true);
removeOutput(output.channel.get()); // invalidates output
if (!channel->disconnect(this, true)) {
{
// Disconnection failed, re-add to the output list
RTT::os::MutexLock lock(outputs_lock);
outputs.splice(outputs.end(), removedOutput);
}
return false;
}
assert(outputs.empty());
}

return ChannelElementBase::disconnect(channel, forward);
// If the removed output was the last channel, disconnect input side, too.
if (was_last && !forward) {
return disconnect(0, false);
}

return true;
}

void MultipleOutputsChannelElementBase::removeDisconnectedOutputs()
{
RTT::os::MutexLock lock(outputs_lock);
for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ) {
const Output &output = *it++;
if (output.disconnected) {
output.channel->disconnect(this, true);
removeOutput(output.channel.get()); // invalidates output
Outputs disconnectedOutputs;
{
RTT::os::MutexLock lock(outputs_lock);
for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ++it) {
if (it->disconnected)
disconnectedOutputs.splice(disconnectedOutputs.end(), this->outputs, it);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop is invalid because removing element it from outputs in splice() invalidates the iterator, at least as an iterator of outputs. it would need to be incremented before the element is removed from this->outputs.

}
}

for (Outputs::iterator it = disconnectedOutputs.begin(); it != disconnectedOutputs.end(); ++it) {
it->channel->disconnect(this, true);
}
}

bool MultipleInputsMultipleOutputsChannelElementBase::connected()
Expand Down
Loading