Skip to content

Commit

Permalink
Add error handling if non-healthcheck bucket is not empty (#178)
Browse files Browse the repository at this point in the history
* Add healthcheck flag

* modfiy

* address nilcheck

* Fix

* Add test for IsNotEmpty

* Add TestDeleteBucket

* Add test case

* address lint

* rebase and fix

* Move BucketNotEmptyError to helper

---------

Co-authored-by: Shunsuke Tokunaga <shtokuna@akamai.com>
  • Loading branch information
Shunpoco and Shunsuke Tokunaga authored Mar 14, 2024
1 parent c1c6351 commit f2bfec7
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 20 deletions.
20 changes: 19 additions & 1 deletion internal/controller/bucket/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
return err
}

if err := rgw.DeleteBucket(ctx, cl, aws.String(bucket.Name)); err != nil {
if err := rgw.DeleteBucket(ctx, cl, aws.String(bucket.Name), false); err != nil {
bucketBackends.setBucketCondition(bucket.Name, beName, xpv1.Deleting().WithMessage(err.Error()))

return err
Expand Down Expand Up @@ -123,6 +123,24 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
c.log.Info("Failed to delete bucket on one or more backends", "error", deleteErr.Error())
traces.SetAndRecordError(span, deleteErr)

// If the error is BucketNotEmpty error, the DeleteBucket operation should be failed
// and the client should be able to use the bucket with non-empty buckends.
if errors.Is(deleteErr, rgw.ErrBucketNotEmpty) {
if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired {
c.log.Info("Change 'disabled' flag to false", consts.KeyBucketName, bucket.Name)

bucketLatest.Spec.Disabled = false

return NeedsObjectUpdate
}); err != nil {
err = errors.Wrap(err, errUpdateBucketCR)
c.log.Info("Failed to change 'disabled' flag to false", consts.KeyBackendName, bucket.Name, "error", err.Error())
traces.SetAndRecordError(span, err)

return err
}
}

return deleteErr
}

Expand Down
106 changes: 106 additions & 0 deletions internal/controller/bucket/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bucket

import (
"context"
"fmt"
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/linode/provider-ceph/internal/backendstore"
"github.com/linode/provider-ceph/internal/backendstore/backendstorefakes"
"github.com/linode/provider-ceph/internal/controller/s3clienthandler"
"github.com/linode/provider-ceph/internal/rgw"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -615,6 +617,110 @@ func TestDelete(t *testing.T) {
},
},
},

"Error deleting bucket because one specified bucket is not empty": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
// DeleteBucket first calls HeadBucket to establish
// if a bucket exists, so return a random error
// to mimic a failed delete.
fakeClient := &backendstorefakes.FakeS3Client{}
fakeClient.HeadBucketReturns(
&s3.HeadBucketOutput{},
nil,
)
fakeClient.DeleteBucketReturns(
nil,
rgw.BucketNotEmptyError{},
)

// DeleteBucket first calls HeadBucket to establish
// if a bucket exists, so return not found
// error to short circuit a successful delete.
var notFoundError *s3types.NotFound
fakeClientOK := &backendstorefakes.FakeS3Client{}
fakeClientOK.HeadBucketReturns(
&s3.HeadBucketOutput{},
notFoundError,
)

bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend(s3Backend1, fakeClient, nil, true, apisv1alpha1.HealthStatusHealthy)
bs.AddOrUpdateBackend(s3Backend2, fakeClientOK, nil, true, apisv1alpha1.HealthStatusHealthy)

return bs
}(),
},
args: args{
mg: &v1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Name: "bucket",
Finalizers: []string{v1alpha1.InUseFinalizer},
Labels: map[string]string{
v1alpha1.BackendLabelPrefix + s3Backend1: "true",
v1alpha1.BackendLabelPrefix + s3Backend2: "true",
},
},
Spec: v1alpha1.BucketSpec{
Providers: []string{
s3Backend1,
s3Backend2,
},
},
Status: v1alpha1.BucketStatus{
AtProvider: v1alpha1.BucketObservation{
Backends: v1alpha1.Backends{
s3Backend1: &v1alpha1.BackendInfo{
BucketCondition: xpv1.Available(),
},
s3Backend2: &v1alpha1.BackendInfo{
BucketCondition: xpv1.Available(),
},
},
},
},
},
},
want: want{
err: rgw.ErrBucketNotEmpty,
statusDiff: func(t *testing.T, mg resource.Managed) {
t.Helper()
bucket, _ := mg.(*v1alpha1.Bucket)

// s3-backend-1 failed so is stuck in Deleting status.
assert.True(t,
bucket.Status.AtProvider.Backends[s3Backend1].BucketCondition.Equal(xpv1.Deleting().WithMessage(fmt.Errorf("%w: %w", rgw.ErrBucketNotEmpty, rgw.BucketNotEmptyError{}).Error())),
"unexpected bucket condition on s3-backend-1")

// s3-backend-2 was successfully deleted so was removed from status.
assert.False(t,
func(b v1alpha1.Backends) bool {
if _, ok := b[s3Backend2]; ok {
return true
}

return false
}(bucket.Status.AtProvider.Backends),
"s3-backend-2 should not exist in backends")

// If backend deletion fails due to BucketNotEmpty error, Disabled flag should be false.
assert.False(t,
bucket.Spec.Disabled,
"Disabled flag should be false",
)
},
finalizerDiff: func(t *testing.T, mg resource.Managed) {
t.Helper()
bucket, _ := mg.(*v1alpha1.Bucket)

assert.Equal(t,
[]string{v1alpha1.InUseFinalizer},
bucket.Finalizers,
"unexpeceted finalizers",
)
},
},
},
}
bk := &v1alpha1.Bucket{}
s := scheme.Scheme
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *Controller) cleanup(ctx context.Context, req ctrl.Request, bucketName s
g := new(errgroup.Group)
g.Go(func() error {
c.log.Info("Deleting health check bucket", consts.KeyBucketName, bucketName, consts.KeyBackendName, req.Name)
if err := rgw.DeleteBucket(ctx, backendClient, aws.String(bucketName)); err != nil {
if err := rgw.DeleteBucket(ctx, backendClient, aws.String(bucketName), true); err != nil {
return errors.Wrap(err, errDeleteHealthCheckBucket)
}

Expand All @@ -221,7 +221,7 @@ func (c *Controller) cleanup(ctx context.Context, req ctrl.Request, bucketName s

g.Go(func() error {
c.log.Info("Deleting lifecycle configuration validation bucket", consts.KeyBucketName, v1alpha1.LifecycleConfigValidationBucketName, consts.KeyBackendName, req.Name)
if err := rgw.DeleteBucket(ctx, backendClient, aws.String(v1alpha1.LifecycleConfigValidationBucketName)); err != nil {
if err := rgw.DeleteBucket(ctx, backendClient, aws.String(v1alpha1.LifecycleConfigValidationBucketName), true); err != nil {
return errors.Wrap(err, errDeleteLCValidationBucket)
}

Expand Down
44 changes: 27 additions & 17 deletions internal/rgw/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rgw

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
Expand All @@ -23,6 +24,11 @@ const (
errHeadBucket = "failed to perform head bucket"
)

var (
// Define this error as an error type because bucket controller checks it
ErrBucketNotEmpty = errors.New("bucket is not empty")
)

func CreateBucket(ctx context.Context, s3Backend backendstore.S3Client, bucket *awss3.CreateBucketInput, o ...func(*awss3.Options)) (*awss3.CreateBucketOutput, error) {
ctx, span := otel.Tracer("").Start(ctx, "CreateBucket")
defer span.End()
Expand Down Expand Up @@ -61,7 +67,7 @@ func BucketExists(ctx context.Context, s3Backend backendstore.S3Client, bucketNa
return true, nil
}

func DeleteBucket(ctx context.Context, s3Backend backendstore.S3Client, bucketName *string, o ...func(*awss3.Options)) error {
func DeleteBucket(ctx context.Context, s3Backend backendstore.S3Client, bucketName *string, healthCheck bool, o ...func(*awss3.Options)) error {
ctx, span := otel.Tracer("").Start(ctx, "DeleteBucket")
defer span.End()

Expand All @@ -73,30 +79,34 @@ func DeleteBucket(ctx context.Context, s3Backend backendstore.S3Client, bucketNa
return nil
}

g := new(errgroup.Group)
if healthCheck {
g := new(errgroup.Group)

// Delete all objects from the bucket. This is sufficient for unversioned buckets.
g.Go(func() error {
return deleteBucketObjects(ctx, s3Backend, bucketName, o...)
})
// Delete all objects from the bucket. This is sufficient for unversioned buckets.
g.Go(func() error {
return deleteBucketObjects(ctx, s3Backend, bucketName, o...)
})

// Delete all object versions (required for versioned buckets).
g.Go(func() error {
return deleteBucketObjectVersions(ctx, s3Backend, bucketName, o...)
})
// Delete all object versions (required for versioned buckets).
g.Go(func() error {
return deleteBucketObjectVersions(ctx, s3Backend, bucketName, o...)
})

if err := g.Wait(); err != nil {
if NoSuchBucket(err) {
return nil
}
traces.SetAndRecordError(span, err)
if err := g.Wait(); err != nil {
if NoSuchBucket(err) {
return nil
}
traces.SetAndRecordError(span, err)

return errors.Wrap(err, errDeleteBucket)
return errors.Wrap(err, errDeleteBucket)
}
}

_, err = s3Backend.DeleteBucket(ctx, &awss3.DeleteBucketInput{Bucket: bucketName}, o...)
if resource.Ignore(IsNotFound, err) != nil {
traces.SetAndRecordError(span, err)
if IsNotEmpty(err) {
return fmt.Errorf("%w: %w", ErrBucketNotEmpty, err)
}

return errors.Wrap(err, errDeleteBucket)
}
Expand Down
30 changes: 30 additions & 0 deletions internal/rgw/bucket_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1"
)
Expand Down Expand Up @@ -82,3 +83,32 @@ func NoSuchBucket(err error) bool {

return errors.As(err, &noSuchBucketError)
}

func IsNotEmpty(err error) bool {
var ae smithy.APIError
if !errors.As(err, &ae) {
return false
}

return ae != nil && ae.ErrorCode() == "BucketNotEmpty"
}

// Unlike NoSuchBucket error or others, aws-sdk-go-v2 doesn't have a specific struct definition for BucketNotEmpty error.
// So we should define ourselves. This is currently only for testing.
type BucketNotEmptyError struct{}

func (e BucketNotEmptyError) Error() string {
return "BucketNotEmpty: some error"
}

func (e BucketNotEmptyError) ErrorCode() string {
return "BucketNotEmpty"
}

func (e BucketNotEmptyError) ErrorMessage() string {
return "some error"
}

func (e BucketNotEmptyError) ErrorFault() smithy.ErrorFault {
return smithy.FaultUnknown
}
43 changes: 43 additions & 0 deletions internal/rgw/bucket_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package rgw

import (
"testing"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestIsNotEmpty(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
err error
expected bool
}{
"true - BucketNotEmpty error": {
err: BucketNotEmptyError{},
expected: true,
},
"false - Error not implement AWS API error": {
err: errors.New("some error"),
expected: false,
},
"false - Other AWS API error": {
err: &s3types.NoSuchBucket{},
expected: false,
},
}

for name, tt := range testCases {
tt := tt

t.Run(name, func(t *testing.T) {
t.Parallel()

actual := IsNotEmpty(tt.err)

assert.Equal(t, tt.expected, actual, "result does not match")
})
}
}
Loading

0 comments on commit f2bfec7

Please sign in to comment.