Skip to content

Commit

Permalink
Merge pull request axoflow#310 from MrAnno/fix-otlp-multiworker-iw
Browse files Browse the repository at this point in the history
otel: fix multi-worker window and fetch-limit calculation
  • Loading branch information
alltilla authored Sep 26, 2024
2 parents 16909c1 + 9ae0439 commit faa4cc0
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
11 changes: 9 additions & 2 deletions lib/logthrsource/logthrsourcedrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,15 @@ log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions *opti

void
log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions *options, GlobalConfig *cfg,
const gchar *group_name)
const gchar *group_name, gint num_workers)
{
if (options->super.init_window_size == -1)
{
options->super.init_window_size = 100 * num_workers;
}

options->super.init_window_size /= num_workers;

log_source_options_init(&options->super, cfg, group_name);
msg_format_options_init(&options->parse_options, cfg);
}
Expand Down Expand Up @@ -285,7 +292,7 @@ _init_workers(LogThreadedSourceDriver *self)

GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super);

log_threaded_source_worker_options_init(&self->worker_options, cfg, self->super.super.group);
log_threaded_source_worker_options_init(&self->worker_options, cfg, self->super.super.group, self->num_workers);

for (size_t i = 0; i < self->num_workers; i++)
{
Expand Down
2 changes: 1 addition & 1 deletion lib/logthrsource/logthrsourcedrv.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct _LogThreadedSourceDriver

void log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions *options);
void log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions *options, GlobalConfig *cfg,
const gchar *group_name);
const gchar *group_name, gint num_workers);
void log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions *options);

void log_threaded_source_driver_set_transport_name(LogThreadedSourceDriver *self, const gchar *transport_name);
Expand Down
9 changes: 6 additions & 3 deletions modules/grpc/otel/otel-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ syslogng::grpc::otel::SourceDriver::init()

msg_info("OpenTelemetry server accepting connections", evt_tag_int("port", port));

super->super.worker_options.super.init_window_size /= super->super.num_workers;

if (fetch_limit == -1)
fetch_limit = super->super.worker_options.super.init_window_size;
{
if (super->super.worker_options.super.init_window_size != -1)
fetch_limit = super->super.worker_options.super.init_window_size / super->super.num_workers;
else
fetch_limit = 100;
}

/*
* syslog-ng-otlp(): the original HOST is always kept
Expand Down
1 change: 1 addition & 0 deletions news/bugfix-310.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry()`, `axosyslog-otlp()` sources: fix crash when `workers()` is set to `> 1`

0 comments on commit faa4cc0

Please sign in to comment.