Skip to content

Commit

Permalink
Migrate to OTel
Browse files Browse the repository at this point in the history
  • Loading branch information
kislaykishore committed Oct 11, 2024
1 parent 71edffd commit b8b031f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 235 deletions.
19 changes: 19 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import "go.opentelemetry.io/otel/metric"

// DefaultLatencyDistribution is the shared set of explicit histogram bucket
// boundaries used by GCSFuse latency metrics. Values are in the unit each
// histogram declares at creation time (e.g. "us" or "ms"), growing roughly
// geometrically from 1 to 100000.
var DefaultLatencyDistribution = metric.WithExplicitBucketBoundaries(
	1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100,
	130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000,
	10000, 20000, 50000, 100000)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/exporters/prometheus v0.52.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/sdk v1.30.0
go.opentelemetry.io/otel/sdk/metric v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
Expand Down Expand Up @@ -111,7 +112,6 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240530194437-404ba88c7ed0 // indirect
Expand Down
45 changes: 13 additions & 32 deletions internal/fs/wrappers/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"syscall"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"go.opentelemetry.io/otel"
Expand All @@ -30,9 +31,10 @@ import (
const name = "cloud.google.com/gcsfuse"

var (
meter = otel.Meter("fs")
opsCount, _ = meter.Int64Counter("fs/ops_count", metric.WithDescription("The number of ops processed by the file system."))
opsLatency, _ = meter.Int64Counter("fs/ops_latency", metric.WithDescription("The latency of a file system operation."), metric.WithUnit("ms"))
meter = otel.Meter("fs")
opsCount, _ = meter.Int64Counter("fs/ops_count", metric.WithDescription("The number of ops processed by the file system."))
opsLatency, _ = meter.Int64Histogram("fs/ops_latency", metric.WithDescription("The latency of a file system operation."), metric.WithUnit("us"),
common.DefaultLatencyDistribution)
opsErrorCount, _ = meter.Int64Counter("fs/ops_error_count", metric.WithDescription("The number of errors generated by file system operation."))
)

Expand Down Expand Up @@ -267,35 +269,14 @@ func categorize(err error) string {
// recordOp records one completed file system operation: its count, its
// latency in microseconds (the opsLatency histogram is declared with unit
// "us"), and — when fsErr is non-nil — its error category. All measurements
// are tagged with the operation name via the "fs_op" attribute.
func recordOp(ctx context.Context, method string, start time.Time, fsErr error) {
	// Recording opCount.
	opsCount.Add(ctx, 1, metric.WithAttributes(attribute.String("fs_op", method)))

	// Recording opErrorCount, tagged additionally with the category derived
	// from the concrete error by categorize.
	if fsErr != nil {
		errCategory := categorize(fsErr)
		opsErrorCount.Add(ctx, 1, metric.WithAttributes(attribute.String("fs_op", method), attribute.String("fs_error_category", errCategory)))
	}

	// Recording opLatency.
	opsLatency.Record(ctx, time.Since(start).Microseconds(), metric.WithAttributes(attribute.String("fs_op", method)))
}

// WithMonitoring takes a FileSystem, returns a FileSystem with monitoring
Expand Down
90 changes: 12 additions & 78 deletions internal/monitor/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,83 +20,25 @@ import (
"io"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor/tags"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
meter = otel.Meter("gcs")
// OpenCensus measures
readBytesCount = stats.Int64("gcs/read_bytes_count", "The number of bytes read from GCS objects.", stats.UnitBytes)
readerCount = stats.Int64("gcs/reader_count", "The number of GCS object readers opened or closed.", stats.UnitDimensionless)
requestCount = stats.Int64("gcs/request_count", "The number of GCS requests processed.", stats.UnitDimensionless)
requestLatency = stats.Float64("gcs/request_latency", "The latency of a GCS request.", stats.UnitMilliseconds)
readBytesCount, _ = meter.Int64Counter("gcs/read_bytes_count", metric.WithDescription("The number of bytes read from GCS objects."), metric.WithUnit("By"))
readerCount, _ = meter.Int64Counter("gcs/reader_count", metric.WithDescription("The number of GCS object readers opened or closed."))
requestCount, _ = meter.Int64Counter("gcs/request_count", metric.WithDescription("The number of GCS requests processed."))
requestLatency, _ = meter.Int64Histogram("gcs/request_latencies", metric.WithDescription("The latency of a GCS request."), metric.WithUnit("ms"))
)

// Initialize the metrics.
func init() {
// OpenCensus views (aggregated measures)
if err := view.Register(
&view.View{
Name: "gcs/read_bytes_count",
Measure: readBytesCount,
Description: "The cumulative number of bytes read from GCS objects.",
Aggregation: view.Sum(),
},
&view.View{
Name: "gcs/reader_count",
Measure: readerCount,
Description: "The cumulative number of GCS object readers opened or closed.",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.IOMethod},
},
&view.View{
Name: "gcs/request_count",
Measure: requestCount,
Description: "The cumulative number of GCS requests processed.",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.GCSMethod},
},
&view.View{
Name: "gcs/request_latencies",
Measure: requestLatency,
Description: "The cumulative distribution of the GCS request latencies.",
Aggregation: ochttp.DefaultLatencyDistribution,
TagKeys: []tag.Key{tags.GCSMethod},
}); err != nil {
fmt.Printf("Failed to register OpenCensus metrics for GCS client library: %v", err)
}
}

// recordRequest records a request and its latency.
func recordRequest(ctx context.Context, method string, start time.Time) {
if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.GCSMethod, method),
},
requestCount.M(1),
); err != nil {
// The error should be caused by a bad tag
logger.Errorf("Cannot record request count: %v", err)
}

latencyUs := time.Since(start).Microseconds()
latencyMs := float64(latencyUs) / 1000.0
if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.GCSMethod, method),
},
requestLatency.M(latencyMs),
); err != nil {
// The error should be caused by a bad tag
logger.Errorf("Cannot record request latency: %v", err)
}
requestCount.Add(ctx, 1, metric.WithAttributes(attribute.String("gcs_method", method)))
requestLatency.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("gcs_method", method)))
}

// NewMonitoringBucket returns a gcs.Bucket that exports metrics for monitoring
Expand Down Expand Up @@ -225,15 +167,7 @@ func (mb *monitoringBucket) RenameFolder(ctx context.Context, folderName string,

// recordReader increments the reader count when it's opened or closed.
func recordReader(ctx context.Context, ioMethod string) {
if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.IOMethod, ioMethod),
},
readerCount.M(1),
); err != nil {
logger.Errorf("Cannot record a reader %v: %v", ioMethod, err)
}
readerCount.Add(ctx, 1, metric.WithAttributes(attribute.String("io_method", ioMethod)))
}

// Monitoring on the object reader
Expand All @@ -255,7 +189,7 @@ type monitoringReadCloser struct {
// Read forwards the read to the wrapped reader and, on success or EOF, adds
// the number of bytes actually read to the readBytesCount counter. Other
// errors are returned without recording a measurement.
func (mrc *monitoringReadCloser) Read(p []byte) (n int, err error) {
	n, err = mrc.wrapped.Read(p)
	if err == nil || err == io.EOF {
		readBytesCount.Add(mrc.ctx, int64(n))
	}
	return
}
Expand Down
148 changes: 24 additions & 124 deletions internal/monitor/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,144 +15,44 @@
package monitor

import (
"log"
"strconv"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor/tags"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/net/context"
)

var (
gcsMeter = otel.Meter("gcs")
fileCacheMeter = otel.Meter("file_cache")
// When a first read call is made by the user, we either fetch entire file or x number of bytes from GCS based on the request.
// Now depending on the pagesize multiple read calls will be issued by user to read the entire file. These
// requests will be served from the downloaded data.
// This metric captures only the requests made to GCS, not the subsequent page calls.
gcsReadCount = stats.Int64("gcs/read_count",
"Specifies the number of gcs reads made along with type - Sequential/Random",
UnitDimensionless)
downloadBytesCount = stats.Int64("gcs/download_bytes_count",
"The cumulative number of bytes downloaded from GCS along with type - Sequential/Random",
UnitBytes)
fileCacheReadCount = stats.Int64("file_cache/read_count",
"Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false",
UnitDimensionless)
fileCacheReadBytesCount = stats.Int64("file_cache/read_bytes_count",
"The cumulative number of bytes read from file cache along with read type - Sequential/Random",
UnitBytes)
fileCacheReadLatency = stats.Int64("file_cache/read_latency",
"Latency of read from file cache along with cache hit - true/false",
UnitMicroseconds)
gcsReadCount, _ = gcsMeter.Int64Counter("gcs/read_count", metric.WithDescription("Specifies the number of gcs reads made along with type - Sequential/Random"))
downloadBytesCount, _ = gcsMeter.Int64Counter("gcs/download_bytes_count",
metric.WithDescription("The cumulative number of bytes downloaded from GCS along with type - Sequential/Random"),
metric.WithUnit("By"))
fileCacheReadCount, _ = fileCacheMeter.Int64Counter("file_cache/read_count",
metric.WithDescription("Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false"))
fileCacheReadBytesCount, _ = fileCacheMeter.Int64Counter("file_cache/read_bytes_count",
metric.WithDescription("The cumulative number of bytes read from file cache along with read type - Sequential/Random"),
metric.WithUnit("By"))
fileCacheReadLatency, _ = fileCacheMeter.Int64Histogram("file_cache/read_latency",
metric.WithDescription("Latency of read from file cache along with cache hit - true/false"),
metric.WithUnit("us"),
common.DefaultLatencyDistribution)
)

const NanosecondsInOneMillisecond = 1000000

// Initialize the metrics.
func init() {
// GCS related metrics
if err := view.Register(
&view.View{
Name: "gcs/read_count",
Measure: gcsReadCount,
Description: "Specifies the number of gcs reads made along with type - Sequential/Random",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.ReadType},
},
&view.View{
Name: "gcs/download_bytes_count",
Measure: downloadBytesCount,
Description: "The cumulative number of bytes downloaded from GCS along with type - Sequential/Random",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.ReadType},
},
// File cache related metrics
&view.View{
Name: "file_cache/read_count",
Measure: fileCacheReadCount,
Description: "Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.ReadType, tags.CacheHit},
},
&view.View{
Name: "file_cache/read_bytes_count",
Measure: fileCacheReadBytesCount,
Description: "The cumulative number of bytes read from file cache along with read type - Sequential/Random",
Aggregation: view.Sum(),
TagKeys: []tag.Key{tags.ReadType},
},
&view.View{
Name: "file_cache/read_latencies",
Measure: fileCacheReadLatency,
Description: "The cumulative distribution of the file cache read latencies along with cache hit - true/false",
Aggregation: ochttp.DefaultLatencyDistribution,
TagKeys: []tag.Key{tags.CacheHit},
},
); err != nil {
log.Fatalf("Failed to register the reader view: %v", err)
}
}

func CaptureGCSReadMetrics(ctx context.Context, readType string, requestedDataSize int64) {
if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.ReadType, readType),
},
gcsReadCount.M(1),
); err != nil {
// Error in recording gcsReadCount.
logger.Errorf("Cannot record gcsReadCount %v", err)
}

if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.ReadType, readType),
},
downloadBytesCount.M(requestedDataSize),
); err != nil {
// Error in recording downloadBytesCount.
logger.Errorf("Cannot record downloadBytesCount %v", err)
}
gcsReadCount.Add(ctx, 1, metric.WithAttributes(attribute.String("read_type", readType)))
downloadBytesCount.Add(ctx, requestedDataSize, metric.WithAttributes(attribute.String("read_type", readType)))
}

func CaptureFileCacheMetrics(ctx context.Context, readType string, readDataSize int, cacheHit bool, readLatency time.Duration) {
if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.ReadType, readType),
tag.Upsert(tags.CacheHit, strconv.FormatBool(cacheHit)),
},
fileCacheReadCount.M(1),
); err != nil {
// Error in recording fileCacheReadCount.
logger.Errorf("Cannot record fileCacheReadCount %v", err)
}

if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.ReadType, readType),
},
fileCacheReadBytesCount.M(int64(readDataSize)),
); err != nil {
// Error in recording fileCacheReadBytesCount.
logger.Errorf("Cannot record fileCacheReadBytesCount %v", err)
}

if err := stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(tags.CacheHit, strconv.FormatBool(cacheHit)),
},
fileCacheReadLatency.M(readLatency.Microseconds()),
); err != nil {
// Error in recording fileCacheReadLatency.
logger.Errorf("Cannot record fileCacheReadLatency %v", err)
}
fileCacheReadCount.Add(ctx, 1, metric.WithAttributes(attribute.String("read_type", readType), attribute.Bool("cache_hit", cacheHit)))
fileCacheReadBytesCount.Add(ctx, int64(readDataSize), metric.WithAttributes(attribute.String("read_type", readType)))
fileCacheReadLatency.Record(ctx, readLatency.Microseconds(), metric.WithAttributes(attribute.Bool("cache_hit", cacheHit)))
}

0 comments on commit b8b031f

Please sign in to comment.