Developing Spark External Data Sources using the V2 API

This project illustrates the new V2 Apache Spark External Data Source API as introduced in Spark 2.3.0.

It consists of:

* ExampleDB, a simple in-memory database system developed specifically for these examples
* a set of external data sources, under src/main/java/datasources
* Spark example programs in Java and Scala, under src/main/java/examples and src/main/scala/examples

The project has spun out of the following older projects:

Goals and Assumptions

Target Audience

This project is targeted at developers who are designing and implementing a new Spark external data source for some data store with which they are reasonably familiar, and who need information on Spark's new (V2) model for integrating external data sources.

Designing ExampleDB

This project is rather unusual in that it develops, from scratch, a simple in-memory database system (ExampleDB) whose sole purpose is to provide an integration point for a wide range of Spark external data source examples. It's important to understand the goals and (especially) non-goals of ExampleDB.

Goals

Non-goals

The Data Sources

These can be found under src/main/java/datasources.

Read-only data sources

<table>
  <tr><th>File</th><th>What's Illustrated</th></tr>
  <tr>
    <td><a href="src/main/java/datasources/SimpleRowDataSource.java">SimpleRowDataSource.java</a></td>
    <td>
      <p>An extremely simple DataSource that supports sequential reads (i.e.: on just one executor) from ExampleDB. It only supports reads from a single, pre-defined table with a pre-defined schema. This DataSource is probably about as simple as one that reads from a remote database can get.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/datasources/FlexibleRowDataSource.java">FlexibleRowDataSource.java</a></td>
    <td>
      <p>Another simple DataSource that supports sequential reads (i.e.: on just one executor) from ExampleDB. It gets a table name from its configuration and infers a schema from that table.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/datasources/ParallelRowDataSource.java">ParallelRowDataSource.java</a></td>
    <td>
      <p>Another simple DataSource that supports parallel reads (i.e.: on multiple executors) from ExampleDB. It gets a table name from its configuration and infers a schema from that table. If a number of partitions is specified in the properties, it is used. Otherwise, the table's default partition count (always 4 in ExampleDB) is used.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/datasources/PartitioningRowDataSource.java">PartitioningRowDataSource.java</a></td>
    <td>
      <p>This also supports parallel reads (i.e.: on multiple executors) from ExampleDB. The interesting feature of this example is that it can inform the Spark SQL optimizer whether the table is partitioned in the right way to avoid shuffles in certain queries. One example is grouping queries, where shuffles can be avoided if the table is clustered in such a way that each group (cluster) is fully contained in a single partition. Since ExampleDB only supports clustered indexes on single columns, in practice a shuffle can be avoided if the table is clustered on one of the grouping columns. (In ExampleDB clustered tables, splits always respect clustering.)</p>
    </td>
  </tr>
</table>
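All of the read-only sources above are variations on the same basic wiring. Below is a minimal sketch of that read path, assuming the Spark 2.3.0 V2 interfaces (DataSourceV2, ReadSupport, DataSourceReader, DataReaderFactory, DataReader); the class name is invented for illustration, and the hard-coded rows stand in for real ExampleDB calls:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MinimalRowDataSource implements DataSourceV2, ReadSupport {

    @Override
    public DataSourceReader createReader(DataSourceOptions options) {
        return new Reader();
    }

    static class Reader implements DataSourceReader {
        @Override
        public StructType readSchema() {
            // A fixed schema; a real source would infer this from the database.
            return new StructType().add("id", DataTypes.LongType);
        }

        @Override
        public List<DataReaderFactory<Row>> createDataReaderFactories() {
            // One factory => one partition => a sequential read on one executor.
            return Arrays.asList(new Factory());
        }
    }

    static class Factory implements DataReaderFactory<Row> {
        @Override
        public DataReader<Row> createDataReader() {
            return new TaskReader();
        }
    }

    static class TaskReader implements DataReader<Row> {
        private long current = -1;

        @Override
        public boolean next() throws IOException {
            // Stand-in for fetching the next row from ExampleDB.
            return ++current < 10;
        }

        @Override
        public Row get() {
            return RowFactory.create(current);
        }

        @Override
        public void close() throws IOException {
            // Release any connection to the remote store here.
        }
    }
}
```

Returning more than one DataReaderFactory from createDataReaderFactories() is what makes a read parallel: each factory becomes one partition of the resulting Dataset, read by its own task.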

Read/write data sources

<table>
  <tr><th>File</th><th>What's Illustrated</th></tr>
  <tr>
    <td><a href="src/main/java/datasources/ParallelRowReadWriteDataSource.java">ParallelRowReadWriteDataSource.java</a></td>
    <td>
      <p>This data source adds the ability to write data, and does so in parallel. The various classes for reading are identical to those of ParallelRowDataSource. All four values of SaveMode are supported. Each task writes to its own temporary table, and on global commit all of these temporary tables are copied into the destination table in a single ExampleDB transaction.</p>
    </td>
  </tr>
</table>
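The per-task temp table scheme is easiest to see in terms of Spark's writer interfaces. Here is a hedged sketch, again assuming the Spark 2.3.0 V2 interfaces (WriteSupport, DataSourceWriter, DataWriterFactory, DataWriter); the class name, the temp-table naming, and the db.* calls in the comments are hypothetical stand-ins for ExampleDB operations:

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class SketchReadWriteDataSource implements DataSourceV2, WriteSupport {

    @Override
    public Optional<DataSourceWriter> createWriter(
            String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
        // A real source acts on all four SaveMode values here, e.g. returning
        // Optional.empty() for Ignore when the table already exists.
        return Optional.of(new Writer());
    }

    static class Writer implements DataSourceWriter {
        @Override
        public DataWriterFactory<Row> createWriterFactory() {
            return new TaskWriterFactory();
        }

        @Override
        public void commit(WriterCommitMessage[] messages) {
            // Global commit: copy every task's temp table into the destination
            // table in a single ExampleDB transaction.
            for (WriterCommitMessage m : messages) {
                String tempTable = ((TaskCommit) m).tempTable;
                // hypothetical: db.copyIntoDestinationAndDrop(tempTable);
            }
        }

        @Override
        public void abort(WriterCommitMessage[] messages) {
            // Drop any temp tables created by tasks that managed to commit.
        }
    }

    static class TaskWriterFactory implements DataWriterFactory<Row> {
        @Override
        public DataWriter<Row> createDataWriter(int partitionId, int attemptNumber) {
            // One temp table per task attempt keeps concurrent attempts isolated.
            return new TaskWriter("temp_" + partitionId + "_" + attemptNumber);
        }
    }

    static class TaskCommit implements WriterCommitMessage {
        final String tempTable;
        TaskCommit(String tempTable) { this.tempTable = tempTable; }
    }

    static class TaskWriter implements DataWriter<Row> {
        private final String tempTable;
        TaskWriter(String tempTable) { this.tempTable = tempTable; }

        @Override
        public void write(Row record) throws IOException {
            // hypothetical: db.insert(tempTable, record);
        }

        @Override
        public WriterCommitMessage commit() throws IOException {
            // Task commit: just report which temp table holds this task's rows.
            return new TaskCommit(tempTable);
        }

        @Override
        public void abort() throws IOException {
            // hypothetical: db.dropTable(tempTable);
        }
    }
}
```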

The Spark Examples

These can be found under src/main/java/examples and src/main/scala/examples.

Read-only data sources

<table>
  <tr><th>File</th><th>What's Illustrated</th></tr>
  <tr>
    <td><a href="src/main/java/examples/JBasic.java">JBasic.java</a><br/>
        <a href="src/main/scala/examples/SBasic.scala">SBasic.scala</a></td>
    <td>
      <p>The simplest example: it uses direct ExampleDB calls to populate a table and then uses the SimpleRowDataSource to query it from Spark. Since that data source is sequential, the resulting Dataset has just one partition. Since the data source reads from a single, hard-coded table with a hard-coded schema, the table name is not specified in the Spark code.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/examples/JReadNamedTable.java">JReadNamedTable.java</a><br/>
        <a href="src/main/scala/examples/SReadNamedTable.scala">SReadNamedTable.scala</a></td>
    <td>
      <p>Instead uses the FlexibleRowDataSource to infer the schema of a specified table and query it, again sequentially, again resulting in a Dataset with a single partition.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/examples/JReadParallel.java">JReadParallel.java</a><br/>
        <a href="src/main/scala/examples/SReadParallel.scala">SReadParallel.scala</a></td>
    <td>
      <p>Uses the ParallelRowDataSource to infer the schema of a specified table and query it, this time in parallel, resulting in Datasets with multiple partitions. The example shows both taking the default number of partitions and specifying a partition count.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/examples/JReadPartitionAware.java">JReadPartitionAware.java</a><br/>
        <a href="src/main/scala/examples/SReadPartitionAware.scala">SReadPartitionAware.scala</a></td>
    <td>
      <p>Uses the PartitioningRowDataSource to avoid a shuffle in a grouping/aggregation query against a table that is clustered on the grouping column. It achieves this by using the SupportsReportPartitioning mixin for the DataSourceReader interface.</p>
    </td>
  </tr>
  <tr>
    <td><a href="src/main/java/examples/JReadPartitionAware_Mismatch.java">JReadPartitionAware_Mismatch.java</a><br/>
        <a href="src/main/scala/examples/SReadPartitionAware_Mismatch.scala">SReadPartitionAware_Mismatch.scala</a></td>
    <td>
      <p>This uses the same data source as the previous example but doesn't cluster the table, thus illustrating the shuffle that takes place.</p>
    </td>
  </tr>
</table>
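For orientation, the examples drive the V2 sources through the standard DataFrameReader API, roughly like this (a Java sketch; the "table" option name is assumed from the descriptions above rather than taken from the example code):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[4]")           // mirrors the 4 executors used by the examples
                .appName("ReadSketch")
                .getOrCreate();

        // With V2 sources, format() takes the data source class name.
        Dataset<Row> df = spark.read()
                .format("datasources.ParallelRowDataSource")
                .option("table", "myTable")   // option name assumed for illustration
                .load();

        // With ParallelRowDataSource this prints 4, ExampleDB's default
        // partition count, unless a partition-count option overrides it.
        System.out.println("partitions: " + df.rdd().getNumPartitions());
        df.show();

        spark.stop();
    }
}
```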

Read/write data sources

<table>
  <tr><th>File</th><th>What's Illustrated</th></tr>
  <tr>
    <td><a href="src/main/java/examples/JReadWriteParallel.java">JReadWriteParallel.java</a><br/>
        <a href="src/main/scala/examples/SReadWriteParallel.scala">SReadWriteParallel.scala</a></td>
    <td>
      <p>This illustrates updates using the simplest update-capable data source example, the ParallelRowReadWriteDataSource.</p>
      <p>First a dataframe is created and used to populate a table for the first time. At that point the newly created table's database schema is calculated from the dataframe schema. Notice that even though we create a dataframe with 6 partitions, when we later read from the table we always obtain dataframes with 4 partitions. This is because all tables in ExampleDB advertise 4 partitions by default, and we would have to override that default when reading to obtain different partitioning. However, the partitioning of the dataframe DOES impact update parallelism: notice from the log output that six tasks write to six temporary tables, and these would have run in parallel had we not specified only 4 executors, as we do in all these examples.</p>
      <p>We then put all four settings of SaveMode through their paces and see their impact.</p>
    </td>
  </tr>
</table>
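The flow this example walks through looks roughly like the sketch below (same assumptions about option and class names as in the read sketch above):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WriteSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[4]")
                .appName("WriteSketch")
                .getOrCreate();

        // Six partitions means six write tasks, and hence six temporary tables.
        Dataset<Row> df = spark.range(0, 100).toDF("id").repartition(6);

        df.write()
                .format("datasources.ParallelRowReadWriteDataSource")
                .option("table", "myTable")   // option name assumed for illustration
                .mode(SaveMode.Append)        // any of the four SaveMode values works here
                .save();

        // Reading back yields 4 partitions: ExampleDB's advertised default,
        // regardless of the partitioning of the dataframe that was written.
        Dataset<Row> back = spark.read()
                .format("datasources.ParallelRowReadWriteDataSource")
                .option("table", "myTable")
                .load();
        System.out.println("partitions: " + back.rdd().getNumPartitions());

        spark.stop();
    }
}
```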

Logging

Adjust the log levels in src/main/resources/log4j.properties to control verbosity as needed.

Notice that the data sources and the ExampleDB components both have entries there.
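
For example, using standard log4j 1.x property syntax, entries along these lines control each component separately (the logger names below are illustrative guesses; check the actual file for the names it defines):

```properties
# Illustrative only: see src/main/resources/log4j.properties for the
# actual logger (package) names used by the data sources and ExampleDB.

# The example data sources
log4j.logger.datasources=DEBUG

# The ExampleDB components
log4j.logger.edb=INFO
```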