In today's data-driven market, organizations collect data points around the clock about people, processes, and technology systems, resulting in enormous amounts of actionable information. Collecting and storing these large volumes of data is just the start. The real challenge lies in processing them in a cost-efficient and timely manner to extract the most meaningful insights.
Since throwing money at the problem is never a solution, it is of utmost importance that we optimize every stage of our data pipelines to run as efficiently as possible. This article discusses some of the optimizations that we can incorporate when developing our Hive jobs. These fall into two broad categories: table level and query level optimizations.
Table level optimization
These are proactive optimizations that are made when creating the tables themselves. Keeping these ideas in mind before developing table schemas can significantly improve performance. A few techniques that can be used for table level optimization are described below:
1. Partitioning
Partitioning is a data sharding scheme that we can use to break the data set into smaller and more clustered chunks so that queries can run more efficiently.
For instance, consider an “Orders” table that an e-commerce website might use to store order data across India. Let’s assume this data set is 500 MB in size and is laid out on a 4-node cluster. When the SQL query SELECT * FROM Orders WHERE State='Karnataka' is run, data is scanned on every single node to find the rows that match the condition: a proverbial needle-in-a-haystack operation. As you can imagine, this becomes more and more challenging as the table grows and the frequency of such queries increases.
If we instead partition the table on the basis of State, we end up with a dataset with 29 partitions (one for each state in India). Each partition holds the data for a single state and is stored in its own directory on disk, named after the partition value (for example, State=Karnataka).
In this scheme, the exact same query can be served without filtering through the entire 500 MB, possibly even by a single node. Because the query reads only the relevant partition, the time to run it does not increase as much as the data grows, since we are doing a lot less wasted work.
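The partitioned layout described above can be sketched in HiveQL roughly as follows. This is a minimal sketch, not a definitive schema: the table name orders_by_state, its columns, and the staging table orders_raw are hypothetical and chosen only for illustration.

```sql
-- Hypothetical schema; the partition column is declared in PARTITIONED BY,
-- not in the regular column list.
CREATE TABLE orders_by_state (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10,2)
)
PARTITIONED BY (state STRING)
STORED AS ORC;

-- Allow Hive to create partitions from the values found in the data.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- orders_raw is an assumed unpartitioned staging table.
-- The partition column must come last in the SELECT list.
INSERT OVERWRITE TABLE orders_by_state PARTITION (state)
SELECT order_id, customer_id, amount, state
FROM orders_raw;

-- Each partition becomes a directory under the table location, for example:
--   .../orders_by_state/state=Karnataka/
--   .../orders_by_state/state=Kerala/
-- A filter on the partition column lets Hive read only the matching directory.
SELECT * FROM orders_by_state WHERE state = 'Karnataka';
```

Running EXPLAIN on the final query is one way to check that only the expected partition is being read.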
A few guidelines when using partitioning:
- Use it on the tables that are queried most often to get the most benefit
- Check the “WHERE” clauses of those queries to find candidate partitioning columns, and weigh how frequently each column is filtered on to evaluate the cost benefit
An exception to the rule is when the frequently used query is on a column with high cardinality (such as the primary key or any numbered identifier). For instance, if we employ this technique on a column such as “CustomerId”, we would end up creating far too many partitions for the technique to be viable.
2. Bucketing
If the frequently used query targets a column with high cardinality (where a regular partitioning scheme would not be ideal), we usually go with bucketing. In this technique, we create a fixed number of buckets (or files) to bin the high cardinality values, effectively reducing the cardinality. The number of buckets is determined through experimentation against the actual query patterns.
To better illustrate this technique, let’s take the degenerate case of optimizing queries on the “CustomerId” column. Let’s assume that this column has 1 million distinct customer ids and we wish to bin them into 10 buckets. We could bin all customer ids based on the last digit, so that CustomerId=12 would land in bucket 2 and CustomerId=819 would land in bucket 9. This is a simplistic example; in practice Hive applies an internal hash function to the bucketing column to distribute the data into buckets.
At query time, the bucket in which our record lies is calculated and data is fetched from that bucket instead of scanning the entire table.
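As a rough HiveQL sketch of the bucketing example above, assuming a hypothetical customers_bucketed table loaded from an equally hypothetical customers_raw staging table:

```sql
-- Hypothetical table; names and columns are illustrative only.
CREATE TABLE customers_bucketed (
  customer_id BIGINT,
  name        STRING
)
CLUSTERED BY (customer_id) INTO 10 BUCKETS
STORED AS ORC;

-- On Hive 1.x only, also run the following before inserting so that the
-- bucket definition is honored (Hive 2.0 and later always enforce it):
--   SET hive.enforce.bucketing=true;

INSERT OVERWRITE TABLE customers_bucketed
SELECT customer_id, name
FROM customers_raw;

-- Read a single bucket out of the 10 using Hive's bucket sampling syntax.
SELECT * FROM customers_bucketed
TABLESAMPLE (BUCKET 3 OUT OF 10 ON customer_id);
```

The bucket count of 10 simply mirrors the example in the text; a real table would need a count chosen by experimenting with its own data volume and query patterns.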
3. Partitioning with Bucketing
In some cases, we might choose to employ both techniques together. In this mode, data is partitioned on one column and bucketed on another. Each partition creates a directory, inside of which we have files representing the buckets. Queries then navigate this file structure to retrieve the data efficiently.
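Combining the two clauses in one table definition might look like the sketch below. The table and column names are hypothetical and only illustrate the syntax.

```sql
-- Hypothetical table: partitioned by state, bucketed by customer_id.
-- Each state=<value> directory then holds 10 bucket files.
CREATE TABLE orders_part_bucketed (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10,2)
)
PARTITIONED BY (state STRING)
CLUSTERED BY (customer_id) INTO 10 BUCKETS
STORED AS ORC;

-- A query that filters on both columns can narrow the read to one
-- partition directory and, within it, to the relevant bucket.
SELECT *
FROM orders_part_bucketed
WHERE state = 'Karnataka'
  AND customer_id = 819;
```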
Query level optimization
While the previous techniques optimize data retrieval by optimizing how the data is stored, these techniques do so by optimizing how the stored data is queried. Before getting to query level optimization techniques, it is necessary to understand MapReduce, the data processing framework that the Apache Hadoop engine implements. Data processing jobs in this model are written in two phases:
- Map - Reads a chunk of data and applies one or more transformations before emitting it to the next stage
- Reduce - Collects the transformed data based on a common key and often coalesces it into the desired output
The Reduce phase is expensive because it involves shuffling (moving data from the mapper machines to the reducer machines) and sorting (ordering the mapper output in ascending key order so that it can be coalesced).
A SQL “JOIN” gets translated under the hood into a MapReduce job. If we can eliminate the reduce phase, the job runs faster, which improves query time.
A few frequently used query level optimizations are:
1. Map side join
A technique in which the smaller table being JOIN’ed is copied over to each Mapper node. This eliminates the need for the Reduce phase altogether since the entire job is handled by the Mapper. These JOINs are very performant since shuffle and sort stages are eliminated. This also removes network bottlenecks associated with these stages and allows for the cluster to scale better.
A few guidelines when using map side join:
- When two tables are involved, one of them has to be small; the other may be big
- When multiple tables are involved, only one table may be big and the rest should be small
In a map side join, the bigger table is distributed among the different nodes and the smaller table is copied to each node. If none of the tables is small, we should not use this technique, as making these copies becomes inefficient.
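One common way to get a map side join is to let Hive convert the join automatically based on table size. The following is a minimal sketch that assumes a large orders table and a small state_codes lookup table, both hypothetical.

```sql
-- Let Hive convert a common join into a map side join when one side is small.
SET hive.auto.convert.join=true;

-- Size threshold in bytes under which a table is treated as small.
-- 25000000 is the documented default; tune it to the memory available.
SET hive.mapjoin.smalltable.filesize=25000000;

-- orders (large) and state_codes (small) are assumed tables.
SELECT o.order_id, o.amount, s.state_name
FROM orders o
JOIN state_codes s
  ON o.state_code = s.state_code;
```

Running EXPLAIN on the query should show a map join operator in the plan if the conversion took place; if it shows a reduce-side join instead, the smaller table is probably above the configured threshold.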
2. Bucket map join
This technique is ideal if we have 2 big tables. For it to work, both tables should be bucketed on the join columns, and the number of buckets in one table has to be an integral multiple of the number of buckets in the other.
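A sketch of how a bucket map join might be requested is shown below. Both tables are hypothetical and assumed to be CLUSTERED BY (customer_id), with orders_bucketed in 20 buckets and customers_bucketed in 10. Whether Hive actually picks this strategy depends on the Hive version and execution engine, so treat the settings as a starting point and confirm with EXPLAIN.

```sql
-- Enable the bucket map join optimization.
SET hive.optimize.bucketmapjoin=true;

-- Map join hints are ignored by default in recent Hive versions;
-- this setting makes Hive honor the hint below.
SET hive.ignore.mapjoin.hint=false;

-- Only the matching buckets of customers_bucketed need to be loaded
-- by each mapper that processes a bucket of orders_bucketed.
SELECT /*+ MAPJOIN(c) */ o.order_id, c.name
FROM orders_bucketed o
JOIN customers_bucketed c
  ON o.customer_id = c.customer_id;
```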
3. Sort merge bucket join
A few conditions to remember when we go with a sort merge bucket join:
- The number of buckets in both tables should match exactly
- Both tables should be bucketed and sorted on the JOIN column
There is then a one-to-one mapping between the buckets of the two tables, and the join can be performed quickly because both buckets already hold sorted data.
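The conditions above can be sketched in HiveQL as follows. The table names and columns are hypothetical; the three properties are the ones commonly set to enable this join, though the exact combination required can vary between Hive versions.

```sql
-- Both hypothetical tables are bucketed AND sorted on the join column,
-- with the same number of buckets.
CREATE TABLE orders_smb (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10,2)
)
CLUSTERED BY (customer_id) SORTED BY (customer_id ASC) INTO 10 BUCKETS
STORED AS ORC;

CREATE TABLE customers_smb (
  customer_id BIGINT,
  name        STRING
)
CLUSTERED BY (customer_id) SORTED BY (customer_id ASC) INTO 10 BUCKETS
STORED AS ORC;

-- Settings commonly used to enable sort merge bucket joins.
SET hive.auto.convert.sortmerge.join=true;
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

SELECT o.order_id, c.name
FROM orders_smb o
JOIN customers_smb c
  ON o.customer_id = c.customer_id;
```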
4. Using window function
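Window functions should be used wherever feasible, as they save processing time: a ranking or aggregate can be computed over groups of rows alongside the detail rows, often avoiding a self-join or an additional aggregation query.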
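As an illustration, fetching the most recent order for each customer can be written with a window function instead of joining the table back to a per-customer MAX(order_date) subquery. The orders table and its columns are assumed for the example.

```sql
-- Latest order per customer; orders is a hypothetical table.
SELECT order_id, customer_id, order_date, amount
FROM (
  SELECT order_id, customer_id, order_date, amount,
         ROW_NUMBER() OVER (
           PARTITION BY customer_id
           ORDER BY order_date DESC
         ) AS rn
  FROM orders
) ranked
WHERE rn = 1;
```

ROW_NUMBER returns exactly one row per customer even when two orders share the same date; RANK could be used instead if ties should all be kept.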
5. Vectorization
To improve the performance of operations like scans, aggregations, filters and joins, we can use vectorized query execution. These operations are then executed in batches of 1024 rows at a time instead of one row at a time, and on modern processors this can often result in a meaningful performance boost.
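Vectorized execution is switched on through session settings, as in this minimal sketch. The orders_orc table is hypothetical and is assumed to be stored as ORC, the format for which vectorization was first supported; support for other formats depends on the Hive version.

```sql
-- Enable vectorized execution for the map side and the reduce side.
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;

-- A scan, filter, and aggregation that can benefit from batch processing.
SELECT state, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders_orc
WHERE amount > 0
GROUP BY state;
```

The EXPLAIN output for the query indicates whether a stage actually runs in vectorized mode.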
A few other techniques that can be considered while optimizing the code are:
- Using compression codecs such as Snappy, bzip2, gzip, or LZO to reduce the data footprint and thereby allow faster data retrieval. It should be noted, though, that the space savings come at the cost of increased compute resource usage
- Using file formats such as Parquet, ORC, or Avro, depending on our requirements
- Choosing the Spark or Tez engine over MapReduce in Hive
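These options are mostly a matter of table properties and session settings. The sketch below assumes Tez is installed on the cluster and uses hypothetical table names; the property names are the standard ones for ORC and Parquet compression.

```sql
-- Run queries on Tez instead of MapReduce (requires Tez on the cluster).
SET hive.execution.engine=tez;

-- Hypothetical ORC table compressed with Snappy.
CREATE TABLE orders_orc_snappy (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- The same hypothetical table as Parquet with Snappy compression.
CREATE TABLE orders_parquet_snappy (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10,2)
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
```

Which format and codec work best depends on the workload, so it is worth comparing file sizes and query times on a representative sample before settling on one.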
While data analysis has been democratized over the years, in order to differentiate our businesses and get an edge in the market, we need to build data pipelines that are not only capable of handling scale but are also as efficient as possible. Using optimization techniques has become a necessity for building a scalable, cost-effective and efficient data-driven business. Having a partner with extensive knowledge of large-scale data systems and a proven track record of success is therefore imperative on this journey.