Skip to content

Commit

Permalink
debug hung consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
laxmanchekka committed Oct 17, 2024
1 parent 48318b6 commit 2d78650
Showing 1 changed file with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@ThreadSafe
public class RealtimeTableDataManager extends BaseTableDataManager {
private static final Logger LOG = LoggerFactory.getLogger(RealtimeTableDataManager.class);

private SegmentBuildTimeLeaseExtender _leaseExtender;
private RealtimeSegmentStatsHistory _statsHistory;
private final Semaphore _segmentBuildSemaphore;
Expand Down Expand Up @@ -481,7 +485,8 @@ private void doAddConsumingSegment(String segmentName)
// Generates only one semaphore for every partition
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1));
Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId,
k -> new TrackableBooleanSemaphore(partitionGroupId, true));

// Create the segment data manager and register it
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
Expand Down Expand Up @@ -697,4 +702,62 @@ private void validate(TableConfig tableConfig, Schema schema) {
// 2. Validate the schema itself
SchemaUtils.validate(schema);
}

private static class TrackableBooleanSemaphore extends Semaphore {
private final int _partitionGroupId;
private volatile String _owner;
private static final int SEMAPHORE_LOCK_WAIT_SECONDS;

static {
SEMAPHORE_LOCK_WAIT_SECONDS = Integer.parseInt(System.getProperty("semaphoreLockWaitSeconds", "60"));
LOG.info("semaphoreLockWaitSeconds: {}", SEMAPHORE_LOCK_WAIT_SECONDS);
}

public TrackableBooleanSemaphore(int partitionGroupId, boolean fair) {
super(1, fair);
_partitionGroupId = partitionGroupId;
}

@Override
public void acquire()
throws InterruptedException {
boolean acquired = tryAcquireWithinLimitedTime();
// Acquire only when first attempt is not successful
if (!acquired) {
super.acquire();
}
_owner = Thread.currentThread().getName();
LOG.debug("semaphore acquired. semaphore: [{}]", this);
}

private boolean tryAcquireWithinLimitedTime() {
boolean acquired = false;
try {
acquired = tryAcquire(SEMAPHORE_LOCK_WAIT_SECONDS, TimeUnit.SECONDS);
if (!acquired) {
LOG.warn("failed to acquire in limited time. semaphore: [{}]", this);
}
} catch (InterruptedException e) {
// Need to maintain the same behavior as existing.
// So, resetting the interrupt flag and propagating to the caller
Thread.currentThread().interrupt();
}
return acquired;
}

@Override
public void release() {
super.release();
if (availablePermits() > 0) {
_owner = null;
}
LOG.debug("semaphore released. sem``aphore: [{}]", this);
}

@Override
public String toString() {
return "Semaphore@" + Integer.toHexString(hashCode()) + "[_partitionGroupId=" + _partitionGroupId + ", _owner='"
+ _owner + "', available permits=" + availablePermits() + ']';
}
}
}

0 comments on commit 2d78650

Please sign in to comment.