From 83a009a720517e1392afd2e260874fea074e6c40 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Mon, 7 Oct 2024 14:20:05 +0000 Subject: [PATCH] Implement Program Failure Exception Handling in GCS plugins to catch known errors --- pom.xml | 4 +- .../plugin/gcp/common/ExceptionUtils.java | 79 ++++++++ .../ServiceAccountAccessTokenProvider.java | 15 +- .../sink/DelegatingGCSOutputCommitter.java | 9 +- .../gcs/sink/DelegatingGCSOutputFormat.java | 10 +- .../gcs/sink/DelegatingGCSRecordWriter.java | 9 +- .../plugin/gcp/gcs/sink/GCSBatchSink.java | 55 ++++-- .../gcp/gcs/sink/GCSMultiBatchSink.java | 44 +++-- .../gcp/gcs/sink/GCSOutputCommitter.java | 10 +- .../gcp/gcs/sink/GCSOutputFormatProvider.java | 34 +++- .../gcs/sink/RecordFilterOutputFormat.java | 18 +- .../gcs/source/GCSInputFormatProvider.java | 176 ++++++++++++++++++ .../cdap/plugin/gcp/gcs/source/GCSSource.java | 66 +++++-- .../plugin/gcp/gcs/source/TinkDecryptor.java | 30 ++- 14 files changed, 497 insertions(+), 62 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java diff --git a/pom.xml b/pom.xml index f3b98ceb3d..1dfa6a226e 100644 --- a/pom.xml +++ b/pom.xml @@ -1010,8 +1010,8 @@ 1.1.0 - system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT) - system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT) + system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT) + system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT) diff --git a/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java b/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java new file mode 100644 index 0000000000..b5d8665dbb --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.common; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpResponseException; +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import java.io.IOException; +import java.util.List; + +/** + * Utility class to handle exceptions. + */ +public class ExceptionUtils { + + /** + * Get a ProgramFailureException with the given error + * information from {@link HttpResponseException}. + * + * @param e The HttpResponseException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private static ProgramFailureException getProgramFailureException(HttpResponseException e) { + Integer statusCode = e.getStatusCode(); + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode); + String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(), + pair.getCorrectiveAction()); + + String errorMessage = e.getMessage(); + if (e instanceof GoogleJsonResponseException) { + GoogleJsonResponseException exception = (GoogleJsonResponseException) e; + errorMessage = exception.getDetails() != null ? exception.getDetails().getMessage() : + exception.getMessage(); + } + + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, pair.getErrorType(), true, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from generic exceptions like {@link IOException}. + * + * @param e The Throwable to get the error information from. + * @return A ProgramFailureException with the given error information, otherwise null. + */ + public static ProgramFailureException getProgramFailureException(Throwable e) { + + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + return (ProgramFailureException) t; + } + if (t instanceof HttpResponseException) { + return getProgramFailureException((HttpResponseException) t); + } + } + + return null; + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java index 0a64c52dd9..4cedc83c8e 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java @@ -21,6 +21,10 @@ import com.google.bigtable.repackaged.com.google.gson.Gson; import com.google.cloud.hadoop.util.AccessTokenProvider; import com.google.cloud.hadoop.util.CredentialFactory; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.gcp.common.GCPUtils; import org.apache.hadoop.conf.Configuration; @@ -50,13 +54,20 @@ public AccessToken getAccessToken() { } return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime()); } catch (IOException e) { - throw new RuntimeException(e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + "Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e); } } @Override public void refresh() throws IOException { - getCredentials().refresh(); + try { + getCredentials().refresh(); + } catch (IOException e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + "Unable to refresh service account access token.", e.getMessage(), + ErrorType.UNKNOWN, true, e); + } } private GoogleCredentials getCredentials() throws IOException { diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java index 72d9dd0e69..b5ba33187d 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java @@ -16,6 +16,8 @@ package io.cdap.plugin.gcp.gcs.sink; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.gcp.common.ExceptionUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -41,7 +43,8 @@ *

* Delegated instances are created based on a supplied Output Format and Destination Table Names. */ -public class DelegatingGCSOutputCommitter extends OutputCommitter { +public class DelegatingGCSOutputCommitter extends OutputCommitter implements + ErrorDetailsProvider { private final TaskAttemptContext taskAttemptContext; private boolean firstTable = true; @@ -244,4 +247,8 @@ private String getPendingDirPath(JobID jobId) { return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId); } + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java index 2fe4eeff03..ac559825a5 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java @@ -17,6 +17,8 @@ package io.cdap.plugin.gcp.gcs.sink; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.gcp.common.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -32,7 +34,8 @@ /** * Output Format used to handle Schemaless Records as input. */ -public class DelegatingGCSOutputFormat extends OutputFormat { +public class DelegatingGCSOutputFormat extends OutputFormat implements + ErrorDetailsProvider { public static final String PARTITION_FIELD = "delegating_output_format.partition.field"; public static final String DELEGATE_CLASS = "delegating_output_format.delegate"; @@ -75,4 +78,9 @@ public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext contex return new DelegatingGCSOutputCommitter(context); } + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } + } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java index 3bd6a13cc0..0414345801 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java @@ -17,6 +17,8 @@ package io.cdap.plugin.gcp.gcs.sink; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.gcp.common.ExceptionUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; @@ -31,7 +33,8 @@ *

* This Record Writer will initialize record writes and Output Committers as needed. */ -public class DelegatingGCSRecordWriter extends RecordWriter { +public class DelegatingGCSRecordWriter extends + RecordWriter implements ErrorDetailsProvider { private final TaskAttemptContext context; private final String partitionField; private final Map> delegateMap; @@ -78,4 +81,8 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc } } + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java index 129b6a297f..c55f5e0f57 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java @@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception { collector.addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`"); collector.getOrThrowException(); - return; } - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + String bucketName = config.getBucket(collector); Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; Bucket bucket; - String location; + String location = null; try { - bucket = storage.get(config.getBucket()); + bucket = storage.get(bucketName); + if (bucket != null) { + location = bucket.getLocation(); + } else { + location = config.getLocation(); + GCPUtils.createBucket(storage, bucketName, location, cmekKeyName); + } } catch (StorageException e) { - throw new RuntimeException( - String.format("Unable to access or create bucket %s. ", config.getBucket()) - + "Ensure you entered the correct bucket path and have permissions for it.", e); - } - if (bucket != null) { - location = bucket.getLocation(); - } else { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); - location = config.getLocation(); + String errorReason = String.format(errorReasonFormat, e.getCode()); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } + this.outputPath = getOutputDir(context); // create asset for lineage asset = Asset.builder(config.getReferenceName()) @@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) { } } - public String getBucket() { - return GCSPath.from(path).getBucket(); + public String getBucket(FailureCollector collector) { + try { + return GCSPath.from(path).getBucket(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + return null; } @Override diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java index 7b15839c10..77e4c86f24 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java @@ -129,33 +129,47 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati config.validate(collector, context.getArguments().asMap()); collector.getOrThrowException(); - Map baseProperties = GCPUtils.getFileSystemProperties(config.connection, - config.getPath(), new HashMap<>()); Map argumentCopy = new HashMap<>(context.getArguments().asMap()); - CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); collector.getOrThrowException(); + Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath(); if (isServiceAccountFilePath == null) { - context.getFailureCollector().addFailure("Service account type is undefined.", - "Must be `filePath` or `JSON`"); - context.getFailureCollector().getOrThrowException(); - return; + collector.addFailure("Service account type is undefined.", + "Must be `filePath` or `JSON`"); + collector.getOrThrowException(); } - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + String bucketName = config.getBucket(collector); Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; try { - if (storage.get(config.getBucket()) == null) { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); + if (storage.get(bucketName) == null) { + GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName); } } catch (StorageException e) { - // Add more descriptive error message - throw new RuntimeException( - String.format("Unable to access or create bucket %s. ", config.getBucket()) - + "Ensure you entered the correct bucket path and have permissions for it.", e); + String errorReason = String.format(errorReasonFormat, e.getCode()); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } + Map baseProperties = GCPUtils.getFileSystemProperties(config.connection, + config.getPath(), new HashMap<>()); if (config.getAllowFlexibleSchema()) { //Configure MultiSink with support for flexible schemas. configureSchemalessMultiSink(context, baseProperties, argumentCopy); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java index cccef18ad9..d3e2f4ba0f 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java @@ -21,6 +21,8 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.common.annotations.VisibleForTesting; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.gcp.common.ExceptionUtils; import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.StorageClient; import org.apache.hadoop.conf.Configuration; @@ -40,7 +42,8 @@ /** * OutputCommitter for GCS */ -public class GCSOutputCommitter extends OutputCommitter { +public class GCSOutputCommitter extends OutputCommitter implements + ErrorDetailsProvider { private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class); public static final String RECORD_COUNT_FORMAT = "recordcount.%s"; @@ -161,4 +164,9 @@ public boolean isRecoverySupported() { public void recoverTask(TaskAttemptContext taskContext) throws IOException { delegate.recoverTask(taskContext); } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java index 1d57e6d28e..42e2db120a 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java @@ -1,8 +1,26 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin.gcp.gcs.sink; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; import io.cdap.cdap.etl.api.validation.FormatContext; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; +import io.cdap.plugin.gcp.common.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -49,7 +67,8 @@ public Map getOutputFormatConfiguration() { /** * OutputFormat for GCS Sink */ - public static class GCSOutputFormat extends OutputFormat { + public static class GCSOutputFormat extends OutputFormat + implements ErrorDetailsProvider { private OutputFormat delegateFormat; private OutputFormat getDelegateFormatInstance(Configuration configuration) throws IOException { @@ -89,12 +108,18 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) .getOutputCommitter(taskAttemptContext); return new GCSOutputCommitter(delegateCommitter); } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Configuration configuration) { + return ExceptionUtils.getProgramFailureException(throwable); + } } /** * RecordWriter for GCSSink */ - public static class GCSRecordWriter extends RecordWriter { + public static class GCSRecordWriter extends RecordWriter + implements ErrorDetailsProvider { private final RecordWriter originalWriter; private long recordCount; @@ -117,5 +142,10 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException, Int taskAttemptContext.getConfiguration() .setLong(String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID()), recordCount); } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java index f4f5d6caca..8387d9b30c 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java @@ -18,6 +18,8 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.gcp.common.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -34,7 +36,8 @@ /** * An OutputFormat that filters records before sending them to a delegate */ -public class RecordFilterOutputFormat extends OutputFormat { +public class RecordFilterOutputFormat extends OutputFormat implements + ErrorDetailsProvider { public static final String FILTER_FIELD = "record.filter.field"; public static final String PASS_VALUE = "record.filter.val"; public static final String ORIGINAL_SCHEMA = "record.original.schema"; @@ -93,10 +96,16 @@ private OutputFormat getDelegateFormat(Configuration hConf) throws IOException { } } + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } + /** * Filters records before writing them out using a delegate. */ - public static class FilterRecordWriter extends RecordWriter { + public static class FilterRecordWriter extends RecordWriter + implements ErrorDetailsProvider { private final String filterField; private final String passthroughValue; private final RecordWriter delegate; @@ -132,5 +141,10 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, public void close(TaskAttemptContext context) throws IOException, InterruptedException { delegate.close(context); } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java new file mode 100644 index 0000000000..308b245fc0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java @@ -0,0 +1,176 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.gcs.source; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.validation.FormatContext; +import io.cdap.cdap.etl.api.validation.InputFiles; +import io.cdap.cdap.etl.api.validation.ValidatingInputFormat; +import io.cdap.plugin.gcp.common.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * InputFormatProvider for GCSSource. + */ +public class GCSInputFormatProvider implements ValidatingInputFormat { + public static final String DELEGATE_INPUTFORMAT_CLASSNAME = + "gcssource.delegate.inputformat.classname"; + private final ValidatingInputFormat delegate; + + public GCSInputFormatProvider(ValidatingInputFormat delegate) { + this.delegate = delegate; + } + + @Override + public void validate(FormatContext context) { + delegate.validate(context); + } + + @Override + public @Nullable Schema getSchema(FormatContext formatContext) { + return delegate.getSchema(formatContext); + } + + @Override + public @Nullable Schema detectSchema(FormatContext context, InputFiles inputFiles) + throws IOException { + return ValidatingInputFormat.super.detectSchema(context, inputFiles); + } + + @Override + public String getInputFormatClassName() { + return GCSInputFormat.class.getName(); + } + + @Override + public Map getInputFormatConfiguration() { + Map inputFormatConfiguration = + new HashMap<>(delegate.getInputFormatConfiguration()); + inputFormatConfiguration.put(DELEGATE_INPUTFORMAT_CLASSNAME, + delegate.getInputFormatClassName()); + return inputFormatConfiguration; + } + + /** + * InputFormat for GCSSource. + */ + public static class GCSInputFormat extends InputFormat + implements ErrorDetailsProvider { + private InputFormat delegateFormat; + + private InputFormat getDelegateFormatInstance(Configuration configuration) throws IOException { + if (delegateFormat != null) { + return delegateFormat; + } + + String delegateClassName = configuration.get(DELEGATE_INPUTFORMAT_CLASSNAME); + try { + delegateFormat = (InputFormat) ReflectionUtils + .newInstance(configuration.getClassByName(delegateClassName), configuration); + return delegateFormat; + } catch (ClassNotFoundException e) { + throw new IOException( + String.format("Unable to instantiate output format for class %s", delegateClassName), + e); + } + } + + @Override + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + return getDelegateFormatInstance(jobContext.getConfiguration()).getSplits(jobContext); + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + RecordReader originalReader = + getDelegateFormatInstance(taskAttemptContext.getConfiguration()) + .createRecordReader(inputSplit, taskAttemptContext); + return new GCSRecordReader(originalReader); + } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Configuration configuration) { + return ExceptionUtils.getProgramFailureException(throwable); + } + } + + /** + * RecordReader for GCSSource. + */ + public static class GCSRecordReader extends RecordReader + implements ErrorDetailsProvider { + + private final RecordReader originalReader; + + public GCSRecordReader(RecordReader originalReader) { + this.originalReader = originalReader; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + originalReader.initialize(inputSplit, taskAttemptContext); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return originalReader.nextKeyValue(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return originalReader.getCurrentKey(); + } + + @Override + public StructuredRecord getCurrentValue() throws IOException, InterruptedException { + return originalReader.getCurrentValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return originalReader.getProgress(); + } + + @Override + public void close() throws IOException { + originalReader.close(); + } + + @Override + public RuntimeException getExceptionDetails(Throwable throwable, Void conf) { + return ExceptionUtils.getProgramFailureException(throwable); + } + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java index 7bfec4dd81..47533f796d 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java @@ -33,6 +33,7 @@ import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.validation.ValidatingInputFormat; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; @@ -73,6 +74,12 @@ public GCSSource(GCSSourceConfig config) { this.config = config; } + @Override + public ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipelineConfigurer) { + ValidatingInputFormat delegate = super.getValidatingInputFormat(pipelineConfigurer); + return new GCSInputFormatProvider(delegate); + } + @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); @@ -83,25 +90,62 @@ protected String getEmptyInputFormatClassName() { return GCSEmptyInputFormat.class.getName(); } + @Override + public ValidatingInputFormat getInputFormatForRun(BatchSourceContext context) + throws InstantiationException { + ValidatingInputFormat inputFormatForRun = super.getInputFormatForRun(context); + return new GCSInputFormatProvider(inputFormatForRun); + } + @Override public void prepareRun(BatchSourceContext context) throws Exception { // Get location of the source for lineage - String location; - String bucketName = GCSPath.from(config.getPath()).getBucket(); - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), - config.connection.isServiceAccountFilePath()); + String location = null; + String bucketName = null; + String path = config.getPath(); + FailureCollector collector = context.getFailureCollector(); + + try { + bucketName = GCSPath.from(path).getBucket(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath(); + if (isServiceAccountFilePath == null) { + collector.addFailure("Service account type is undefined.", + "Must be `filePath` or `JSON`"); + collector.getOrThrowException(); + } + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); try { location = storage.get(bucketName).getLocation(); } catch (StorageException e) { - throw new RuntimeException( - String.format("Unable to access bucket %s. ", bucketName) - + "Ensure you entered the correct bucket path and have permissions for it.", e); + String errorReason = String.format("Error code: %s, Unable to access GCS bucket '%s'. ", + e.getCode(), bucketName); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), + "Ensure you entered the correct bucket path and have permissions for it.") + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } // create asset for lineage - String fqn = GCSPath.getFQN(config.getPath()); + String fqn = GCSPath.getFQN(path); String referenceName = Strings.isNullOrEmpty(config.getReferenceName()) ? ReferenceNames.normalizeFqn(fqn) : config.getReferenceName(); @@ -142,7 +186,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) { @Override protected void recordLineage(LineageRecorder lineageRecorder, List outputFields) { - lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.", + lineageRecorder.recordRead("Read", String.format("Read %s from Google Cloud Storage.", config.isEncrypted() ? " and decrypt " : " "), outputFields); } @@ -257,7 +301,7 @@ public void validate(FailureCollector collector) { @Override public String getPath() { - return path; + return GCSPath.from(path).toString(); } @Nullable diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java index f6092f66f5..84165cff3c 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java @@ -24,6 +24,10 @@ import com.google.crypto.tink.StreamingAead; import com.google.crypto.tink.config.TinkConfig; import com.google.crypto.tink.integration.gcpkms.GcpKmsClient; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.gcp.crypto.Decryptor; import io.cdap.plugin.gcp.crypto.FSInputSeekableByteChannel; import org.apache.hadoop.conf.Configurable; @@ -66,20 +70,22 @@ public TinkDecryptor() throws GeneralSecurityException { @Override public SeekableByteChannel open(FileSystem fs, Path path, int bufferSize) throws IOException { DecryptInfo decryptInfo = getDecryptInfo(fs, path); + Path metadataPath = new Path(path.getParent(), path.getName() + metadataSuffix); if (decryptInfo == null) { - throw new IllegalArgumentException("Missing encryption metadata for file '" + path - + "'. Expected metadata path is '" - + new Path(path.getParent(), path.getName() + metadataSuffix) + "'"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing encryption metadata for file '%s'. " + + "Expected metadata path is '%s'.", path, metadataPath), null, + ErrorType.USER, false, null); } try { StreamingAead streamingAead = decryptInfo.getKeysetHandle().getPrimitive(StreamingAead.class); return streamingAead.newSeekableDecryptingChannel(new FSInputSeekableByteChannel(fs, path, bufferSize), decryptInfo.getAad()); - } catch (IOException e) { - throw e; } catch (Exception e) { - throw new IOException(e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.", + path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e); } } @@ -88,7 +94,9 @@ public void setConf(Configuration configuration) { this.configuration = configuration; this.metadataSuffix = configuration.get(METADATA_SUFFIX); if (metadataSuffix == null) { - throw new IllegalArgumentException("Missing configuration '" + METADATA_SUFFIX + "'"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing configuration '%s'.", METADATA_SUFFIX), null, + ErrorType.USER, false, null); } } @@ -112,7 +120,13 @@ private DecryptInfo getDecryptInfo(FileSystem fs, Path path) throws IOException } // Create the DecryptInfo - return getDecryptInfo(metadata); + try { + return getDecryptInfo(metadata); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.", + path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e); + } } static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException {