
Commit

Merge branch 'master' into sahnib_master_rocks_hardening
micheal-o committed Sep 23, 2024
2 parents 6a8639b + 3b5c1d6 commit e13abd1
Showing 227 changed files with 6,593 additions and 1,196 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
@@ -32,7 +32,7 @@ github:
squash: true
rebase: true
ghp_branch: master
ghp_path: /docs/_site
ghp_path: /docs

notifications:
pullrequests: reviews@spark.apache.org
2 changes: 1 addition & 1 deletion .github/workflows/build_python_connect.yml
@@ -71,7 +71,7 @@ jobs:
python packaging/connect/setup.py sdist
cd dist
pip install pyspark*connect-*.tar.gz
pip install 'six==1.16.0' 'pandas<=2.2.2' scipy 'plotly>=4.8' 'mlflow>=2.8.1' coverage matplotlib openpyxl 'memory-profiler>=0.61.0' 'scikit-learn>=1.3.2' 'graphviz==0.20.3' torch torchvision torcheval deepspeed unittest-xml-reporting
pip install 'six==1.16.0' 'pandas<=2.2.2' scipy 'plotly>=4.8' 'mlflow>=2.8.1' coverage matplotlib openpyxl 'memory-profiler>=0.61.0' 'scikit-learn>=1.3.2' 'graphviz==0.20.3' torch torchvision torcheval deepspeed unittest-xml-reporting 'plotly>=4.8'
- name: Run tests
env:
SPARK_TESTING: 1
11 changes: 9 additions & 2 deletions .github/workflows/pages.yml
@@ -26,7 +26,7 @@ on:

concurrency:
group: 'docs preview'
cancel-in-progress: true
cancel-in-progress: false

jobs:
docs:
@@ -35,6 +35,8 @@ jobs:
permissions:
id-token: write
pages: write
environment:
name: github-pages # https://github.com/actions/deploy-pages/issues/271
env:
SPARK_TESTING: 1 # Reduce some noise in the logs
RELEASE_VERSION: 'In-Progress'
@@ -56,7 +58,12 @@ jobs:
architecture: x64
cache: 'pip'
- name: Install Python dependencies
run: pip install --upgrade -r dev/requirements.txt
run: |
pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' \
ipython ipython_genutils sphinx_plotly_directive 'numpy>=1.20.0' pyarrow 'pandas==2.2.2' 'plotly>=4.8' 'docutils<0.18.0' \
'flake8==3.9.0' 'mypy==1.8.0' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' 'black==23.9.1' \
'pandas-stubs==1.2.0.53' 'grpcio==1.62.0' 'grpcio-status==1.62.0' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' \
'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5'
- name: Install Ruby for documentation generation
uses: ruby/setup-ruby@v1
with:
Empty file added .nojekyll
Empty file.
@@ -109,7 +109,7 @@ private static int lowercaseMatchLengthFrom(
}
// Compare the characters in the target and pattern strings.
int matchLength = 0, codePointBuffer = -1, targetCodePoint, patternCodePoint;
while (targetIterator.hasNext() && patternIterator.hasNext()) {
while ((targetIterator.hasNext() || codePointBuffer != -1) && patternIterator.hasNext()) {
if (codePointBuffer != -1) {
targetCodePoint = codePointBuffer;
codePointBuffer = -1;
@@ -211,7 +211,7 @@ private static int lowercaseMatchLengthUntil(
}
// Compare the characters in the target and pattern strings.
int matchLength = 0, codePointBuffer = -1, targetCodePoint, patternCodePoint;
while (targetIterator.hasNext() && patternIterator.hasNext()) {
while ((targetIterator.hasNext() || codePointBuffer != -1) && patternIterator.hasNext()) {
if (codePointBuffer != -1) {
targetCodePoint = codePointBuffer;
codePointBuffer = -1;
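The loop-condition fix above matters because lowercasing can be one-to-many: U+0130 ('İ') lowercases to 'i' plus a combining dot (U+0307), leaving a code point buffered after the target iterator is exhausted. A minimal Scala sketch of that buffering idea, under simplified prefix-match semantics rather than Spark's actual UTF8String iterators (all names here are illustrative):

import java.util.Locale

object LowercaseMatchSketch {
  // Prefix match over lowercased code points. Mirrors the fixed condition:
  // keep looping while a buffered code point from a one-to-many mapping is pending.
  def startsWithLowercase(target: String, pattern: String): Boolean = {
    val targetIt = target.codePoints().iterator()
    val patternIt = pattern.toLowerCase(Locale.ROOT).codePoints().iterator()
    var codePointBuffer = -1 // trailing code point of a one-to-many lowercase mapping
    while ((targetIt.hasNext || codePointBuffer != -1) && patternIt.hasNext) {
      val targetCodePoint =
        if (codePointBuffer != -1) { val b = codePointBuffer; codePointBuffer = -1; b }
        else {
          val lowered = new String(Character.toChars(targetIt.nextInt())).toLowerCase(Locale.ROOT)
          val cps = lowered.codePoints().toArray
          if (cps.length > 1) codePointBuffer = cps(1) // e.g. U+0307 after lowercasing 'İ'
          cps(0)
        }
      if (targetCodePoint != patternIt.nextInt()) return false
    }
    !patternIt.hasNext // the pattern must be fully consumed
  }

  def main(args: Array[String]): Unit = {
    // Without the buffered-code-point check in the loop condition, the trailing
    // U+0307 from 'İ' would never be compared and this would return false.
    println(startsWithLowercase("oİ", "oi\u0307")) // true
  }
}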
@@ -773,8 +773,8 @@ protected CollationMeta buildCollationMeta() {
ICULocaleMap.get(locale).getDisplayCountry(),
VersionInfo.ICU_VERSION.toString(),
COLLATION_PAD_ATTRIBUTE,
caseSensitivity == CaseSensitivity.CS,
accentSensitivity == AccentSensitivity.AS);
accentSensitivity == AccentSensitivity.AS,
caseSensitivity == CaseSensitivity.CS);
}

/**
@@ -921,6 +921,18 @@ public static int collationNameToId(String collationName) throws SparkException
return Collation.CollationSpec.collationNameToId(collationName);
}

/**
* Returns whether the ICU collation for the given collation id is
* case-sensitive and accent-insensitive (CS_AI).
* This method is used in expressions which do not support CS_AI collations.
*/
public static boolean isCaseSensitiveAndAccentInsensitive(int collationId) {
return Collation.CollationSpecICU.fromCollationId(collationId).caseSensitivity ==
Collation.CollationSpecICU.CaseSensitivity.CS &&
Collation.CollationSpecICU.fromCollationId(collationId).accentSensitivity ==
Collation.CollationSpecICU.AccentSensitivity.AI;
}

public static void assertValidProvider(String provider) throws SparkException {
if (!SUPPORTED_PROVIDERS.contains(provider.toLowerCase())) {
Map<String, String> params = Map.of(
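A hedged sketch of how a caller that cannot handle case-sensitive, accent-insensitive (CS_AI) collations might use the new helper; the guard method and its error message are illustrative only, not Spark API:

import org.apache.spark.sql.catalyst.util.CollationFactory

object CsAiGuardSketch {
  // Illustrative pre-check: reject CS_AI collations before evaluating an
  // expression that does not support them.
  def assertNotCsAi(collationId: Int): Unit = {
    if (CollationFactory.isCaseSensitiveAndAccentInsensitive(collationId)) {
      throw new IllegalArgumentException(
        s"Collation id $collationId is CS_AI, which this expression does not support.")
    }
  }
}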
@@ -629,6 +629,8 @@ public void testStartsWith() throws SparkException {
assertStartsWith("İonic", "Io", "UTF8_LCASE", false);
assertStartsWith("İonic", "i\u0307o", "UTF8_LCASE", true);
assertStartsWith("İonic", "İo", "UTF8_LCASE", true);
assertStartsWith("oİ", "oİ", "UTF8_LCASE", true);
assertStartsWith("oİ", "oi̇", "UTF8_LCASE", true);
// Conditional case mapping (e.g. Greek sigmas).
assertStartsWith("σ", "σ", "UTF8_BINARY", true);
assertStartsWith("σ", "ς", "UTF8_BINARY", false);
@@ -880,6 +882,8 @@ public void testEndsWith() throws SparkException {
assertEndsWith("the İo", "Io", "UTF8_LCASE", false);
assertEndsWith("the İo", "i\u0307o", "UTF8_LCASE", true);
assertEndsWith("the İo", "İo", "UTF8_LCASE", true);
assertEndsWith("İo", "İo", "UTF8_LCASE", true);
assertEndsWith("İo", "i̇o", "UTF8_LCASE", true);
// Conditional case mapping (e.g. Greek sigmas).
assertEndsWith("σ", "σ", "UTF8_BINARY", true);
assertEndsWith("σ", "ς", "UTF8_BINARY", false);
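The added UTF8_LCASE cases cover the one-to-many lowercasing of the Turkish dotted capital I. A hedged SQL-level sketch of the same behavior (assumes a Spark build with string collations, the COLLATE syntax, and startswith/endswith as SQL functions):

import org.apache.spark.sql.SparkSession

object LcaseDottedIExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("lcase-dotted-i").getOrCreate()
    val dotted = "i\u0307" // 'i' + combining dot above, the full lowercase of 'İ'
    spark.sql(s"SELECT startswith('oİ' COLLATE UTF8_LCASE, 'o$dotted') AS starts").show() // true
    spark.sql(s"SELECT endswith('İo' COLLATE UTF8_LCASE, '${dotted}o') AS ends").show()   // true
    spark.stop()
  }
}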
35 changes: 19 additions & 16 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1,4 +1,10 @@
{
"ADD_DEFAULT_UNSUPPORTED" : {
"message" : [
"Failed to execute <statementType> command because DEFAULT values are not supported when adding new columns to previously existing target data source with table provider: \"<dataSource>\"."
],
"sqlState" : "42623"
},
"AGGREGATE_FUNCTION_WITH_NONDETERMINISTIC_EXPRESSION" : {
"message" : [
"Non-deterministic expression <sqlExpr> should not appear in the arguments of an aggregate function."
@@ -1049,7 +1055,7 @@
"message" : [
"Encountered error when saving to external data source."
],
"sqlState" : "KD00F"
"sqlState" : "KD010"
},
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
@@ -1096,6 +1102,12 @@
],
"sqlState" : "42608"
},
"DEFAULT_UNSUPPORTED" : {
"message" : [
"Failed to execute <statementType> command because DEFAULT values are not supported for target data source with table provider: \"<dataSource>\"."
],
"sqlState" : "42623"
},
"DISTINCT_WINDOW_FUNCTION_UNSUPPORTED" : {
"message" : [
"Distinct window functions are not supported: <windowExpr>."
@@ -1444,6 +1456,12 @@
],
"sqlState" : "2203G"
},
"FAILED_TO_LOAD_ROUTINE" : {
"message" : [
"Failed to load routine <routineName>."
],
"sqlState" : "38000"
},
"FAILED_TO_PARSE_TOO_COMPLEX" : {
"message" : [
"The statement, including potential SQL functions and referenced views, was too complex to parse.",
@@ -6673,21 +6691,6 @@
"Sinks cannot request distribution and ordering in continuous execution mode."
]
},
"_LEGACY_ERROR_TEMP_1344" : {
"message" : [
"Invalid DEFAULT value for column <fieldName>: <defaultValue> fails to parse as a valid literal value."
]
},
"_LEGACY_ERROR_TEMP_1345" : {
"message" : [
"Failed to execute <statementType> command because DEFAULT values are not supported for target data source with table provider: \"<dataSource>\"."
]
},
"_LEGACY_ERROR_TEMP_1346" : {
"message" : [
"Failed to execute <statementType> command because DEFAULT values are not supported when adding new columns to previously existing target data source with table provider: \"<dataSource>\"."
]
},
"_LEGACY_ERROR_TEMP_2000" : {
"message" : [
"<message>. If necessary set <ansiConfig> to false to bypass this error."
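Two of the removed legacy templates above (_LEGACY_ERROR_TEMP_1345/_1346) correspond to the named conditions DEFAULT_UNSUPPORTED and ADD_DEFAULT_UNSUPPORTED added earlier in this file. A hedged sketch of a statement shape that can surface them; the provider is only a stand-in and whether the error actually fires depends on the DEFAULT-column allow-list configuration, so treat the failing outcome as an assumption:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession

object DefaultUnsupportedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("default-demo").getOrCreate()
    spark.sql("CREATE TABLE demo_tbl (id INT) USING json")
    try {
      // Providers outside the DEFAULT allow-list raise ADD_DEFAULT_UNSUPPORTED here
      // (formerly _LEGACY_ERROR_TEMP_1346); json is normally allowed, so this
      // statement may simply succeed.
      spark.sql("ALTER TABLE demo_tbl ADD COLUMN flag BOOLEAN DEFAULT false")
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
    spark.stop()
  }
}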
2 changes: 1 addition & 1 deletion common/utils/src/main/resources/error/error-states.json
@@ -7417,7 +7417,7 @@
"standard": "N",
"usedBy": ["Databricks"]
},
"KD00F": {
"KD010": {
"description": "external data source failure",
"origin": "Databricks",
"standard": "N",
@@ -266,6 +266,7 @@ private[spark] object LogKeys {
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FIELD_TYPE extends LogKey
case object FILES extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
@@ -652,6 +653,7 @@
case object RECEIVER_IDS extends LogKey
case object RECORDS extends LogKey
case object RECOVERY_STATE extends LogKey
case object RECURSIVE_DEPTH extends LogKey
case object REDACTED_STATEMENT extends LogKey
case object REDUCE_ID extends LogKey
case object REGEX extends LogKey
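The new FIELD_TYPE and RECURSIVE_DEPTH keys plug into Spark's structured-logging interpolator. A minimal usage sketch, assuming the MDC/log-interpolator imports used by existing call sites; the class and method are hypothetical:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.RECURSIVE_DEPTH

class RecursionLoggerSketch extends Logging {
  // Hypothetical call site: record how deep a recursive Avro field was expanded.
  def logTruncation(depth: Int): Unit = {
    logWarning(log"Truncating recursive field at depth ${MDC(RECURSIVE_DEPTH, depth)}")
  }
}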
@@ -42,7 +42,8 @@ private[sql] case class AvroDataToCatalyst(
val dt = SchemaConverters.toSqlType(
expectedSchema,
avroOptions.useStableIdForUnionType,
avroOptions.stableIdPrefixForUnionType).dataType
avroOptions.stableIdPrefixForUnionType,
avroOptions.recursiveFieldMaxDepth).dataType
parseMode match {
// With PermissiveMode, the output Catalyst row might contain columns of null values for
// corrupt records, even if some of the columns are not nullable in the user-provided schema.
@@ -69,7 +70,8 @@
dataType,
avroOptions.datetimeRebaseModeInRead,
avroOptions.useStableIdForUnionType,
avroOptions.stableIdPrefixForUnionType)
avroOptions.stableIdPrefixForUnionType,
avroOptions.recursiveFieldMaxDepth)

@transient private var decoder: BinaryDecoder = _

@@ -51,22 +51,25 @@ private[sql] class AvroDeserializer(
datetimeRebaseSpec: RebaseSpec,
filters: StructFilters,
useStableIdForUnionType: Boolean,
stableIdPrefixForUnionType: String) {
stableIdPrefixForUnionType: String,
recursiveFieldMaxDepth: Int) {

def this(
rootAvroType: Schema,
rootCatalystType: DataType,
datetimeRebaseMode: String,
useStableIdForUnionType: Boolean,
stableIdPrefixForUnionType: String) = {
stableIdPrefixForUnionType: String,
recursiveFieldMaxDepth: Int) = {
this(
rootAvroType,
rootCatalystType,
positionalFieldMatch = false,
RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
new NoopFilters,
useStableIdForUnionType,
stableIdPrefixForUnionType)
stableIdPrefixForUnionType,
recursiveFieldMaxDepth)
}

private lazy val decimalConversions = new DecimalConversion()
@@ -128,7 +131,8 @@ private[sql] class AvroDeserializer(
s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"

val realDataType = SchemaConverters.toSqlType(
avroType, useStableIdForUnionType, stableIdPrefixForUnionType).dataType
avroType, useStableIdForUnionType, stableIdPrefixForUnionType,
recursiveFieldMaxDepth).dataType

(avroType.getType, catalystType) match {
case (NULL, NullType) => (updater, ordinal, _) =>
@@ -145,7 +145,8 @@ private[sql] class AvroFileFormat extends FileFormat
datetimeRebaseMode,
avroFilters,
parsedOptions.useStableIdForUnionType,
parsedOptions.stableIdPrefixForUnionType)
parsedOptions.stableIdPrefixForUnionType,
parsedOptions.recursiveFieldMaxDepth)
override val stopPosition = file.start + file.length

override def hasNext: Boolean = hasNextRow
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
@@ -136,6 +137,15 @@ private[sql] class AvroOptions(

val stableIdPrefixForUnionType: String = parameters
.getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_")

val recursiveFieldMaxDepth: Int =
parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1)

if (recursiveFieldMaxDepth > RECURSIVE_FIELD_MAX_DEPTH_LIMIT) {
throw QueryCompilationErrors.avroOptionsException(
RECURSIVE_FIELD_MAX_DEPTH,
s"Should not be greater than $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.")
}
}

private[sql] object AvroOptions extends DataSourceOptions {
@@ -170,4 +180,25 @@ private[sql] object AvroOptions extends DataSourceOptions {
// When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
// of Avro Union type.
val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")

/**
* Adds support for recursive fields. If this option is not specified or is set to 0, recursive
* fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
* fields to be recursed once, 3 allows them to be recursed twice, and so on, up to 15.
* Values larger than 15 are not allowed, to avoid inadvertently creating very large
* schemas. If an Avro message has depth beyond this limit, the returned Spark struct is
* truncated after the recursion limit.
*
* Examples: Consider an Avro schema with a recursive field:
* {"type" : "record", "name" : "Node", "fields" : [{"name": "Id", "type": "int"},
* {"name": "Next", "type": ["null", "Node"]}]}
* The following lists the parsed schema with different values for this setting.
* 1: `struct<Id: int>`
* 2: `struct<Id: int, Next: struct<Id: int>>`
* 3: `struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>`
* and so on.
*/
val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth")

val RECURSIVE_FIELD_MAX_DEPTH_LIMIT: Int = 15
}
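A hedged usage sketch for the new recursiveFieldMaxDepth option (requires the spark-avro module on the classpath; the input path is a placeholder):

import org.apache.spark.sql.SparkSession

object RecursiveAvroReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("avro-recursive").getOrCreate()
    // With depth 2, a recursive "Node" record parses as
    // struct<Id: int, Next: struct<Id: int>>, per the option documentation above.
    val df = spark.read
      .format("avro")
      .option("recursiveFieldMaxDepth", "2")
      .load("/tmp/nodes.avro") // placeholder path
    df.printSchema()
    spark.stop()
  }
}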
@@ -65,7 +65,8 @@ private[sql] object AvroUtils extends Logging {
SchemaConverters.toSqlType(
avroSchema,
parsedOptions.useStableIdForUnionType,
parsedOptions.stableIdPrefixForUnionType).dataType match {
parsedOptions.stableIdPrefixForUnionType,
parsedOptions.recursiveFieldMaxDepth).dataType match {
case t: StructType => Some(t)
case _ => throw new RuntimeException(
s"""Avro schema cannot be converted to a Spark SQL StructType: