Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[MQB]: Enhance queue consumption monitor alarm log with additional details #420

Merged
merged 46 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f02419e
Fix macro redefinition warnings, add bmqstoragetool.td target
alexander-e1off Mar 28, 2024
122344f
Merge branch 'bloomberg:main' into main
alexander-e1off May 31, 2024
9e23230
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 6, 2024
9117ac7
Fix order of objects creation in unit test
alexander-e1off Jun 6, 2024
87cdf0b
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 7, 2024
8e88dc2
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 12, 2024
6dfd1fa
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 14, 2024
4ba1056
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 18, 2024
dcae5bf
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 27, 2024
7482e16
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 28, 2024
81a0854
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 28, 2024
2e29ae8
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 1, 2024
924c207
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 2, 2024
f279efd
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 15, 2024
8142686
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 23, 2024
2b5eaad
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 30, 2024
917e8a1
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 7, 2024
b1f9170
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 20, 2024
9bf33e6
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 29, 2024
66e3001
Merge branch 'bloomberg:main' into main
alexander-e1off Sep 6, 2024
3c6651f
Enhance queue alarm log with subscriptions info
alexander-e1off Sep 11, 2024
69da6c1
Debug message properties
alexander-e1off Sep 12, 2024
493b459
Add logging of oldest message in put aside queue and its properties
alexander-e1off Sep 12, 2024
b972874
Fix mqbblp_queueconsumptionmonitor.t
alexander-e1off Sep 13, 2024
763563f
Rename alarm label
alexander-e1off Sep 13, 2024
d092e76
Remove CI debug
alexander-e1off Sep 13, 2024
d2a5164
Cleanup
alexander-e1off Sep 13, 2024
e54b1b8
Fix subscription expressions printing
alexander-e1off Sep 17, 2024
070baec
Add printing of numMessages/numBytes per appId
alexander-e1off Sep 18, 2024
d4cf81d
Get rid of headCb
alexander-e1off Sep 23, 2024
1982cc2
Fix UT
alexander-e1off Sep 23, 2024
76744df
Cleanup
alexander-e1off Sep 24, 2024
a95ed92
Fix Solaris build
alexander-e1off Sep 24, 2024
265b77c
Merge branch 'bloomberg:main' into enhance-alarm-log
alexander-e1off Sep 24, 2024
b29a25c
Add test_alarms_subscription_mismatch
alexander-e1off Sep 24, 2024
4e11d2d
Cleanup
alexander-e1off Sep 24, 2024
6f7238f
Merge branch 'bloomberg:main' into enhance-alarm-log
alexander-e1off Sep 27, 2024
1616eb0
Fix review comments
alexander-e1off Sep 30, 2024
85a3eef
Refactor mqbblp_queueconsumptionmonitor.t
alexander-e1off Oct 11, 2024
88cd3df
Print appId in queue uri
alexander-e1off Oct 11, 2024
c6c332e
Merge branch 'bloomberg:main' into main
alexander-e1off Oct 11, 2024
f853ebf
Merge from main, fix merge conflicts
alexander-e1off Oct 14, 2024
24d2027
Fix code formatting
alexander-e1off Oct 14, 2024
e1b55b8
Fix review comments
alexander-e1off Oct 15, 2024
047955c
Merge branch 'main' into enhance-alarm-log
alexander-e1off Oct 15, 2024
b4e100f
Fix failed IT
alexander-e1off Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 6 additions & 113 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@
#include <mqbscm_version.h>
// MQB
#include <mqbblp_queuehandlecatalog.h>
#include <mqbcmd_humanprinter.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storage.h>
#include <mqbs_storageprintutil.h>
#include <mqbu_capacitymeter.h>

// BMQ
#include <bmqp_ctrlmsg_messages.h>
#include <bmqt_queueflags.h>
#include <bmqt_uri.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwcu_memoutstream.h>
#include <mwcu_printutil.h>

Expand Down Expand Up @@ -150,14 +146,17 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(

// CREATORS
// Create a 'QueueConsumptionMonitor' object that monitors the queue described
// by the specified 'queueState'.  Store a copy of the specified 'loggingCb',
// the callback used to log alarm info.  The specified 'allocator' is passed
// to the sub-stream info container.  The maximum idle time and the current
// timer are both initialized to 0.
QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
bslma::Allocator* allocator)
: d_queueState_p(queueState)
, d_maxIdleTime(0)
, d_currentTimer(0)
, d_subStreamInfos(allocator)
, d_loggingCb(loggingCb)
{
// PRECONDITIONS
// Both the queue state pointer and the logging callback are asserted to be
// set.
BSLS_ASSERT_SAFE(d_queueState_p);
BSLS_ASSERT_SAFE(d_loggingCb);
}

// MANIPULATORS
Expand Down Expand Up @@ -331,118 +330,12 @@ void QueueConsumptionMonitor::onTransitionToIdle(
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));
BSLS_ASSERT_SAFE(d_loggingCb);

subStreamInfo->d_state = State::e_IDLE;

bdlma::LocalSequentialAllocator<2048> localAllocator(0);
bsl::vector<mqbi::QueueHandle*> handles(&localAllocator);
d_queueState_p->handleCatalog().loadHandles(&handles);

bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
bsl::string appId;

if (appKey.isNull()) {
appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID;
}
else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
uriBuilder.setId(appId);
}

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);

mwcu::MemOutStream ss(&localAllocator);

int idx = 1;
int numConsumers = 0;

const bool isFanoutValue =
d_queueState_p->queue()->hasMultipleSubStreams();

for (bsl::vector<mqbi::QueueHandle*>::const_iterator it = handles.begin(),
last = handles.end();
it != last;
++it) {
const mqbi::QueueHandle::SubStreams& subStreamInfos =
(*it)->subStreamInfos();

for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter =
subStreamInfos.begin();
infoCiter != subStreamInfos.end();
++infoCiter) {
const bsl::string& itemAppId = infoCiter->first;

bool isReader = !isFanoutValue &&
bmqt::QueueFlagsUtil::isReader(
(*it)->handleParameters().flags());
// Non-fanout mode consumer in the default subStream ?
isReader |= isFanoutValue && !itemAppId.empty();

if (!isReader) {
continue; // CONTINUE
}

if (itemAppId != appId) {
continue; // CONTINUE
}

numConsumers += infoCiter->second.d_counts.d_readCount;

const int level = 2, spacesPerLevel = 2;

ss << "\n " << idx++ << ". " << (*it)->client()->description()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Handle Parameters .....: " << (*it)->handleParameters()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "UnconfirmedMonitors ....:";

const bsl::vector<const mqbu::ResourceUsageMonitor*> monitors =
(*it)->unconfirmedMonitors(appId);
for (size_t i = 0; i < monitors.size(); ++i) {
ss << "\n " << monitors[i];
}
}
}

mwcu::MemOutStream out;
out << "Queue '" << uri << "' ";
d_queueState_p->storage()->capacityMeter()->printShortSummary(out);
out << ", max idle time "
<< mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime)
<< " appears to be stuck. It currently has " << numConsumers
<< " consumers." << ss.str() << "\n";

// Print the 10 oldest messages in the queue
static const int k_NUM_MSGS = 10;
const int level = 0, spacesPerLevel = 2;

out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< k_NUM_MSGS << " oldest messages in the queue:\n";

mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(),
appId,
0,
k_NUM_MSGS,
d_queueState_p->storage());
mqbcmd::HumanPrinter::print(out, result);

if (!head) {
return; // RETURN
}

// Print the current head of the queue
mqbi::Storage* const storage = d_queueState_p->storage();
out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Current head of the queue:\n";

mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head);

mqbcmd::HumanPrinter::print(out, result);
out << "\n";

MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR")
<< out.str() << MWCTSK_ALARMLOG_END;
// Call logging callback to log alarm info.
d_loggingCb(appKey, head);
}

} // close package namespace
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ class QueueConsumptionMonitor {
typedef bsl::function<bslma::ManagedPtr<mqbi::StorageIterator>(void)>
HeadCb;

/// Signature of the callback used to log alarm info.  The callback is
/// called with an application key (`appKey`) and a managed pointer to a
/// storage iterator (`head`), and returns nothing.
/// NOTE(review): `head` presumably refers to the current head of the
/// queue for that application, and may be empty -- confirm with callers.
typedef bsl::function<void(
const mqbu::StorageKey& appKey,
const bslma::ManagedPtr<mqbi::StorageIterator>& head)>
LoggingCb;

private:
// PRIVATE TYPES

Expand Down Expand Up @@ -265,6 +270,9 @@ class QueueConsumptionMonitor {

SubStreamInfoMap d_subStreamInfos;

LoggingCb d_loggingCb;
// Callback to log alarm info.

// NOT IMPLEMENTED
QueueConsumptionMonitor(const QueueConsumptionMonitor&) BSLS_CPP11_DELETED;
QueueConsumptionMonitor&
Expand Down Expand Up @@ -306,6 +314,7 @@ class QueueConsumptionMonitor {
/// `allocator` to supply memory. If `allocator` is 0, the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// `basicAllocator` to supply memory. If `basicAllocator` is 0, the
/// `allocator` to supply memory. If `allocator` is 0, the

And also need to mention loggingCb

/// currently installed default allocator is used. Use the specified
/// `loggingCb` to log alarm info.
QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
bslma::Allocator* allocator);

// MANIPULATORS
Expand Down
61 changes: 18 additions & 43 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <bmqt_queueflags.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwctst_scopedlogobserver.h>
#include <mwctst_testhelper.h>
#include <mwcu_memoutstream.h>
Expand Down Expand Up @@ -93,6 +94,17 @@ ClientContext::~ClientContext()
// NOTHING
}

// Test stand-in for the monitor's logging callback.  Both the specified
// 'appKey' and 'head' are ignored; every call raises an alarm with the label
// "QUEUE_STUCK" and the fixed text "Test Alarm", which the test cases match
// against in the observed log records.
static void loggingCb(BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& appKey,
BSLS_ANNOTATION_UNUSED const
bslma::ManagedPtr<mqbi::StorageIterator>& head)
{
BALL_LOG_SET_CATEGORY("MQBBLP.QUEUECONSUMPTIONMONITORTEST");

// Build the alarm text in a stream backed by the test allocator.
mwcu::MemOutStream out(s_allocator_p);
out << "Test Alarm";
MWCTSK_ALARMLOG_ALARM("QUEUE_STUCK") << out.str() << MWCTSK_ALARMLOG_END;
}

struct Test : mwctst::Test {
typedef bsl::vector<
bsl::pair<mqbi::QueueHandle*, bmqp_ctrlmsg::QueueHandleParameters> >
Expand Down Expand Up @@ -152,7 +164,7 @@ Test::Test()
d_partitionId,
&d_domain,
s_allocator_p)
, d_monitor(&d_queueState, s_allocator_p)
, d_monitor(&d_queueState, &loggingCb, s_allocator_p)
, d_storage(d_queue.uri(),
mqbu::StorageKey::k_NULL_KEY,
mqbs::DataStore::k_INVALID_PARTITION_ID,
Expand Down Expand Up @@ -386,11 +398,7 @@ TEST_F(Test, logFormat)
ASSERT_EQ(logObserver.records().size(), 1u);
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"Queue '.*'",
"ALARM \\[QUEUE_STUCK\\]",
s_allocator_p));
}

Expand Down Expand Up @@ -440,11 +448,7 @@ TEST_F(Test, putAliveIdleSendAlive)
ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords);
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"0 consumers",
"ALARM \\[QUEUE_STUCK\\]",
s_allocator_p));

d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2);
Expand Down Expand Up @@ -503,19 +507,7 @@ TEST_F(Test, putAliveIdleWithConsumer)
ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords);
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\].*It currently has 2 consumers",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, there is no "test consumer 1/2" log because it is outside of the QueueConsumptionMonitor now.
That is fine.
Question: maybe we do not need d_consumer1/2 now?
Maybe the Test can shrink now, since the state of the monitored objects is factored out by Test::LoggingCb?
If that is the case, we can remove Test::d_queue, Test::d_queueState, Test::d_domain, Test::d_cluster, and Test::createClient. Possibly d_storage as well, if Test::putMessage() changes the state of Test instead of Test::d_storage (is that Test::d_advance?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the test by simplifying the logic and removing the consumers, but Test::d_queueState and its dependencies are still needed because d_monitor requires them.

s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test consumer 1",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test consumer 2",
s_allocator_p));
ASSERT(!mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test producer",
"ALARM \\[QUEUE_STUCK\\]",
s_allocator_p));
}

Expand Down Expand Up @@ -689,11 +681,7 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams)
for (int i = 0; i < 2; ++i) {
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().rbegin()[i],
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().rbegin()[i],
"0 consumers",
"ALARM \\[QUEUE_STUCK\\]",
s_allocator_p));
}

Expand Down Expand Up @@ -797,21 +785,8 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreamsTwoConsumers)
++iter) {
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
*iter,
"ALARM \\[QUEUE_CONSUMER_MONITOR\\] Queue "
"'bmq://bmq.test.local/test_queue\\?id=app\\d'",
s_allocator_p));
ASSERT(
mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter,
"1 consumers",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
*iter,
"test consumer \\d",
"ALARM \\[QUEUE_STUCK\\]",
s_allocator_p));
ASSERT(
!mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter,
"test producer",
s_allocator_p));
}

d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2);
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ struct QueueEngineUtil_AppState {
// ACCESSORS
size_t redeliveryListSize() const;

size_t putAsideListSize() const;

Routers::Consumer* findQueueHandleContext(mqbi::QueueHandle* handle);

unsigned int upstreamSubQueueId() const;
Expand Down Expand Up @@ -695,6 +697,11 @@ inline size_t QueueEngineUtil_AppState::redeliveryListSize() const
return d_redeliveryList.size();
}

// Return the number of elements currently held in 'd_putAsideList'.
inline size_t QueueEngineUtil_AppState::putAsideListSize() const
{
return d_putAsideList.size();
}

inline Routers::Consumer*
QueueEngineUtil_AppState::findQueueHandleContext(mqbi::QueueHandle* handle)
{
Expand Down
Loading
Loading