Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FilterX: fix invalid LogMessage variables access #333

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion lib/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static void
log_driver_init_instance(LogDriver *self, GlobalConfig *cfg)
{
log_pipe_init_instance(&self->super, cfg);
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG;
self->super.free_fn = log_driver_free;
self->super.pre_init = log_driver_pre_init_method;
self->super.init = log_driver_init_method;
Expand Down
2 changes: 1 addition & 1 deletion lib/filter/filter-pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ log_filter_pipe_new(FilterExprNode *expr, GlobalConfig *cfg)
LogFilterPipe *self = g_new0(LogFilterPipe, 1);

log_pipe_init_instance(&self->super, cfg);
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG;
self->super.init = log_filter_pipe_init;
self->super.queue = log_filter_pipe_queue;
self->super.free_fn = log_filter_pipe_free;
Expand Down
5 changes: 4 additions & 1 deletion lib/filterx/expr-variable.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ _update_repr(FilterXExpr *s, FilterXObject *new_repr)
{
FilterXVariableExpr *self = (FilterXVariableExpr *) s;
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

FilterXVariable *variable = NULL;
gboolean success = filterx_scope_lookup_variable_without_validation(scope, self->handle, &variable);

g_assert(success);
g_assert(variable != NULL);
filterx_variable_set_value(variable, new_repr);
}
Expand Down
3 changes: 3 additions & 0 deletions lib/filterx/filterx-pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o
path_options = log_path_options_chain(&local_path_options, path_options);
filterx_eval_init_context(&eval_context, path_options->filterx_context);

if (filterx_scope_has_log_msg_changes(eval_context.scope))
filterx_scope_invalidate_log_msg_cache(eval_context.scope);

msg_trace(">>>>>> filterx rule evaluation begin",
evt_tag_str("rule", self->name),
log_pipe_location_tag(s),
Expand Down
66 changes: 57 additions & 9 deletions lib/filterx/filterx-scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,12 @@ struct _FilterXScope
{
GAtomicCounter ref_cnt;
GArray *variables;
guint32 generation:20, write_protected, dirty, syncable;
guint32 generation:20, write_protected, dirty, syncable, log_msg_has_changes;
};

static gboolean
_lookup_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXVariable **v_slot)
gboolean
filterx_scope_lookup_variable_without_validation(FilterXScope *self, FilterXVariableHandle handle,
FilterXVariable **v_slot)
{
gint l, h, m;

Expand Down Expand Up @@ -138,6 +139,24 @@ _lookup_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXVariab
return FALSE;
}

void
filterx_scope_set_log_msg_has_changes(FilterXScope *self)
{
self->log_msg_has_changes = TRUE;
}

void
filterx_scope_clear_log_msg_has_changes(FilterXScope *self)
{
self->log_msg_has_changes = FALSE;
}

gboolean
filterx_scope_has_log_msg_changes(FilterXScope *self)
{
return self->log_msg_has_changes;
}

void
filterx_scope_set_dirty(FilterXScope *self)
{
Expand All @@ -163,15 +182,25 @@ filterx_scope_map_variable_to_handle(const gchar *name, FilterXVariableType type
return (FilterXVariableHandle) nv_handle | FILTERX_HANDLE_FLOATING_BIT;
}

static gboolean
filterx_scope_validate_variable(FilterXScope *self, FilterXVariable *variable)
{
if (filterx_variable_handle_is_floating(variable->handle) &&
!variable->declared && variable->generation != self->generation)
return FALSE;
if(!filterx_variable_handle_is_floating(variable->handle) && filterx_scope_has_log_msg_changes(self))
return FALSE;
return TRUE;
}

FilterXVariable *
filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle)
{
FilterXVariable *v;

if (_lookup_variable(self, handle, &v))
if (filterx_scope_lookup_variable_without_validation(self, handle, &v))
{
if (filterx_variable_handle_is_floating(handle) &&
!v->declared && v->generation != self->generation)
if (!filterx_scope_validate_variable(self, v))
return NULL;
return v;
}
Expand All @@ -185,7 +214,7 @@ _register_variable(FilterXScope *self,
{
FilterXVariable v, *v_slot;

if (_lookup_variable(self, handle, &v_slot))
if (filterx_scope_lookup_variable_without_validation(self, handle, &v_slot))
{
/* already present */
if (v_slot->generation != self->generation)
Expand Down Expand Up @@ -252,8 +281,7 @@ filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func,
if (!variable->value)
continue;

if (filterx_variable_handle_is_floating(variable->handle) &&
!variable->declared && variable->generation != self->generation)
if (!filterx_scope_validate_variable(self, variable))
continue;

if (!func(variable, user_data))
Expand Down Expand Up @@ -382,6 +410,7 @@ filterx_scope_clone(FilterXScope *other)
if (other->variables->len > 0)
self->dirty = other->dirty;
self->syncable = other->syncable;
self->log_msg_has_changes = other->log_msg_has_changes;
msg_trace("Filterx clone finished",
evt_tag_printf("scope", "%p", self),
evt_tag_printf("other", "%p", other),
Expand Down Expand Up @@ -436,3 +465,22 @@ filterx_scope_unref(FilterXScope *self)
if (self && (g_atomic_counter_dec_and_test(&self->ref_cnt)))
_free(self);
}

void
filterx_scope_invalidate_log_msg_cache(FilterXScope *self)
{
g_assert(filterx_scope_has_log_msg_changes(self));
gint i = 0;

while (i < self->variables->len)
{
FilterXVariable *v = &g_array_index(self->variables, FilterXVariable, i);

if (!filterx_variable_is_floating(v))
g_array_remove_index(self->variables, i);
else
i++;
}

filterx_scope_clear_log_msg_has_changes(self);
}
9 changes: 8 additions & 1 deletion lib/filterx/filterx-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ gboolean filterx_variable_is_set(FilterXVariable *v);
* upon the exit from the scope.
*
*/

typedef struct _FilterXScope FilterXScope;

typedef gboolean (*FilterXScopeForeachFunc)(FilterXVariable *variable, gpointer user_data);

void filterx_scope_set_log_msg_has_changes(FilterXScope *self);
void filterx_scope_clear_log_msg_has_changes(FilterXScope *self);
gboolean filterx_scope_has_log_msg_changes(FilterXScope *self);
void filterx_scope_set_dirty(FilterXScope *self);
gboolean filterx_scope_is_dirty(FilterXScope *self);
void filterx_scope_sync(FilterXScope *self, LogMessage *msg);
Expand All @@ -71,6 +75,9 @@ FilterXVariable *filterx_scope_register_declared_variable(FilterXScope *self,
FilterXVariableHandle handle,
FilterXObject *initial_value);
gboolean filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, gpointer user_data);
void filterx_scope_invalidate_log_msg_cache(FilterXScope *self);
gboolean filterx_scope_lookup_variable_without_validation(FilterXScope *self, FilterXVariableHandle handle,
FilterXVariable **v_slot);

/* copy on write */
void filterx_scope_write_protect(FilterXScope *self);
Expand All @@ -80,4 +87,4 @@ FilterXScope *filterx_scope_new(void);
FilterXScope *filterx_scope_ref(FilterXScope *self);
void filterx_scope_unref(FilterXScope *self);

#endif
#endif
5 changes: 5 additions & 0 deletions lib/logmsg/logmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ log_msg_make_writable(LogMessage **pself, const LogPathOptions *path_options)
log_msg_unref(*pself);
*pself = new;
}
if(path_options->filterx_context && path_options->filterx_context->scope)
{
filterx_scope_make_writable(&path_options->filterx_context->scope);
filterx_scope_set_log_msg_has_changes(path_options->filterx_context->scope);
}
return *pself;
}

Expand Down
10 changes: 5 additions & 5 deletions lib/logpipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
/* node created directly by the user */
#define PIF_CONFIG_RELATED 0x0100

/* sync filterx state and message in right before calling queue() */
#define PIF_SYNC_FILTERX 0x0200
/* sync filterx state to message in right before calling queue() */
#define PIF_SYNC_FILTERX_TO_MSG 0x0200

/* private flags range, to be used by other LogPipe instances for their own purposes */

Expand Down Expand Up @@ -222,8 +222,8 @@ struct _LogPathOptions
FilterXEvalContext *filterx_context;
};

#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL }
#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL }
#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL, NULL}
#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL, NULL }

/*
* Embed a step in our LogPathOptions chain.
Expand Down Expand Up @@ -459,7 +459,7 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
}
}

if ((s->flags & PIF_SYNC_FILTERX))
if ((s->flags & PIF_SYNC_FILTERX_TO_MSG))
filterx_eval_sync_message(path_options->filterx_context, &msg, path_options);

if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL | PIF_JUNCTION_END | PIF_CONDITIONAL_MIDPOINT)))
Expand Down
2 changes: 1 addition & 1 deletion lib/parser/parser-expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void
log_parser_init_instance(LogParser *self, GlobalConfig *cfg)
{
log_pipe_init_instance(&self->super, cfg);
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG;
self->super.init = log_parser_init_method;
self->super.deinit = log_parser_deinit_method;
self->super.free_fn = log_parser_free_method;
Expand Down
2 changes: 1 addition & 1 deletion lib/rewrite/rewrite-expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ log_rewrite_init_instance(LogRewrite *self, GlobalConfig *cfg)
{
log_pipe_init_instance(&self->super, cfg);
/* indicate that the rewrite rule is changing the message */
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX_TO_MSG;
self->super.free_fn = log_rewrite_free_method;
self->super.queue = log_rewrite_queue;
self->super.init = log_rewrite_init_method;
Expand Down
3 changes: 2 additions & 1 deletion modules/csvparser/tests/test_csvparser.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ ParameterizedTest(CsvParserTestParam *param, parser, test_csv_parser)
cr_assert(log_pipe_init(&pclone->super));

nvtable = nv_table_ref(logmsg->payload);
success = log_parser_process(pclone, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
success = log_parser_process(pclone, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
nv_table_unref(nvtable);

cr_assert_not((success && !param->expected_values[0]), "unexpected match; msg=%s\n", param->msg);
Expand Down
16 changes: 12 additions & 4 deletions modules/timestamp/tests/test_date.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ ParameterizedTest(struct date_params *params, date, test_date_parser)
GString *res = g_string_sized_new(128);

logmsg = _construct_logmsg(params->msg);
success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);

cr_assert(success, "unable to parse format=%s msg=%s", params->format, params->msg);

Expand All @@ -203,11 +204,13 @@ ParameterizedTest(struct date_params *params, date, test_date_parser)

Test(date, test_date_with_additional_text_at_the_end)
{
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
const gchar *msg = "2015-01-26T16:14:49+0300 Disappointing log file";

LogParser *parser = _construct_parser(NULL, NULL, LM_TS_STAMP);
LogMessage *logmsg = _construct_logmsg(msg);
gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL),
-1);

cr_assert_not(success, "successfully parsed but expected failure, msg=%s", msg);

Expand Down Expand Up @@ -247,8 +250,10 @@ ParameterizedTest(struct date_with_multiple_formats_params *params, date, test_d
date_parser_set_time_stamp(parser, LM_TS_STAMP);

LogMessage *logmsg = _construct_logmsg(params->msg);
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL),
-1);

cr_assert(success, "unable to parse msg=%s with a list of formats", params->msg);

Expand All @@ -269,7 +274,10 @@ Test(date, test_date_with_guess_timezone)
date_parser_process_flag(parser, "guess-timezone");

LogMessage *logmsg = _construct_logmsg(msg);
gboolean success = log_parser_process(parser, &logmsg, NULL, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL), -1);
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

gboolean success = log_parser_process(parser, &logmsg, &path_options, log_msg_get_value(logmsg, LM_V_MESSAGE, NULL),
-1);

cr_assert(success, "failed to parse timestamp, msg=%s", msg);
append_format_unix_time(&logmsg->timestamps[LM_TS_STAMP], res, TS_FMT_ISO, -1, 0);
Expand Down
52 changes: 52 additions & 0 deletions tests/light/functional_tests/filterx/test_filterx.py
Original file line number Diff line number Diff line change
Expand Up @@ -2395,3 +2395,55 @@ def test_parse_leef(config, syslog_ng):
r"""}""" + "\n"
)
assert file_true.read_log() == exp


def test_proper_generation_counter(config, syslog_ng):
file_true = config.create_file_destination(file_name="dest-true.log", template="'$MSG\n'")
file_false = config.create_file_destination(file_name="dest-false.log", template="'$MSG\n'")

raw_conf = f"""
@version: {config.get_version()}

options {{ stats(level(1)); }};

source genmsg {{
example-msg-generator(num(1) template("dummy message") values("bar" => "bar_value"));
}};

destination dest_true {{
{render_statement(file_true)};
}};

destination dest_false {{
{render_statement(file_false)};
}};

log {{
source(genmsg);
if {{
filterx {{
$foo = "foovalue"; # Could have come from the log message as well, doesn't matter
$ISODATE; # Special case for macro resolution
unset(${{values.str}}); # Must come from the log message
}};

rewrite {{ set("almafa", value("foo")); }};
parser {{ date-parser(format("%Y-%m-%dT%H:%M:%S%z") template("2000-01-01T00:00:00+01:00")); }};
rewrite {{ set("kortefa", value("values.str")); }};

filterx {{
$MSG = {{"from_nvtable": $foo, "from_a_macro": $ISODATE, "unset_then_set": ${{values.str}} ?? "not found"}};
}};
destination(dest_true);
}} else {{
destination(dest_false);
}};
}};
"""
config.set_raw_config(raw_conf)

syslog_ng.start(config)

assert "processed" in file_true.get_stats()
assert file_true.get_stats()["processed"] == 1
assert file_true.read_log() == '{"from_nvtable":"almafa","from_a_macro":"2000-01-01T00:00:00+01:00","unset_then_set":"kortefa"}\n'
Loading