Skip to content

Commit

Permalink
ProcessCache: Delay removal for custom refcnts
Browse files Browse the repository at this point in the history
The process cache uses reference counting to ensure we keep hold of
entries that we still need to refer to, but allows us to remove those
that we no longer need. Beyond "process" and "parent", messages could
set custom refcnts on processes. For these we need to keep the process
in the cache until all its descendants have exited (to allow events from
any of the descendants to decrement the reference count on the process).
This is to prevent the stale entry garbage collector from removing the
process while it still has active descendants.

This commit adds a flag on the internal process that specifies that the
process has custom refcnts. It avoids removing the process or its
descendants until all have exited by not decrementing the parent's
refcnt until the process' refcnt is 0.

Signed-off-by: Kevin Sheldrake <kevin.sheldrake@isovalent.com>
  • Loading branch information
kevsecurity committed Sep 26, 2024
1 parent c313ccc commit 5169550
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
18 changes: 14 additions & 4 deletions pkg/grpc/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func GetProcessExit(event *MsgExitEventUnix) *tetragon.ProcessExit {
ec.Add(nil, tetragonEvent, event.Common.Ktime, event.ProcessKey.Ktime, event)
return nil
}
if parent != nil {
if proc != nil && !proc.HasCustomRefCnt() && parent != nil {
parent.RefDec("parent")
}
if proc != nil {
Expand All @@ -427,10 +427,14 @@ func (msg *MsgExitEventUnix) Notify() bool {
func (msg *MsgExitEventUnix) RetryInternal(ev notify.Event, timestamp uint64) (*process.ProcessInternal, error) {
internal, parent := process.GetParentProcessInternal(msg.ProcessKey.Pid, timestamp)
var err error
hasCustomeRefCnt := false
if internal != nil && internal.HasCustomRefCnt() {
hasCustomeRefCnt = true
}

if parent != nil {
ev.SetParent(parent.UnsafeGetProcess())
if !msg.RefCntDone[ParentRefCnt] {
if !hasCustomeRefCnt && !msg.RefCntDone[ParentRefCnt] {
parent.RefDec("parent")
msg.RefCntDone[ParentRefCnt] = true
}
Expand Down Expand Up @@ -494,9 +498,13 @@ func (msg *MsgProcessCleanupEventUnix) Notify() bool {
func (msg *MsgProcessCleanupEventUnix) RetryInternal(_ notify.Event, timestamp uint64) (*process.ProcessInternal, error) {
internal, parent := process.GetParentProcessInternal(msg.PID, timestamp)
var err error
hasCustomeRefCnt := false
if internal != nil && internal.HasCustomRefCnt() {
hasCustomeRefCnt = true
}

if parent != nil {
if !msg.RefCntDone[ParentRefCnt] {
if !hasCustomeRefCnt && !msg.RefCntDone[ParentRefCnt] {
parent.RefDec("parent")
msg.RefCntDone[ParentRefCnt] = true
}
Expand Down Expand Up @@ -528,7 +536,9 @@ func (msg *MsgProcessCleanupEventUnix) Retry(_ *process.ProcessInternal, _ notif
func (msg *MsgProcessCleanupEventUnix) HandleMessage() *tetragon.GetEventsResponse {
msg.RefCntDone = [2]bool{false, false}
if process, parent := process.GetParentProcessInternal(msg.PID, msg.Ktime); process != nil && parent != nil {
parent.RefDec("parent")
if !process.HasCustomRefCnt() {
parent.RefDec("parent")
}
process.RefDec("process")
} else {
if ec := eventcache.Get(); ec != nil {
Expand Down
23 changes: 23 additions & 0 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (pc *Cache) cleanStaleEntries() {
}
for _, d := range deleteProcesses {
processCacheRemovedStale.Inc()
parent, err := pc.get(d.process.ParentExecId)
if err != nil {
parent.RefDec("parent")
}
pc.remove(d.process)
}
}
Expand All @@ -159,6 +163,13 @@ func processOrChildExit(reason string) bool {
return false
}

func processOrChildExec(reason string) bool {
if reason == "process++" || reason == "parent++" {
return true
}
return false
}

func (pc *Cache) deletePending(process *ProcessInternal) {
pc.deleteChan <- process
}
Expand All @@ -176,13 +187,25 @@ func (pc *Cache) refDec(p *ProcessInternal, reason string) {
ref := atomic.AddUint32(&p.refcnt, ^uint32(0))
if ref == 0 {
pc.deletePending(p)
// If the process has a custom refcnt then we need to reduce the parent's
// refcnt now we're finally deleting the process.
if p.HasCustomRefCnt() {
parent, err := pc.get(p.process.ParentExecId)
if err == nil {
parent.RefDec("parent")
}
}
}
}

func (pc *Cache) refInc(p *ProcessInternal, reason string) {
p.refcntOpsLock.Lock()
// count number of times refcnt is increamented for a specific reason (i.e. process, parent, etc.)
p.refcntOps[reason]++
// Check if this is a custom refcnt. If so, set the flag.
if !processOrChildExec(reason) {
p.customRefcnt = true
}
p.refcntOpsLock.Unlock()
atomic.AddUint32(&p.refcnt, 1)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ type ProcessInternal struct {
cgID uint64
// the time the process and all children exited
exitTime time.Time
// Whether this process has custom refcnts (not "process" or "parent").
// This cascades on fork, but is reset on exec. The purpose is to delay reducing the
// refcnt on a processes' parent on exit, to when the process itself has a refcnt of 0.
// The result is that any process that has this flag set will keep itself and all its
// forked (but not exec) descendants in the cache after they have exited, until all the
// (non-exec) descendants have exited (as any forked descendant could potentially cause
// an action that is attributed to the process and we need it in the cache in order to
// report it). This serves to prevent entries being removed by the stale entry GC until
// all (non-exec) descendants have exited.
customRefcnt bool
}

var (
Expand Down Expand Up @@ -123,6 +133,7 @@ func (pi *ProcessInternal) cloneInternalProcessCopy() *ProcessInternal {
namespaces: pi.namespaces,
refcnt: 1, // Explicitly initialize refcnt to 1
refcntOps: map[string]int32{"process++": 1},
customRefcnt: pi.customRefcnt,
}
}

Expand All @@ -149,6 +160,10 @@ func (pi *ProcessInternal) GetCgID() uint64 {
return pi.cgID
}

func (pi *ProcessInternal) HasCustomRefCnt() bool {
return pi.customRefcnt
}

// UpdateExecOutsideCache() checks if we must augment the ProcessExec.Process
// with more fields without propagating again those fields into the process
// cache. This means that those added fields will only show up for the
Expand Down Expand Up @@ -401,6 +416,7 @@ func initProcessInternalExec(
refcnt: 1,
cgID: event.Kube.Cgrpid,
refcntOps: map[string]int32{"process++": 1},
customRefcnt: false,
}
}

Expand Down

0 comments on commit 5169550

Please sign in to comment.