diff --git a/bpf/bpf.go b/bpf/bpf.go index 04701f7..8ccf504 100644 --- a/bpf/bpf.go +++ b/bpf/bpf.go @@ -6,7 +6,9 @@ import ( "encoding/binary" "fmt" "log" + "slices" "strings" + "time" "unsafe" "github.com/cilium/ebpf" @@ -201,14 +203,14 @@ func (o *Bpf) KprobeKfree() *ebpf.Program { } func (o *Bpf) PollSkb(ctx context.Context) (_ <-chan Skbdump, err error) { + dataReader, err := perf.NewReader(o.objs.PerfOutput, 1500*1000) + if err != nil { + log.Printf("Failed to open perf: %+v", err) + } + ch := make(chan Skbdump) go func() { defer close(ch) - - dataReader, err := perf.NewReader(o.objs.PerfOutput, 1500*1000) - if err != nil { - log.Printf("Failed to open perf: %+v", err) - } defer dataReader.Close() go func() { @@ -216,14 +218,37 @@ func (o *Bpf) PollSkb(ctx context.Context) (_ <-chan Skbdump, err error) { dataReader.Close() }() + records := make(chan perf.Record) + go func() { + defer close(records) + for { + rec, err := dataReader.Read() + if err != nil { + if errors.Is(err, ringbuf.ErrClosed) { + return + } + log.Printf("Failed to read ringbuf: %+v", err) + continue + } + records <- rec + } + }() + + var pool []Skbdump for { - rec, err := dataReader.Read() - if err != nil { - if errors.Is(err, ringbuf.ErrClosed) { - return + var ok bool + var rec perf.Record + select { + case <-time.After(10 * time.Millisecond): + for _, skb := range pool { + ch <- skb } - log.Printf("Failed to read ringbuf: %+v", err) + pool = nil continue + case rec, ok = <-records: + if !ok { + return + } } skb := Skbdump{} @@ -233,7 +258,20 @@ func (o *Bpf) PollSkb(ctx context.Context) (_ <-chan Skbdump, err error) { } copy(skb.Payload[:], rec.RawSample[unsafe.Offsetof(skb.Payload):]) - ch <- skb + if len(pool) > 100 { + ch <- pool[0] + pool = pool[1:] + } + + idx, _ := slices.BinarySearchFunc(pool, skb, func(a, b Skbdump) int { + if a.Meta.TimeNs < b.Meta.TimeNs { + return -1 + } else if a.Meta.TimeNs > b.Meta.TimeNs { + return 1 + } + return 0 + }) + pool = slices.Insert(pool, idx, skb) } }() return ch, nil diff --git a/main.go b/main.go index 6371e0c..d040f36 100644 --- a/main.go +++ b/main.go @@ -120,10 +120,7 @@ func main() { if config.Oneshot { if ksym, _ := utils.Addr2ksym(skb.Meta.At); ksym == "kfree_skbmem" { - go func() { - time.Sleep(1 * time.Second) - cancel() - }() + cancel() } } }