Skip to content

Commit

Permalink
grpc/schema: factor out schema related logic from bigquery
Browse files Browse the repository at this point in the history
In order to opt-in for schema support, a derived
class must:

 * include grpc_dest_schema_option in the grammar
 * implement the string -> descriptor type mapping
 * store and init a Schema instance in the driver
 * implement the get_schema() method to return
   the stored Schema instance

The derived class can interface with the Schema
by these functions:
  * init()
  * empty()
  * format()
  * get_schema_descriptor()

Signed-off-by: Attila Szakacs <attila.szakacs@axoflow.com>
  • Loading branch information
alltilla committed Oct 12, 2024
1 parent a15a2e0 commit 7072ae4
Show file tree
Hide file tree
Showing 13 changed files with 562 additions and 456 deletions.
237 changes: 25 additions & 212 deletions modules/grpc/bigquery/bigquery-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,128 +31,61 @@
#include "messages.h"
#include "compat/cpp-end.h"

#include <absl/strings/string_view.h>

#include <cstring>
#include <strings.h>

using syslogng::grpc::bigquery::DestinationDriver;

static void
_template_unref(gpointer data)
{
LogTemplate *tpl = (LogTemplate *) data;
log_template_unref(tpl);
}

namespace {
class ErrorCollector : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
ErrorCollector() {}
~ErrorCollector() override {}

// override is missing for compatibility with older protobuf versions
void RecordError(absl::string_view filename, int line, int column, absl::string_view message)
{
std::string file{filename};
std::string msg{message};

msg_error("Error parsing protobuf-schema() file",
evt_tag_str("filename", file.c_str()), evt_tag_int("line", line), evt_tag_int("column", column),
evt_tag_str("error", msg.c_str()));
}

// override is missing for compatibility with older protobuf versions
void RecordWarning(absl::string_view filename, int line, int column, absl::string_view message)
{
std::string file{filename};
std::string msg{message};

msg_error("Warning during parsing protobuf-schema() file",
evt_tag_str("filename", file.c_str()), evt_tag_int("line", line), evt_tag_int("column", column),
evt_tag_str("warning", msg.c_str()));
}

private:
/* deprecated interface */
void AddError(const std::string &filename, int line, int column, const std::string &message)
{
this->RecordError(filename, line, column, message);
}

void AddWarning(const std::string &filename, int line, int column, const std::string &message)
{
this->RecordWarning(filename, line, column, message);
}
};
}

DestinationDriver::DestinationDriver(GrpcDestDriver *s)
: syslogng::grpc::DestDriver(s)
: syslogng::grpc::DestDriver(s),
schema(2, "bigquery_record.proto", "BigQueryRecord", map_schema_type,
&this->template_options, &this->super->super.super.super.super)
{
this->url = "bigquerystorage.googleapis.com";
this->credentials_builder.set_mode(GCAM_ADC);
}

DestinationDriver::~DestinationDriver()
{
g_list_free_full(this->protobuf_schema.values, _template_unref);
}

bool
DestinationDriver::add_field(std::string name, std::string type, LogTemplate *value)
DestinationDriver::map_schema_type(const std::string &type_in, google::protobuf::FieldDescriptorProto::Type &type_out)
{
/* https://cloud.google.com/bigquery/docs/write-api#data_type_conversions */

google::protobuf::FieldDescriptorProto::Type proto_type;
const char *type_str = type.c_str();
if (type.empty() || strcasecmp(type_str, "STRING") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
const char *type_str = type_in.c_str();
if (type_in.empty() || strcasecmp(type_str, "STRING") == 0)
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "BYTES") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_BYTES;
type_out = google::protobuf::FieldDescriptorProto::TYPE_BYTES;
else if (strcasecmp(type_str, "INTEGER") == 0 || strcasecmp(type_str, "INT64") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "FLOAT") == 0 || strcasecmp(type_str, "FLOAT64") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_DOUBLE;
type_out = google::protobuf::FieldDescriptorProto::TYPE_DOUBLE;
else if (strcasecmp(type_str, "BOOLEAN") == 0 || strcasecmp(type_str, "BOOL") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_BOOL;
type_out = google::protobuf::FieldDescriptorProto::TYPE_BOOL;
else if (strcasecmp(type_str, "TIMESTAMP") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "DATE") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT32;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT32;
else if (strcasecmp(type_str, "TIME") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "DATETIME") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "JSON") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "NUMERIC") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "BIGNUMERIC") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "GEOGRAPHY") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "RECORD") == 0 || strcasecmp(type_str, "STRUCT") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_MESSAGE;
type_out = google::protobuf::FieldDescriptorProto::TYPE_MESSAGE;
else if (strcasecmp(type_str, "INTERVAL") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else
return false;

this->fields.push_back(Field{name, proto_type, value});

return true;
}

void
DestinationDriver::set_protobuf_schema(std::string proto_path, GList *values)
{
this->protobuf_schema.proto_path = proto_path;

g_list_free_full(this->protobuf_schema.values, _template_unref);
this->protobuf_schema.values = values;
}

bool
DestinationDriver::init()
{
Expand All @@ -164,15 +97,10 @@ DestinationDriver::init()
return false;
}

if (this->protobuf_schema.proto_path.empty())
this->construct_schema_prototype();
else
{
if (!this->protobuf_schema.loaded && !this->load_protobuf_schema())
return false;
}
if (!this->schema.init())
return false;

if (this->fields.size() == 0)
if (this->schema.empty())
{
msg_error("Error initializing BigQuery destination, schema() or protobuf-schema() is empty",
log_pipe_location_tag(&this->super->super.super.super.super));
Expand Down Expand Up @@ -224,105 +152,6 @@ DestinationDriver::construct_worker(int worker_index)
return &worker->super;
}

void
DestinationDriver::construct_schema_prototype()
{
this->msg_factory = std::make_unique<google::protobuf::DynamicMessageFactory>();
this->descriptor_pool.~DescriptorPool();
new (&this->descriptor_pool) google::protobuf::DescriptorPool();

google::protobuf::FileDescriptorProto file_descriptor_proto;
file_descriptor_proto.set_name("bigquery_record.proto");
file_descriptor_proto.set_syntax("proto2");
google::protobuf::DescriptorProto *descriptor_proto = file_descriptor_proto.add_message_type();
descriptor_proto->set_name("BigQueryRecord");

int32_t num = 1;
for (auto &field : this->fields)
{
google::protobuf::FieldDescriptorProto *field_desc_proto = descriptor_proto->add_field();
field_desc_proto->set_name(field.nv.name);
field_desc_proto->set_type(field.type);
field_desc_proto->set_number(num++);
}


const google::protobuf::FileDescriptor *file_descriptor = this->descriptor_pool.BuildFile(file_descriptor_proto);
this->schema_descriptor = file_descriptor->message_type(0);

for (int i = 0; i < this->schema_descriptor->field_count(); ++i)
{
this->fields[i].field_desc = this->schema_descriptor->field(i);
}

this->schema_prototype = this->msg_factory->GetPrototype(this->schema_descriptor);
}

bool
DestinationDriver::load_protobuf_schema()
{
this->protobuf_schema.loaded = false;
this->msg_factory = std::make_unique<google::protobuf::DynamicMessageFactory>();
this->protobuf_schema.importer.reset(nullptr);

this->protobuf_schema.src_tree = std::make_unique<google::protobuf::compiler::DiskSourceTree>();
this->protobuf_schema.src_tree->MapPath(this->protobuf_schema.proto_path, this->protobuf_schema.proto_path);

this->protobuf_schema.error_coll = std::make_unique<ErrorCollector>();

this->protobuf_schema.importer =
std::make_unique<google::protobuf::compiler::Importer>(this->protobuf_schema.src_tree.get(),
this->protobuf_schema.error_coll.get());

const google::protobuf::FileDescriptor *file_descriptor =
this->protobuf_schema.importer->Import(this->protobuf_schema.proto_path);

if (!file_descriptor || file_descriptor->message_type_count() == 0)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file can't be loaded",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

this->schema_descriptor = file_descriptor->message_type(0);

this->fields.clear();

GList *current_value = this->protobuf_schema.values;
for (int i = 0; i < this->schema_descriptor->field_count(); ++i)
{
auto field = this->schema_descriptor->field(i);

if (!current_value)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file has more fields than "
"values listed in the config",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

LogTemplate *value = (LogTemplate *) current_value->data;

this->fields.push_back(Field{field->name(), (google::protobuf::FieldDescriptorProto::Type) field->type(), value});
this->fields[i].field_desc = field;

current_value = current_value->next;
}

if (current_value)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file has less fields than "
"values listed in the config",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}


this->schema_prototype = this->msg_factory->GetPrototype(this->schema_descriptor);
this->protobuf_schema.loaded = true;
return true;
}


/* C Wrappers */

Expand Down Expand Up @@ -355,22 +184,6 @@ void bigquery_dd_set_table(LogDriver *d, const gchar *table)
cpp->set_table(table);
}

gboolean
bigquery_dd_add_field(LogDriver *d, const gchar *name, const gchar *type, LogTemplate *value)
{
GrpcDestDriver *self = (GrpcDestDriver *) d;
DestinationDriver *cpp = bigquery_dd_get_cpp(self);
return cpp->add_field(name, type ? type : "", value);
}

void
bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *values)
{
GrpcDestDriver *self = (GrpcDestDriver *) d;
DestinationDriver *cpp = bigquery_dd_get_cpp(self);
cpp->set_protobuf_schema(proto_path, values);
}

LogDriver *
bigquery_dd_new(GlobalConfig *cfg)
{
Expand Down
65 changes: 8 additions & 57 deletions modules/grpc/bigquery/bigquery-dest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,21 @@
#include "stats/stats-cluster-key-builder.h"
#include "compat/cpp-end.h"

#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/message.h>
#include <google/protobuf/compiler/importer.h>

#include <string>
#include <memory>
#include <vector>

namespace syslogng {
namespace grpc {
namespace bigquery {

struct Field
{
NameValueTemplatePair nv;
google::protobuf::FieldDescriptorProto::Type type;
const google::protobuf::FieldDescriptor *field_desc;

Field(std::string name_, google::protobuf::FieldDescriptorProto::Type type_, LogTemplate *value_)
: nv(name_, value_), type(type_), field_desc(nullptr) {}

Field(const Field &a)
: nv(a.nv), type(a.type), field_desc(a.field_desc) {}

Field &operator=(const Field &a)
{
nv = a.nv;
type = a.type;
field_desc = a.field_desc;

return *this;
}

};

class DestinationDriver final : public syslogng::grpc::DestDriver
{
public:
DestinationDriver(GrpcDestDriver *s);
~DestinationDriver();
bool init();
const gchar *generate_persist_name();
const gchar *format_stats_key(StatsClusterKeyBuilder *kb);
LogThreadedDestWorker *construct_worker(int worker_index);

bool add_field(std::string name, std::string type, LogTemplate *value);
void set_protobuf_schema(std::string proto_path, GList *values);
static bool map_schema_type(const std::string &type_in, google::protobuf::FieldDescriptorProto::Type &type_out);

void set_project(std::string p)
{
Expand Down Expand Up @@ -113,35 +79,20 @@ class DestinationDriver final : public syslogng::grpc::DestDriver
return this->table;
}

Schema *get_schema()
{
return &this->schema;
}

private:
friend class DestinationWorker;
void construct_schema_prototype();
bool load_protobuf_schema();

private:
Schema schema;

std::string project;
std::string dataset;
std::string table;

struct
{
std::string proto_path;
GList *values = nullptr;

std::unique_ptr<google::protobuf::compiler::DiskSourceTree> src_tree;
std::unique_ptr<google::protobuf::compiler::MultiFileErrorCollector> error_coll;
std::unique_ptr<google::protobuf::compiler::Importer> importer;
bool loaded = false;
} protobuf_schema;

std::vector<Field> fields;

google::protobuf::DescriptorPool descriptor_pool;

/* A given descriptor_pool/importer instance should outlive msg_factory, as msg_factory caches prototypes */
std::unique_ptr<google::protobuf::DynamicMessageFactory> msg_factory;
const google::protobuf::Descriptor *schema_descriptor = nullptr;
const google::protobuf::Message *schema_prototype = nullptr;
};


Expand Down
Loading

0 comments on commit 7072ae4

Please sign in to comment.