Feat[MQB]: Enhance queue consumption monitor alarm log with additional details #420

Merged: 46 commits, Oct 15, 2024

Collaborator

@alexander-e1off alexander-e1off commented Sep 13, 2024

When a queue starts to fill up, it is valuable to see which AppIds are impacted, along with information about the messages in the queue.
This matters especially for subscriptions (which we are now enabling for everyone): messages that match no subscription expression build up in the put-aside list.
To make this situation clearer to operators and users (which apps are impacted, why messages are building up, how old the head of the queue is for each app, etc.), we can log more information when the watermark alarm is triggered:

  • Sizes of the put-aside list and redelivery list for each app for that queue;
  • Timestamp of the oldest message in the put-aside list, and its message properties;
  • Number of unconfirmed messages;
  • Total size of messages for each app (though this seems to be covered already by storage()->capacityMeter()->printShortSummary()).

This helps debug why a message doesn't match a subscription.

Alarm log looks like this:

ERROR mqbblp_rootqueueengine.cpp:1765 ALARM [QUEUE_STUCK] Queue 'bmq://bmq.test.mem.fanout/my1?id=foo' Messages [current: 2 / 1,000], Bytes [current: 110  B / 1.00 MB], max idle time 20.00 s appears to be stuck. It currently has 1 consumers.
  1. bmqtool.tsk:199639@127.0.0.1~localhost:37430
    Handle Parameters .....: [ uri = "bmq://bmq.test.mem.fanout/my1" qId = 0 subIdInfo = NULL flags = 2 readCount = 1 writeCount = 0 adminCount = 0 ]
    Number of unconfirmed messages .....: 1
    UnconfirmedMonitors ....:
   STATE_NORMAL [Messages (STATE_NORMAL): 1 (820 - 820 - 1,024), Bytes (STATE_NORMAL): 55  B (25.60 MB - 25.60 MB - 32.00 MB)]
   STATE_NORMAL [Messages (STATE_NORMAL): 0 (820 - 820 - 1,024), Bytes (STATE_NORMAL): 0  B (25.60 MB - 25.60 MB - 32.00 MB)]

For appId: foo
Put aside list size: 1
Redelivery list size: 0
Number of messages: 2
Number of bytes: 110

Consumer subscription expressions:
x == 2
y == 4
y == 3
x == 1

Oldest message in a 'Put aside' list:
GUID                              Size        Timestamp (UTC)
4000000000033E07AD91F4D9C7E5D709       55  B  24SEP2024_09:09:06.777472+0000
Message Properties: [ sample_str (STRING) = "foo bar" x (INT32) = 10 ]

10 oldest messages in the queue:
Printing 2 message(s) [0-1 / 2] (total: 110  B)
       GUID                              Size        Timestamp (UTC)
    0: 4000000000033E07AD91F4D9C7E5D709       55  B  24SEP2024_09:09:06.777471+0000
    1: 4000010000344E57D5C6F4D9C7E5D709       55  B  24SEP2024_09:12:37.504283+0000

Current head of the queue:
GUID                              Size        Timestamp (UTC)
4000000000033E07AD91F4D9C7E5D709       55  B  24SEP2024_09:09:06.777471+0000

Implementation details:

  • The alarm logging logic is moved from QueueConsumptionMonitor into the RootQueueEngine class, where more data is available;
  • A callback is passed to QueueConsumptionMonitor and called on alarm to log the alarm data.
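
The callback arrangement described above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `AppKey`, `SketchMonitor`, and `SketchEngine` are hypothetical stand-ins, and `std::function` is used in place of `bsl::function`. Only the callback shape (an app key plus an `enableLog` flag, returning whether undelivered messages exist) follows the signature discussed in the review.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for the real key type; not the BlazingMQ class.
using AppKey = std::string;

// Callback invoked by the monitor for one app.
//   1st arg: key of the app (substream) to inspect.
//   2nd arg: when true, write the alarm log; when false, only report state.
// Returns true if the app has undelivered messages.
using LoggingCb = std::function<bool(const AppKey&, bool)>;

// Hypothetical monitor: it detects the alarm but owns no queue details.
class SketchMonitor {
    LoggingCb d_loggingCb;  // supplied by the owner, which has the data

  public:
    explicit SketchMonitor(LoggingCb cb)
    : d_loggingCb(std::move(cb))
    {
        assert(d_loggingCb);  // checked once, in the only constructor
    }

    // Called when the monitor decides 'appKey' went idle: delegate the
    // alarm log to the owner and return whether messages are pending.
    bool onIdle(const AppKey& appKey) const
    {
        return d_loggingCb(appKey, true);
    }
};

// Hypothetical owner, standing in for the engine that knows queue details.
struct SketchEngine {
    int         d_pending = 0;  // number of undelivered messages
    std::string d_lastLog;      // last alarm text, kept for inspection

    bool logAlarm(const AppKey& appKey, bool enableLog)
    {
        if (d_pending == 0) {
            return false;  // nothing undelivered, nothing to log
        }
        if (enableLog) {
            d_lastLog = "ALARM app '" + appKey + "': " +
                        std::to_string(d_pending) + " pending";
        }
        return true;
    }
};
```

In this sketch the monitor stays free of dependencies on consumers or storage, and the owner decides what goes into the alarm record.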

alexander-e1off and others added 27 commits March 28, 2024 18:28
Signed-off-by: Aleksandr Ivanov <aivanov71@bloomberg.net>
Collaborator

@dorjesinpo dorjesinpo left a comment

Looks good. A few questions.

  • Maybe, we can print Storage::numMessages and Storage::numBytes as well?

  • Can we get rid of QueueEngineUtil_AppState::head() and QueueConsumptionMonitor::SubStreamInfo::d_headCb now?

  • Is QueueConsumptionMonitor::onTransitionToIdle "level triggered" (vs "edge triggered")? It can be noisy; how often do we want that log?

@chrisbeard please take a look at the output

@alexander-e1off
Collaborator Author

Regarding "Maybe, we can print Storage::numMessages and Storage::numBytes as well?": CapacityMeter::printShortSummary() already prints numMessages and numBytes, e.g.

Messages [current: 2 / 1,000], Bytes [current: 66  B / 1.00 MB]

Aren't they the same as Storage::numMessages() and Storage::numBytes()? In my test they printed the same values. Are there any scenarios in which they would differ?

@alexander-e1off
Collaborator Author

alexander-e1off commented Sep 16, 2024

Regarding "Can we get rid of QueueEngineUtil_AppState::head() and QueueConsumptionMonitor::SubStreamInfo::d_headCb now?": we now print the oldest message from the put-aside list, but what happens if the put-aside list is empty and the redelivery list is not? For example, suppose there are 4 messages and I limit the output to the 3 oldest messages. Without printing the head, we don't know what the last message in the queue is:

ERROR mqbblp_rootqueueengine.cpp:1766 ALARM [QUEUE_STUCK] Queue 'bmq://bmq.test.mem.fanout/my1?id=foo' Messages [current: 4 / 1,000], Bytes [current: 44  B / 1.00 MB], max idle time 20.00 s appears to be stuck. It currently has 0 consumers.

Put aside list size: 0
Redelivery list size: 3

3 oldest messages in the queue:
Printing 3 message(s) [0-2 / 4] (total: 33  B)
       GUID                              Size        Timestamp (UTC)
    0: 400000000011896EDA51C2376057798E       11  B  17SEP2024_12:23:02.458416+0000
    1: 400001000012F52671B0C2376057798E       11  B  17SEP2024_12:23:08.559553+0000
    2: 4000020000132AB6F3D6C2376057798E       11  B  17SEP2024_12:23:09.458243+0000

Current head of the queue:
GUID                              Size        Timestamp (UTC)
40000300001A6876353BC2376057798E       11  B  17SEP2024_12:23:40.559572+0000

Is head info valuable in this scenario?

Regarding "Is QueueConsumptionMonitor::onTransitionToIdle "level triggered" (vs "edge triggered")?": QueueConsumptionMonitor::onTransitionToIdle is edge triggered, so it is called only once, when the state transitions from Active to Idle.
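
To make the edge-triggered behaviour concrete, here is a small sketch of a two-state monitor. It is an illustration under assumptions, not the real QueueConsumptionMonitor: the class name, the `onTimer(bool)` signature, and the `madeProgress` flag are all hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical two-state monitor; names are illustrative only.
class EdgeTriggeredMonitor {
  public:
    enum State { e_ALIVE, e_IDLE };

    explicit EdgeTriggeredMonitor(std::function<void()> onTransitionToIdle)
    : d_state(e_ALIVE)
    , d_onTransitionToIdle(std::move(onTransitionToIdle))
    {
        assert(d_onTransitionToIdle);
    }

    // Called on every timer tick with the latest observation.
    void onTimer(bool madeProgress)
    {
        const State next = madeProgress ? e_ALIVE : e_IDLE;
        // Edge triggered: fire only on the ALIVE -> IDLE transition.
        // A level-triggered variant would fire on every tick spent in IDLE.
        if (d_state == e_ALIVE && next == e_IDLE) {
            d_onTransitionToIdle();
        }
        d_state = next;
    }

    State state() const { return d_state; }

  private:
    State                 d_state;
    std::function<void()> d_onTransitionToIdle;
};
```

With this shape, three consecutive idle ticks produce one notification, but a queue that alternates between alive and idle produces one notification per cycle.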

@dorjesinpo
Collaborator

QueueConsumptionMonitor::onTransitionToIdle is edge triggered, so onTransitionToIdle is called only once when state is transitioned from Active to Idle.

Ok. This can flap; we often see a lot of consecutive logs. Since we are increasing the size of what's logged, we may want to throttle the logging.
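
One possible way to throttle a flapping alarm is a minimum interval between records. The sketch below is only an assumption about how that could look; `AlarmThrottle` and `tryAcquire` are hypothetical names, and nothing here is taken from the PR or from an existing logging library.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical throttle: allow at most one alarm record per 'interval'.
class AlarmThrottle {
  public:
    using Clock = std::chrono::steady_clock;

    explicit AlarmThrottle(Clock::duration interval)
    : d_interval(interval)
    , d_hasLogged(false)
    , d_last()
    {
        assert(interval >= Clock::duration::zero());
    }

    // Return true if an alarm may be logged at time 'now', and remember it.
    bool tryAcquire(Clock::time_point now)
    {
        if (d_hasLogged && now - d_last < d_interval) {
            return false;  // too soon after the previous alarm
        }
        d_hasLogged = true;
        d_last      = now;
        return true;
    }

  private:
    Clock::duration   d_interval;   // minimum spacing between alarms
    bool              d_hasLogged;  // whether any alarm was logged yet
    Clock::time_point d_last;       // time of the last logged alarm
};
```

Passing the time point in explicitly keeps the sketch easy to test; a caller would typically pass `AlarmThrottle::Clock::now()`.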

alexander-e1off and others added 7 commits September 23, 2024 06:21
Signed-off-by: Aleksandr Ivanov <aivanov71@bloomberg.net>
@alexander-e1off alexander-e1off marked this pull request as ready for review September 24, 2024 14:49
@alexander-e1off alexander-e1off requested a review from a team as a code owner September 24, 2024 14:49
Collaborator

@dorjesinpo dorjesinpo left a comment

There may be a possibility to reduce mqbblp_queueconsumptionmonitor.t dependencies. Let's explore it.

@@ -503,19 +498,7 @@ TEST_F(Test, putAliveIdleWithConsumer)
ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords);
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\].*It currently has 2 consumers",
Collaborator

As I understand, there is no "test consumer 1/2" log because it is outside of the QueueConsumptionMonitor now.
That is fine.
Question, maybe we do not need d_consumer1/2 now?
Maybe, the Test can shrink now since the state of monitored objects is factored out by the Test::LoggingCb?
If that is the case, we can remove Test::d_queue, Test::d_queueState, Test::d_domain, Test::d_cluster, and Test::createClient. Possibly, d_storage as well, if Test::putMessage() changes the state of Test instead of the Test::d_storage (is that Test::d_advance?)

Collaborator Author

Refactored the test by simplifying the logic and removing consumers, but Test::d_queueState and its dependencies are still needed, because d_monitor requires them.

BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

// Construct AppId from appKey
Collaborator

You can replace the App lookup code with
Apps::const_iterator cItApp = d_apps.findByKey2(AppKeyCount(appKey, 0));

AppKeyCount is a (hopefully short-lived) artefact of our transition from the non-CSL to the CSL way of registering Apps. In short, only registered Apps are supposed to have storage and consumption monitoring, and registered Apps should have 0 as the "count".

Collaborator Author

Replaced as proposed.

bdlma::LocalSequentialAllocator<4096> localAllocator(d_allocator_p);

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);
Collaborator

d_queueState_p->uri()

<< " consumers." << ss.str() << '\n';

// Log un-delivered messages info
mqbi::Storage* const storage = d_queueState_p->storage();
Collaborator

Use storage or d_queueState_p->storage() everywhere?

alexander-e1off and others added 6 commits September 30, 2024 17:45
Signed-off-by: Aleksandr Ivanov <aivanov71@bloomberg.net>
QueueConsumptionMonitor::State::e_ALIVE);
ASSERT_EQ(logObserver.records().size(), expectedLogRecords);
}

TEST_F(Test, logFormat)
Collaborator Author

This test originally tested the log format, but logging is now done by the callback, so this test no longer makes sense.

@@ -727,122 +497,6 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams)
ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE);
}

TEST_F(Test, putAliveIdleSendAliveTwoSubstreamsTwoConsumers)
Collaborator Author

Since consumers are removed, this test becomes identical to the one above, so it is removed.

dorjesinpo previously approved these changes Oct 14, 2024
Collaborator

@dorjesinpo dorjesinpo left a comment

Thank you

Collaborator

@678098 678098 left a comment

Some comments

/// `enableLog` is `true` it logs alarm data. Return `true` if there are
/// un-delivered messages and `false` otherwise.
bool logAlarmCb(const mqbu::StorageKey& appKey,
const bool enableLog) const;
Collaborator

@678098 678098 Oct 14, 2024

Suggested change
- const bool enableLog) const;
+ bool enableLog) const;

It is not necessary to const-qualify a by-value argument of a basic type.
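
The reason this suggestion is safe is that, in C++, a top-level `const` on a by-value parameter is not part of the function's type, so callers see the same signature either way. The sketch below shows this with a hypothetical free function (`logAlarmSketch` is an illustrative name, not the member from the PR).

```cpp
#include <type_traits>

// Declaration without top-level const on the by-value parameter.
bool logAlarmSketch(int appId, bool enableLog);

// The definition may still add const for its own body; this is the same
// function as the declaration above, not an overload.
bool logAlarmSketch(int appId, const bool enableLog)
{
    return enableLog && appId >= 0;
}

// Top-level const on by-value parameters is dropped from the function type.
static_assert(
    std::is_same<bool(int, bool), bool(int, const bool)>::value,
    "top-level const on a by-value parameter is not part of the type");
```

The qualifier only affects whether the function body may modify its local copy, which is why it adds noise in a declaration.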

@@ -229,6 +215,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));
BSLS_ASSERT_SAFE(d_loggingCb);
Collaborator

@678098 678098 Oct 14, 2024

Suggested change
- BSLS_ASSERT_SAFE(d_loggingCb);

We could remove this precondition, since d_loggingCb is set in the only constructor and is already checked there. There are no setters or other ways to change d_loggingCb to an invalid value.

@@ -210,8 +210,7 @@ class QueueConsumptionMonitor {
static const char* toAscii(Transition::Enum value);
};

typedef bsl::function<bslma::ManagedPtr<mqbi::StorageIterator>(void)>
HeadCb;
typedef bsl::function<bool(const mqbu::StorageKey&, bool)> LoggingCb;
Collaborator

It might be good to explain here what the first and second arguments mean.

@@ -220,8 +219,7 @@ class QueueConsumptionMonitor {
struct SubStreamInfo {
// CREATORS

SubStreamInfo(const HeadCb& headCb);
SubStreamInfo(const SubStreamInfo& other);
SubStreamInfo();
Collaborator

Good simplification

Comment on lines 281 to 282
/// Update the specified 'subStreamInfo', associated to the specified
/// 'appKey', and write log, upon transition to alive state.
Collaborator

Suggested change
- /// Update the specified 'subStreamInfo', associated to the specified
- /// 'appKey', and write log, upon transition to alive state.
+ /// Update the specified `subStreamInfo`, associated to the specified
+ /// `appKey`, and write log, upon transition to alive state.

int idx = 1;
int numConsumers = 0;

QueueEngineUtil_AppState::Consumers& consumers = app->consumers();
Collaborator

Suggested change
- QueueEngineUtil_AppState::Consumers& consumers = app->consumers();
+ const QueueEngineUtil_AppState::Consumers& consumers = app->consumers();

Comment on lines 1741 to 1742
out << k_EXPR_NUM_LIMIT << " of "
<< " consumer subscription expressions: ";
Collaborator

Suggested change
- out << k_EXPR_NUM_LIMIT << " of "
- << " consumer subscription expressions: ";
+ out << k_EXPR_NUM_LIMIT << " of "
+ << "consumer subscription expressions: ";

There is a double space here. Also, from the log itself it is not clear that some of the existing expressions were omitted because of the limit.

Collaborator Author

Added the word "First" at the beginning for clarity, e.g. "First 50 of consumer subscription expressions:".
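
As an illustration of the truncation concern, the sketch below prints a header that states both the number shown and the total whenever the limit cuts the list. This is a hypothetical variant, not what the PR does: the helper name `printExpressions`, the "First K of N" wording with a total, and the untruncated header are all assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: format at most 'limit' expressions and say so
// explicitly in the header when some are omitted.
std::string printExpressions(const std::vector<std::string>& expressions,
                             std::size_t                     limit)
{
    assert(limit > 0);

    std::ostringstream out;
    const std::size_t  total = expressions.size();
    const std::size_t  shown = total < limit ? total : limit;

    if (shown < total) {
        // Truncated: make the omission visible in the log itself.
        out << "First " << shown << " of " << total
            << " consumer subscription expressions:\n";
    }
    else {
        out << "Consumer subscription expressions:\n";
    }

    for (std::size_t i = 0; i < shown; ++i) {
        out << expressions[i] << '\n';
    }
    return out.str();
}
```

Including the total lets a reader of the alarm record tell at a glance whether the list is complete.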

app->putAsideList().first());
if (rc == mqbi::StorageResult::e_SUCCESS) {
// Log timestamp
out << "Oldest message in a 'Put aside' list:\n";
Collaborator

Suggested change
- out << "Oldest message in a 'Put aside' list:\n";
+ out << "Oldest message in the 'Put aside' list:\n";

Since we are logging info about a concrete queue.

Comment on lines +1777 to +1778
BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
<< rc;
Collaborator

Suggested change
- BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
- << rc;
+ BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
+ << rc;
+ out << "Message Properties: Failed to acquire [rc: " << rc << "]\n";

Do we want to print this in the alarm record itself?

Comment on lines +1782 to +1783
BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
<< app->putAsideList().first() << ", rc = " << rc;
Collaborator

Suggested change
- BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
- << app->putAsideList().first() << ", rc = " << rc;
+ BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
+ << app->putAsideList().first() << ", rc = " << rc;
+ out << "'Put aside' list: Failed to acquire [rc: " << rc << "]\n";

Signed-off-by: Aleksandr Ivanov <aivanov71@bloomberg.net>
@alexander-e1off
Collaborator Author

@678098 thank you, applied your suggestions.

Signed-off-by: Aleksandr Ivanov <aivanov71@bloomberg.net>
Collaborator

@678098 678098 left a comment

LGTM!

@alexander-e1off alexander-e1off merged commit 71b4dab into bloomberg:main Oct 15, 2024
30 checks passed
3 participants