05/21/20

Lessons Learned for Efficiently Ingesting Data into a Data Lake

By: Edward Wiener, Architect

One of our clients in the financial services space uses SimCorp Dimension as its accounting system of record for booking transactions, positions, portfolios, and security instruments. Given the centrality of SimCorp in the company’s data lake strategy, it was imperative to develop an efficient, robust technique to ingest SimCorp data into the data lake.

The Environment

To alleviate potential stress on a production system, the SimCorp team extracts data twice daily from Dimension into an Oracle-based data warehouse. A file generated on completion of the data warehouse load triggers the ingestion process into the data lake, which is hosted on a production cluster running the Hortonworks Data Platform (HDP) installed on Azure VMs. The SimCorp load comprises roughly 100 dimensional and fact tables of differing size and significance to the downstream data lake consumers.

Technology Choices

Given that the SimCorp pipeline described above represented a classic pattern of ingesting data from a relational database into Hadoop, we considered several solutions to this problem. One was Apache NiFi, bundled in our client’s Hortonworks environment as HDF. NiFi is a Swiss Army knife of out-of-the-box components for automating data flow between different systems and is certainly up to the task of moving data between an RDBMS and Hadoop. It also allows for some degree of parallelism, especially on a multi-node cluster. However, the large number of SimCorp tables, and the relatively large size of a few of the tables our ingestion process would have to target, made the Hadoop-native combination of Sqoop and Oozie the more attractive choice.

For each table, Sqoop can split the import across multiple map tasks, known as mappers, each of which opens its own database connection and ingests a chunk of the table in parallel. Combined with Oozie’s ability to run multiple Sqoop jobs concurrently, this provides a degree of parallelism limited only by the number of concurrent database connections the SimCorp data warehouse can support.

Lessons Learned

Once the ingestion pattern was designed, we went through several iterations of redesign, each delivering an incremental improvement in performance. These stages of development and customer support taught us a few lessons, among them the following:

Ingest data incrementally

A well-designed relational database schema should include a field to persist the record’s last change timestamp. Provided that the timestamp in this field is reliable, we should always strive to ingest data incrementally instead of repopulating the entire target table in the data lake.
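
Sqoop supports this directly through its incremental import mode. As a simplified sketch (the connection string, table, column, directory, and node names below are illustrative, not the client’s actual schema), an Oozie sqoop action performing an incremental import might look like this:

<action name="sqoopTransactions">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <!-- pull only rows whose change timestamp is newer than the last successful run -->
        <arg>import</arg>
        <arg>--connect</arg>
        <arg>jdbc:oracle:thin:@//dw-host:1521/SIMCORPDW</arg>
        <arg>--username</arg>
        <arg>${dbUser}</arg>
        <arg>--password-file</arg>
        <arg>${dbPasswordFile}</arg>
        <arg>--table</arg>
        <arg>TRANSACTIONS</arg>
        <arg>--incremental</arg>
        <arg>lastmodified</arg>
        <arg>--check-column</arg>
        <arg>LAST_CHANGED_TS</arg>
        <arg>--last-value</arg>
        <arg>${lastIngestedTimestamp}</arg>
        <!-- fold updated rows into the existing extract on the primary key -->
        <arg>--merge-key</arg>
        <arg>TRANSACTION_ID</arg>
        <arg>--target-dir</arg>
        <arg>/data/raw/simcorp/transactions</arg>
    </sqoop>
    <ok to="joinSqoopIngestionWave1"/>
    <error to="fail"/>
</action>

The last ingested timestamp can be tracked in the workflow’s job properties or, if the import runs as a saved Sqoop job, by the Sqoop metastore.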

Use columnar data lake storage

This statement may be a cliché, but it is one for a reason. Columnar storage, be it ORC or Parquet, lends itself far better to analytics queries, offering a performance increase of 5x or more to downstream data transformations. Parquet is the more widely adopted of the two formats, and Sqoop can write Parquet files natively; using Sqoop with ORC requires more work, either by going through HCatalog or by ingesting delimited text into HDFS and then converting the result set to ORC. We opted for the second approach.
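
For reference, the HCatalog route is what lets Sqoop land data directly in an ORC-backed Hive table. A minimal sketch of the relevant arguments inside an Oozie sqoop action (database, table, and connection names are again illustrative) might look like this:

<sqoop xmlns="uri:oozie:sqoop-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <arg>import</arg>
    <arg>--connect</arg>
    <arg>${jdbcUrl}</arg>
    <arg>--username</arg>
    <arg>${dbUser}</arg>
    <arg>--password-file</arg>
    <arg>${dbPasswordFile}</arg>
    <arg>--table</arg>
    <arg>POSITIONS</arg>
    <!-- write straight into an ORC-backed Hive table via HCatalog (requires the Hive/HCatalog sharelib) -->
    <arg>--hcatalog-database</arg>
    <arg>simcorp_raw</arg>
    <arg>--hcatalog-table</arg>
    <arg>positions</arg>
    <arg>--create-hcatalog-table</arg>
    <arg>--hcatalog-storage-stanza</arg>
    <arg>stored as orc</arg>
</sqoop>

One reason to prefer the two-step route is that the intermediate text extract is easy to inspect when an ingestion run needs debugging.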

Avoid SELECT *

Even when it makes sense to ingest all columns into the data lake (and it probably does), avoid doing so with SELECT *. A new column may be added unexpectedly in the middle of the source schema, leaving a SELECT * ingestion query returning more columns than are defined in the target table and causing ingestion failures. The alternative is to name every column in the SELECT statement, which makes for less aesthetically pleasing queries but more stable results.
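
With Sqoop, naming the columns means either passing the --columns argument alongside --table, or spelling them out in a free-form --query. A sketch of the latter (column, table, and path names are illustrative) inside the sqoop action:

<!-- free-form query with an explicit column list; $CONDITIONS is the placeholder Sqoop uses to inject each mapper's split predicate -->
<arg>--query</arg>
<arg>SELECT SECURITY_ID, ISIN, SECURITY_NAME, ASSET_TYPE, LAST_CHANGED_TS FROM SECURITIES WHERE $CONDITIONS</arg>
<arg>--split-by</arg>
<arg>SECURITY_ID</arg>
<arg>--target-dir</arg>
<arg>/data/raw/simcorp/securities</arg>

If a new column appears in the source, the query simply ignores it until the column list and the target table are updated together.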

Exploit parallelism

Parallelism is critical to a low-latency ingestion process, but knowing how much parallelism to use can be challenging. One factor to consider is that Sqoop uses four mappers by default, meaning each Sqoop import launches a MapReduce job with four concurrent map tasks. For small source tables, the upfront cost of requesting containers from YARN will almost certainly exceed the benefit of concurrency, so the lesson is to let data analysis determine the number of mappers assigned to each table: one mapper for small tables, more than one for larger tables. The performance of our ingestion improved when we reduced the mapper count for small tables to one and increased it for the few bottleneck fact tables.
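
In an Oozie sqoop action, the mapper count is just another argument. Two illustrative fragments (table names and paths are ours, not the client’s), one for a small dimension table and one for a large fact table:

<!-- small dimension table: one mapper, no point paying for extra YARN containers -->
<command>import --connect ${jdbcUrl} --username ${dbUser} --password-file ${dbPasswordFile} --table CURRENCY_DIM --num-mappers 1 --target-dir /data/raw/simcorp/currency_dim</command>

<!-- large fact table: more mappers, each ingesting its own slice of the table -->
<command>import --connect ${jdbcUrl} --username ${dbUser} --password-file ${dbPasswordFile} --table TRANSACTIONS --split-by TRANSACTION_ID --num-mappers 8 --target-dir /data/raw/simcorp/transactions</command>

Each command element belongs to its own sqoop action. Note that Oozie splits the command on whitespace, so any argument that itself contains spaces must be passed through separate arg elements instead.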

As discussed above, there is another opportunity to exploit parallelism at the Oozie workflow level. When possible, it makes sense to kick off several concurrent Sqoop import jobs using Oozie’s fork/join construct, as seen in the example below:

<fork name="sqoopIngestionWave1">
    <path start="sqoopTable1"/>
    <path start="sqoopTable2"/>
    <path start="sqoopTable3"/>
</fork>
<!-- every fork needs a matching join; its "to" target is whatever node runs next, e.g. a second ingestion wave -->
<join name="joinSqoopIngestionWave1" to="sqoopIngestionWave2"/>

How does one identify the optimal number of concurrent Sqoop import jobs? Keep in mind that each mapper opens its own JDBC connection to the source database, so the more mappers and concurrent actions in an Oozie fork, the more simultaneous connections the source system must serve. Too much concurrency can degrade the performance and stability of the source system. After assigning mappers to each table based on expected data size, we grouped the tables into concurrent ingestion waves, with the goal of holding each wave to 60 connections against the SimCorp data warehouse.

Consider writing custom Sqoop boundary queries

Related to the discussion of parallelism is how Sqoop uses boundary queries. To parallelize ingestion from a table, Sqoop splits the table into pieces based on the desired number of mappers, and each piece is then ingested by its own map task. To determine which source records belong to which mapper, Sqoop runs a boundary query: by default it wraps the ingestion query it was given, selects the minimum and maximum values of the split column, and divides that range evenly across the mappers assigned to carry out the import. This default mechanism works out of the box, but even a small tweak to the boundary query, supplied through the --boundary-query argument, can yield real performance gains.
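
For example, when a free-form --query is used, Sqoop’s default boundary query wraps the whole query in a subselect just to find the minimum and maximum of the split column. Supplying a --boundary-query that reads the same bounds directly from the indexed primary key is often noticeably cheaper. A sketch (table, column, and path names are illustrative):

<arg>--query</arg>
<arg>SELECT TRANSACTION_ID, PORTFOLIO_ID, AMOUNT, LAST_CHANGED_TS FROM TRANSACTIONS WHERE $CONDITIONS</arg>
<arg>--split-by</arg>
<arg>TRANSACTION_ID</arg>
<arg>--num-mappers</arg>
<arg>8</arg>
<!-- bounds for the split column straight from the indexed key, instead of Sqoop's default subselect over the query above -->
<arg>--boundary-query</arg>
<arg>SELECT MIN(TRANSACTION_ID), MAX(TRANSACTION_ID) FROM TRANSACTIONS</arg>
<arg>--target-dir</arg>
<arg>/data/raw/simcorp/transactions</arg>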

Use partitioning prudently

Downstream of ingestion, a transformation process curates the newly ingested data into a form more useful to consumers, so a prudent strategy is to partition the largest target tables in the data lake, especially fact tables, by date. While partitioning will not improve the performance of ingestion itself, it allows the downstream transformations to use Hive or Spark partition pruning and avoid a full-table scan of the ingested table.

Develop an optimal ingestion schedule

Of the roughly 100 SimCorp tables identified for ingestion into the data lake, only 35 were required downstream; ingesting the remaining 65 was deemed a nice-to-have. Given this, we included only the required 35 tables in the daily batch jobs and deprioritized the rest into a separate Oozie workflow scheduled to run on Sunday morning. As a result, both objectives were achieved: the nonessential tables continue to be ingested into the data lake regularly, while the performance of the critical weekday jobs improved substantially.
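
In Oozie terms, the weekend workflow is simply driven by its own coordinator. A minimal sketch (application path, time window, run time, and time zone are illustrative) using Oozie’s cron-style frequency to fire early on Sunday mornings:

<coordinator-app name="simcorp-noncritical-ingest"
                 frequency="0 6 * * SUN"
                 start="${startTime}" end="${endTime}"
                 timezone="America/New_York"
                 xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <!-- the Oozie workflow containing the nonessential-table Sqoop actions -->
            <app-path>${nameNode}/apps/simcorp/noncritical-ingest</app-path>
        </workflow>
    </action>
</coordinator-app>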
