Skip to content

Commit

Permalink
Merge pull request #14 from coroot/ebpf_based_tracing
Browse files Browse the repository at this point in the history
eBPF-based tracing
  • Loading branch information
apetruhin authored May 10, 2023
2 parents 8840fd9 + b0eaa0e commit 9960601
Show file tree
Hide file tree
Showing 18 changed files with 887 additions and 150 deletions.
20 changes: 15 additions & 5 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/coroot/coroot-node-agent/node"
"github.com/coroot/coroot-node-agent/pinger"
"github.com/coroot/coroot-node-agent/proc"
"github.com/coroot/coroot-node-agent/tracing"
"github.com/coroot/logparser"
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netns"
Expand Down Expand Up @@ -65,6 +66,8 @@ type ActiveConnection struct {
Fd uint64
Timestamp uint64
Closed time.Time

PreparedStatements map[string]string
}

type L7Stats struct {
Expand All @@ -73,6 +76,7 @@ type L7Stats struct {
}

type Container struct {
id ContainerID
cgroup *cgroup.Cgroup
metadata *ContainerMetadata

Expand Down Expand Up @@ -113,13 +117,14 @@ type Container struct {
done chan struct{}
}

func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
}
defer netNs.Close()
c := &Container{
id: id,
cgroup: cg,
metadata: md,

Expand Down Expand Up @@ -458,10 +463,11 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
}
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
PreparedStatements: map[string]string{},
}
}
c.connectLastAttempt[dst] = time.Now()
Expand Down Expand Up @@ -514,6 +520,10 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
stats = map[AddrPair]*L7Stats{}
c.l7Stats[r.Protocol] = stats
}
tracing.HandleL7Request(string(c.id), conn.ActualDest, r, conn.PreparedStatements)
if r.Method == ebpftracer.L7MethodStatementClose {
return
}
s := stats[key]
if s == nil {
constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}
Expand Down
2 changes: 1 addition & 1 deletion containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
r.containersByCgroupId[cg.Id] = c
return c
}
c, err := NewContainer(cg, md, r.hostConntrack, pid)
c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
if err != nil {
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
return nil
Expand Down
8 changes: 4 additions & 4 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

223 changes: 151 additions & 72 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "cassandra.c"
#include "rabbitmq.c"


#define PROTOCOL_UNKNOWN 0
#define PROTOCOL_HTTP 1
#define PROTOCOL_POSTGRES 2
Expand All @@ -20,9 +19,13 @@
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9

#define METHOD_UNKNOWN 0
#define METHOD_PRODUCE 1
#define METHOD_CONSUME 2
#define METHOD_UNKNOWN 0
#define METHOD_PRODUCE 1
#define METHOD_CONSUME 2
#define METHOD_STATEMENT_PREPARE 3
#define METHOD_STATEMENT_CLOSE 4

#define MAX_PAYLOAD_SIZE 512

struct l7_event {
__u64 fd;
Expand All @@ -32,8 +35,18 @@ struct l7_event {
__u64 duration;
__u8 protocol;
__u8 method;
__u16 padding;
__u32 statement_id;
char payload[MAX_PAYLOAD_SIZE];
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct l7_event);
__uint(max_entries, 1);
} l7_event_heap SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(int));
Expand Down Expand Up @@ -63,9 +76,18 @@ struct l7_request {
__u64 ns;
__u8 protocol;
__u8 partial;
__u8 request_type;
__s32 request_id;
char payload[MAX_PAYLOAD_SIZE];
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct l7_request);
__uint(max_entries, 1);
} l7_request_heap SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(key_size, sizeof(struct socket_key));
Expand Down Expand Up @@ -107,61 +129,101 @@ __u64 get_connection_timestamp(__u32 pid, __u64 fd) {
static inline __attribute__((__always_inline__))
int trace_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx, __u64 fd, char *buf, __u64 size) {
__u64 id = bpf_get_current_pid_tgid();
struct l7_request req = {};
req.protocol = PROTOCOL_UNKNOWN;
req.partial = 0;
req.request_id = 0;
req.ns = 0;
int zero = 0;
struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
if (!req) {
return 0;
}
req->protocol = PROTOCOL_UNKNOWN;
req->partial = 0;
req->request_id = 0;
req->ns = 0;
struct socket_key k = {};
k.pid = id >> 32;
k.fd = fd;
k.stream_id = -1;

if (is_http_request(buf)) {
req.protocol = PROTOCOL_HTTP;
} else if (is_postgres_query(buf, size)) {
req.protocol = PROTOCOL_POSTGRES;
req->protocol = PROTOCOL_HTTP;
} else if (is_postgres_query(buf, size, &req->request_type)) {
if (req->request_type == POSTGRES_FRAME_CLOSE) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_POSTGRES;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_STATEMENT_CLOSE;
e->status = 200;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
}
req->protocol = PROTOCOL_POSTGRES;
} else if (is_redis_query(buf)) {
req.protocol = PROTOCOL_REDIS;
req->protocol = PROTOCOL_REDIS;
} else if (is_memcached_query(buf, size)) {
req.protocol = PROTOCOL_MEMCACHED;
} else if (is_mysql_query(buf, size)) {
req.protocol = PROTOCOL_MYSQL;
req->protocol = PROTOCOL_MEMCACHED;
} else if (is_mysql_query(buf, size, &req->request_type)) {
if (req->request_type == MYSQL_COM_STMT_CLOSE) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_MYSQL;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_STATEMENT_CLOSE;
e->status = 200;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
}
req->protocol = PROTOCOL_MYSQL;
} else if (is_mongo_query(buf, size)) {
req.protocol = PROTOCOL_MONGO;
req->protocol = PROTOCOL_MONGO;
} else if (is_rabbitmq_produce(buf, size)) {
struct l7_event e = {};
e.protocol = PROTOCOL_RABBITMQ;
e.fd = k.fd;
e.pid = k.pid;
e.status = 200;
e.method = METHOD_PRODUCE;
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_RABBITMQ;
e->fd = k.fd;
e->pid = k.pid;
e->status = 200;
e->method = METHOD_PRODUCE;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
} else {
__s32 request_id = is_kafka_request(buf, size);
if (request_id > 0) {
req.request_id = request_id;
req.protocol = PROTOCOL_KAFKA;
req->request_id = request_id;
req->protocol = PROTOCOL_KAFKA;
struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
req.ns = prev_req->ns;
req->ns = prev_req->ns;
}
} else {
__s16 stream_id = is_cassandra_request(buf, size);
if (stream_id != -1) {
k.stream_id = stream_id;
req.protocol = PROTOCOL_CASSANDRA;
req->protocol = PROTOCOL_CASSANDRA;
}
}
}
if (req.protocol == PROTOCOL_UNKNOWN) {
if (req->protocol == PROTOCOL_UNKNOWN) {
return 0;
}
if (req.ns == 0) {
req.ns = bpf_ktime_get_ns();
if (req->ns == 0) {
req->ns = bpf_ktime_get_ns();
}
bpf_map_update_elem(&active_l7_requests, &k, &req, BPF_ANY);
bpf_probe_read(req->payload, MAX_PAYLOAD_SIZE, (void *)buf);
bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
return 0;
}

Expand All @@ -179,7 +241,7 @@ int trace_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
static inline __attribute__((__always_inline__))
int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
__u64 id = bpf_get_current_pid_tgid();

int zero = 0;
struct rw_args_t *args = bpf_map_lookup_elem(&active_reads, &id);
if (!args) {
return 0;
Expand All @@ -197,19 +259,23 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
return 0;
}

struct l7_event e = {};
e.fd = k.fd;
e.pid = k.pid;
e.connection_timestamp = 0;
e.status = 0;
e.method = METHOD_UNKNOWN;
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->fd = k.fd;
e->pid = k.pid;
e->connection_timestamp = 0;
e->status = 0;
e->method = METHOD_UNKNOWN;
e->statement_id = 0;

if (is_rabbitmq_consume(buf, size)) {
e.protocol = PROTOCOL_RABBITMQ;
e.status = 200;
e.method = METHOD_CONSUME;
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
e->protocol = PROTOCOL_RABBITMQ;
e->status = 200;
e->method = METHOD_CONSUME;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
}

Expand All @@ -226,42 +292,55 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
return 0;
}
}

bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, req->payload);
__s32 request_id = req->request_id;
e.protocol = req->protocol;
e->protocol = req->protocol;
__u64 ns = req->ns;
__u8 partial = req->partial;
__u8 request_type = req->request_type;
bpf_map_delete_elem(&active_l7_requests, &k);
if (e.protocol == PROTOCOL_HTTP) {
e.status = parse_http_status(buf);
} else if (e.protocol == PROTOCOL_POSTGRES) {
e.status = parse_postgres_status(buf, ctx->ret);
} else if (e.protocol == PROTOCOL_REDIS) {
e.status = parse_redis_status(buf, ctx->ret);
} else if (e.protocol == PROTOCOL_MEMCACHED) {
e.status = parse_memcached_status(buf, ctx->ret);
} else if (e.protocol == PROTOCOL_MYSQL) {
e.status = parse_mysql_status(buf, ctx->ret);
} else if (e.protocol == PROTOCOL_MONGO) {
e.status = parse_mongo_status(buf, ctx->ret, partial);
if (e.status == 1) {
struct l7_request r = {};
r.partial = 1;
r.protocol = e.protocol;
r.ns = ns;
bpf_map_update_elem(&active_l7_requests, &k, &r, BPF_ANY);
if (e->protocol == PROTOCOL_HTTP) {
e->status = parse_http_status(buf);
} else if (e->protocol == PROTOCOL_POSTGRES) {
e->status = parse_postgres_status(buf, ctx->ret);
if (request_type == POSTGRES_FRAME_PARSE) {
e->method = METHOD_STATEMENT_PREPARE;
}
} else if (e->protocol == PROTOCOL_REDIS) {
e->status = parse_redis_status(buf, ctx->ret);
} else if (e->protocol == PROTOCOL_MEMCACHED) {
e->status = parse_memcached_status(buf, ctx->ret);
} else if (e->protocol == PROTOCOL_MYSQL) {
e->status = parse_mysql_response(buf, ctx->ret, request_type, &e->statement_id);
if (request_type == MYSQL_COM_STMT_PREPARE) {
e->method = METHOD_STATEMENT_PREPARE;
}
} else if (e->protocol == PROTOCOL_MONGO) {
e->status = parse_mongo_status(buf, ctx->ret, partial);
if (e->status == 1) {
struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
if (!r) {
return 0;
}
r->partial = 1;
r->protocol = e->protocol;
r->ns = ns;
bpf_probe_read(r->payload, MAX_PAYLOAD_SIZE, e->payload);
bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
return 0;
}
} else if (e.protocol == PROTOCOL_KAFKA) {
e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
} else if (e.protocol == PROTOCOL_CASSANDRA) {
e.status = cassandra_status(cassandra_response);
} else if (e->protocol == PROTOCOL_KAFKA) {
e->status = parse_kafka_status(request_id, buf, ctx->ret, partial);
} else if (e->protocol == PROTOCOL_CASSANDRA) {
e->status = cassandra_status(cassandra_response);
}
if (e.status == 0) {
if (e->status == 0) {
return 0;
}
e.duration = bpf_ktime_get_ns() - ns;
e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
e->duration = bpf_ktime_get_ns() - ns;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
}

Expand Down
Loading

0 comments on commit 9960601

Please sign in to comment.