IndexedDF

1. General Description

This project extends the Spark DataFrame API with indexing capabilities. The implemented index is an equality index, though the design could in principle be extended to range indexing. The supported operations are:

  1. Index Creation
  2. Point Lookups
  3. Appends

2. Building the Project

This is a standalone sbt project; it can be built by running sbt compile or sbt package from the command line, and the test suite can be run with sbt test.
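For example:

sbt compile   # compile the sources
sbt package   # build the project jar
sbt test      # run the test suite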

3. The API

Assuming we have a Spark dataframe df, the API is as follows (a short usage sketch is given after the list):

  1. val indexedDF = df.createIndex(columnNumber: Int); the returned object is an indexed dataframe with an equality index built on the given column.
  2. val regularDF = indexedDF.getRows(key: AnyVal); returns a regular dataframe containing the rows indexed by key.
  3. val newIndexedDF = indexedDF.appendRows(df: DataFrame); returns an indexed dataframe in which the rows of df have been appended to indexedDF.
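
A minimal sketch of a point lookup and an append (the column number and the lookup key are illustrative; df and anotherDF are assumed to be dataframes loaded as in the example below, with this project's implicit conversions in scope):

// build an equality index on column 0
val indexedDF = df.createIndex(0)

// point lookup: returns a regular dataframe with all rows whose
// column-0 value equals the (illustrative) key 42
val matchingRows = indexedDF.getRows(42)
matchingRows.show()

// append the rows of another dataframe; the result is again an indexed dataframe
val newIndexedDF = indexedDF.appendRows(anotherDF)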

4. Example Code

// the SparkSession import; the IndexedDF implicit conversions, the
// IndexedOperators strategy and the ConvertToIndexedOperators rule must
// also be imported from this project's packages
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
    .master("local")
    .appName("indexed df test app")
    // this is the number of partitions the indexed data frame will have, so use this judiciously
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()

// we have to register the IndexedOperators strategies and the ConvertToIndexedOperators rules;
// otherwise, Spark would not know how to plan the operators on the indexed dataframes
sparkSession.experimental.extraStrategies = Seq(IndexedOperators) ++
    sparkSession.experimental.extraStrategies
sparkSession.experimental.extraOptimizations = Seq(ConvertToIndexedOperators) ++
    sparkSession.experimental.extraOptimizations
    
    
// read a dataframe
val df = sparkSession.read
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .load("/path/to/data")

// index it on column 0 and cache the indexed result
val indexedDF = df.createIndex(0).cache()

// load another dataframe
val anotherDF = sparkSession.read
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .load("/path/to/data2")
    
indexedDF.createOrReplaceTempView("indexedtable")
anotherDF.createOrReplaceTempView("nonindexedtable")

// assuming indexedDF has a column col1 and anotherDF has a column col2, we can write the following join:
val result = sparkSession.sql("SELECT * from indexedtable JOIN nonindexedtable ON indexedtable.col1 = nonindexedtable.col2")

result.show()
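
Note that the join above only works because the IndexedOperators strategy and the ConvertToIndexedOperators rule were registered on the session; without them, Spark would not know how to plan the operators on the indexed dataframe.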
   
