Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1807] Implement error details provider to get more information about exceptions from GCP plugins #1452

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.util.List;

/**
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {

/**
* Get a ProgramFailureException with the given error
* information from generic exceptions like {@link java.io.IOException}.
*
* @param e The Throwable to get the error information from.
* @return A ProgramFailureException with the given error information, otherwise null.
*/
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof HttpResponseException) {
return getProgramFailureException((HttpResponseException) t, errorContext);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error
* information from {@link HttpResponseException}.
*
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

String errorMessage = e.getMessage();
if (e instanceof GoogleJsonResponseException) {
GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
errorMessage = exception.getDetails() != null ? exception.getDetails().getMessage() :
exception.getMessage();
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
pair.getErrorType(), true, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
} catch (IOException e) {
throw new RuntimeException(e);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
}
}

@Override
public void refresh() throws IOException {
getCredentials().refresh();
try {
getCredentials().refresh();
} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to refresh service account access token.", e.getMessage(),
ErrorType.UNKNOWN, true, e);
}
}

private GoogleCredentials getCredentials() throws IOException {
Expand Down
70 changes: 52 additions & 18 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand All @@ -51,6 +52,7 @@
import io.cdap.plugin.format.plugin.FileSinkProperties;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.Formats;
import io.cdap.plugin.gcp.gcs.GCSPath;
Expand Down Expand Up @@ -121,30 +123,50 @@ public void prepareRun(BatchSinkContext context) throws Exception {
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
return;
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Bucket bucket;
String location;
String location = null;
try {
bucket = storage.get(config.getBucket());
bucket = storage.get(bucketName);
if (bucket != null) {
location = bucket.getLocation();
} else {
location = config.getLocation();
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
}
if (bucket != null) {
location = bucket.getLocation();
} else {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
location = config.getLocation();
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

this.outputPath = getOutputDir(context);
// create asset for lineage
asset = Asset.builder(config.getReferenceName())
.setFqn(GCSPath.getFQN(config.getPath())).setLocation(location).build();

// set error details provider
context.setErrorDetailsProvider(
new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName()));

// super is called down here to avoid instantiating the lineage recorder with a null asset
super.prepareRun(context);
Expand Down Expand Up @@ -532,8 +554,20 @@ public void validateContentType(FailureCollector failureCollector) {
}
}

public String getBucket() {
return GCSPath.from(path).getBucket();
/**
* Get the bucket name from the path.
* @param collector failure collector
* @return bucket name as {@link String} if found, otherwise null.
*/
public String getBucket(FailureCollector collector) {
try {
return GCSPath.from(path).getBucket();
} catch (IllegalArgumentException e) {
sahusanket marked this conversation as resolved.
Show resolved Hide resolved
collector.addFailure(e.getMessage(), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
return null;
}

@Override
Expand Down Expand Up @@ -718,8 +752,8 @@ public Builder setCustomContentType(@Nullable String customContentType) {
return this;
}

public GCSBatchSink.GCSBatchSinkConfig build() {
return new GCSBatchSink.GCSBatchSinkConfig(
public GCSBatchSinkConfig build() {
return new GCSBatchSinkConfig(
referenceName,
project,
fileSystemProperties,
Expand Down
47 changes: 34 additions & 13 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.connector.GCSConnector;
import org.apache.hadoop.io.NullWritable;
Expand Down Expand Up @@ -129,33 +131,52 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());

CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();

Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
if (isServiceAccountFilePath == null) {
context.getFailureCollector().addFailure("Service account type is undefined.",
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
context.getFailureCollector().getOrThrowException();
return;
collector.getOrThrowException();
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
try {
if (storage.get(config.getBucket()) == null) {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

// set error details provider
context.setErrorDetailsProvider(
new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName()));

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
if (config.getAllowFlexibleSchema()) {
//Configure MultiSink with support for flexible schemas.
configureSchemalessMultiSink(context, baseProperties, argumentCopy);
Expand Down
Loading
Loading