Skip to content

Commit

Permalink
Fix context returns in telemetry. (#149)
Browse files Browse the repository at this point in the history
* add debugging

* More debuggiong

* More debugging

* Fix context returns in telemetry.

We need to return the stream context and then add our logger into it, not cache the original calling context and return it.

If we do that we return the wrong context peer info eventually

* We should include everything from the peer context (cert, principal)
  • Loading branch information
sfc-gh-jchacon authored Jul 12, 2022
1 parent 5a5b4a2 commit d5aba5a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
4 changes: 3 additions & 1 deletion proxy/server/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ func (s *TargetStream) Run(nonce uint32, replyChan chan *pb.ProxyReply) {
}
streamPeerInfo := s.PeerAuthInfo()
authinput.Host = &rpcauth.HostAuthInput{
Net: streamPeerInfo.Net,
Net: streamPeerInfo.Net,
Cert: streamPeerInfo.Cert,
Principal: streamPeerInfo.Principal,
}

// If authz fails, close immediately with an error
Expand Down
14 changes: 8 additions & 6 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func StreamClientLogInterceptor(logger logr.Logger) grpc.StreamClientInterceptor
}
return &loggedClientStream{
ClientStream: stream,
ctx: logCtx,
logger: l,
}, nil
}
Expand Down Expand Up @@ -107,13 +106,15 @@ func passAlongMetadata(ctx context.Context) context.Context {

type loggedClientStream struct {
grpc.ClientStream
ctx context.Context
logger logr.Logger
}

// See: grpc.ClientStream.Context()
func (l *loggedClientStream) Context() context.Context {
return l.ctx
// Get the stream context and make sure our logger is attached.
ctx := l.ClientStream.Context()
ctx = logr.NewContext(ctx, l.logger)
return ctx
}

// See: grpc.ClientStream.SendMsg()
Expand Down Expand Up @@ -184,7 +185,6 @@ func StreamServerLogInterceptor(logger logr.Logger) grpc.StreamServerInterceptor
stream := &loggedStream{
ServerStream: ss,
logger: l,
logCtx: logr.NewContext(ss.Context(), l),
}
err := handler(srv, stream)
if err != nil {
Expand All @@ -198,11 +198,13 @@ func StreamServerLogInterceptor(logger logr.Logger) grpc.StreamServerInterceptor
type loggedStream struct {
grpc.ServerStream
logger logr.Logger
logCtx context.Context
}

func (l *loggedStream) Context() context.Context {
return l.logCtx
// Get the stream context and make sure our logger is attached.
ctx := l.ServerStream.Context()
ctx = logr.NewContext(ctx, l.logger)
return ctx
}

func (l *loggedStream) SendMsg(m interface{}) error {
Expand Down

0 comments on commit d5aba5a

Please sign in to comment.