Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1807] Implement Program Failure Exception Handling in GCS plugins to catch known errors #1450

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import java.io.IOException;
import java.util.List;

/**
 * Utility class that turns exceptions raised while talking to Google APIs into
 * {@link ProgramFailureException}s.
 */
public class ExceptionUtils {

  /**
   * Builds a ProgramFailureException from an {@link HttpResponseException}.
   *
   * <p>The error reason is made of the HTTP status code, the HTTP status message and the
   * corrective action that {@link ErrorUtils#getActionErrorByStatusCode} associates with the
   * status code. The error message is the message of the exception, or, for a
   * {@link GoogleJsonResponseException} that carries details with a message, that message.
   *
   * @param e The HttpResponseException to get the error information from.
   * @return A ProgramFailureException with the given error information.
   */
  private static ProgramFailureException getProgramFailureException(HttpResponseException e) {
    // Read the status code once and keep it primitive; it is used for the lookup and the reason.
    int statusCode = e.getStatusCode();
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
    String errorReason = String.format("%s %s %s", statusCode, e.getStatusMessage(),
      pair.getCorrectiveAction());

    String errorMessage = e.getMessage();
    if (e instanceof GoogleJsonResponseException) {
      GoogleJsonResponseException jsonException = (GoogleJsonResponseException) e;
      // Prefer the message from the JSON error details, but only when it is actually present:
      // both the details and their message can be null, in which case the exception's own
      // message is the best information available.
      if (jsonException.getDetails() != null
        && jsonException.getDetails().getMessage() != null) {
        errorMessage = jsonException.getDetails().getMessage();
      }
    }

    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
      errorReason, errorMessage, pair.getErrorType(), true, e);
  }

  /**
   * Get a ProgramFailureException with the given error
   * information from generic exceptions like {@link IOException}.
   *
   * <p>The causal chain of the throwable is walked from the outermost exception inwards. The
   * first {@link ProgramFailureException} found is returned as is; the first
   * {@link HttpResponseException} found is converted into a ProgramFailureException.
   *
   * @param e The Throwable to get the error information from, may be null.
   * @return A ProgramFailureException with the given error information, otherwise null
   *     (including when {@code e} is null).
   */
  public static ProgramFailureException getProgramFailureException(Throwable e) {
    // Throwables.getCausalChain rejects null; report "nothing found" instead of failing while
    // an error is already being handled.
    if (e == null) {
      return null;
    }

    List<Throwable> causalChain = Throwables.getCausalChain(e);
    for (Throwable t : causalChain) {
      if (t instanceof ProgramFailureException) {
        return (ProgramFailureException) t;
      }
      if (t instanceof HttpResponseException) {
        return getProgramFailureException((HttpResponseException) t);
      }
    }

    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
} catch (IOException e) {
throw new RuntimeException(e);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
}
}

@Override
public void refresh() throws IOException {
getCredentials().refresh();
try {
getCredentials().refresh();
} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to refresh service account access token.", e.getMessage(),
ErrorType.UNKNOWN, true, e);
}
}

private GoogleCredentials getCredentials() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -41,7 +43,8 @@
* <p>
* Delegated instances are created based on a supplied Output Format and Destination Table Names.
*/
public class DelegatingGCSOutputCommitter extends OutputCommitter {
public class DelegatingGCSOutputCommitter extends OutputCommitter implements
ErrorDetailsProvider<Void> {

private final TaskAttemptContext taskAttemptContext;
private boolean firstTable = true;
Expand Down Expand Up @@ -244,4 +247,8 @@ private String getPendingDirPath(JobID jobId) {
return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
}

/**
 * Resolves error details for a failure by delegating to
 * {@link ExceptionUtils#getProgramFailureException(Throwable)}.
 *
 * @param throwable the failure to inspect
 * @param conf not read by this implementation
 * @return the exception produced by {@code ExceptionUtils}, which may be null
 */
@Override
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
  RuntimeException details = ExceptionUtils.getProgramFailureException(throwable);
  return details;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
Expand All @@ -32,7 +34,8 @@
/**
* Output Format used to handle Schemaless Records as input.
*/
public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> implements
ErrorDetailsProvider<Configuration> {

public static final String PARTITION_FIELD = "delegating_output_format.partition.field";
public static final String DELEGATE_CLASS = "delegating_output_format.delegate";
Expand Down Expand Up @@ -75,4 +78,9 @@ public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext contex
return new DelegatingGCSOutputCommitter(context);
}

/**
 * Resolves error details for a failure by delegating to
 * {@link ExceptionUtils#getProgramFailureException(Throwable)}.
 *
 * @param throwable the failure to inspect
 * @param conf the Hadoop configuration; not read by this implementation
 * @return the exception produced by {@code ExceptionUtils}, which may be null
 */
@Override
public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) {
  RuntimeException details = ExceptionUtils.getProgramFailureException(throwable);
  return details;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
Expand All @@ -31,7 +33,8 @@
* <p>
* This Record Writer will initialize record writes and Output Committers as needed.
*/
public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
public class DelegatingGCSRecordWriter extends
RecordWriter<NullWritable, StructuredRecord> implements ErrorDetailsProvider<Void> {
private final TaskAttemptContext context;
private final String partitionField;
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
Expand Down Expand Up @@ -78,4 +81,8 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
}
}

/**
 * Resolves error details for a failure by delegating to
 * {@link ExceptionUtils#getProgramFailureException(Throwable)}.
 *
 * @param throwable the failure to inspect
 * @param conf not read by this implementation
 * @return the exception produced by {@code ExceptionUtils}, which may be null
 */
@Override
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
  RuntimeException details = ExceptionUtils.getProgramFailureException(throwable);
  return details;
}
}
55 changes: 39 additions & 16 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
return;
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Bucket bucket;
String location;
String location = null;
try {
bucket = storage.get(config.getBucket());
bucket = storage.get(bucketName);
if (bucket != null) {
location = bucket.getLocation();
} else {
location = config.getLocation();
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
}
if (bucket != null) {
location = bucket.getLocation();
} else {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
location = config.getLocation();
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

this.outputPath = getOutputDir(context);
// create asset for lineage
asset = Asset.builder(config.getReferenceName())
Expand Down Expand Up @@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) {
}
}

public String getBucket() {
return GCSPath.from(path).getBucket();
public String getBucket(FailureCollector collector) {
try {
return GCSPath.from(path).getBucket();
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
return null;
}

@Override
Expand Down
44 changes: 29 additions & 15 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,33 +129,47 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());

CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();

Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
if (isServiceAccountFilePath == null) {
context.getFailureCollector().addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
context.getFailureCollector().getOrThrowException();
return;
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
try {
if (storage.get(config.getBucket()) == null) {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
if (config.getAllowFlexibleSchema()) {
//Configure MultiSink with support for flexible schemas.
configureSchemalessMultiSink(context, baseProperties, argumentCopy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.plugin.gcp.common.ExceptionUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.StorageClient;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -40,7 +42,8 @@
/**
* OutputCommitter for GCS
*/
public class GCSOutputCommitter extends OutputCommitter {
public class GCSOutputCommitter extends OutputCommitter implements
ErrorDetailsProvider<Void> {

private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
Expand Down Expand Up @@ -161,4 +164,9 @@ public boolean isRecoverySupported() {
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
delegate.recoverTask(taskContext);
}

/**
 * Resolves error details for a failure by delegating to
 * {@link ExceptionUtils#getProgramFailureException(Throwable)}.
 *
 * @param throwable the failure to inspect
 * @param conf not read by this implementation
 * @return the exception produced by {@code ExceptionUtils}, which may be null
 */
@Override
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
  RuntimeException details = ExceptionUtils.getProgramFailureException(throwable);
  return details;
}
}
Loading