Sunday, 9 June 2019

All about partitioning


All about partitioning

Dataflows is an ETL feature option in Azure Data Factory. Dataflow can express any data extract, transform/translate and load intent using a configuration and metadata driven approach. Copy activity handles ingestion of data into Azure data stores. Pipelines are instrumental in orchestrating these activities to complete the full data integration cycle.

Let’s cover what data partitioning in dataflow by a Q&A method.
1.     You: What is partitioning and what does it benefit me?
Ninja: Partitioning is the way to split the data during read, transform and write to sink operations. It helps your dataflows run more optimally, exploit the compute to the maximum and minimize your load cycles

2.     You: I already have physical partitions in my SQL database. Is that not enough?
Ninja: Not quite but it can help. You can use your table partitions to partition your dataflow accordingly. You can have a higher grain of control over type of partitioning and number of partitions within dataflow

3.     You: I see I can partition at the source, every transform and then at the sink. Do I need to configure all of them to get any benefit? 
Ninja: Depends. For the best benefit you need to start at your source. Make sure your source is adequately partitioned. Let’s look at how dataflow works when you have 16 partitions on the source on a cluster with 4 cores. Derive and Sink are left unspecified.

1
5
9
13
2
6
10
14
3
7
11
15
4
8
12
16

       Since there are 4 cores, a partition is assigned to a core. So batches of 4 partitions (1-4, 5-8, 9 -12, 13-16) are processed at a time. Since there is no further partitioning specified at the derive or sink the 16 partitions are carried through to the sink. If this was a file sink, you will see 16 files in the output folder. If it was a SQL source, 4 partitions will be read at any time and concurrent reads will happen in 4 connections. If it was a SQL sink, 16 partitions will write to the table though only 4 partitions will be writing at a time.

4.     You: Can’t dataflow do partitioning at the source automatically? 
Ninja: For most file sources, there is automatic partitioning that happens. Depending on the storage type, it can range from 64mb – 256mb splits for large files. It can also be dependent on the number of files. Parquet format is most efficient for this.
 However, for SQL sources, there is no inherent partitioning available. You are required to explicitly go to the optimize tab and choose ‘Source’ partitioning method.

5.     You:  Are you telling me if I don’t partition the SQL source everything will be read into 1 partition?
Ninja: Yes. Even though it may work by spill to disk on the cluster for smaller to medium tables, it will suffer from performance issues or failed jobs due to out of memory.

6.     You:  You were talking cores for partitioning where did memory come in?
Ninja: To determine partitioning your equation needs to consider
a.     Size of input data
b.     Size of output data
c.     Number of cores
d.     Amount of memory

  As a simplistic rule, you should fit a partition in memory. That’s where dataflow performance shines. Bear in mind on Azure compute, 4 cores are on one node and if you have 14GB of RAM, all the 4 partitions running on them will be sharing the memory space. Memory footprint of these partitions may not all be equal.

7.     You:  You said cores gets nicely split up for partitions, but memory does not? How?
Ninja: A variety of reasons and it has to with data skew. Your original source partitions may not exactly yield equal sized partitions. This can happen with uneven file sizes or suboptimal partitioning on your SQL sources.

8.     You:  If I do a good job on my source partitioning, I should be good on skew right?
Ninja: Not in all cases. You may introduce skew by filter or split transform where partitions can be largely empty. Likewise, row shrinking transforms like aggregate can vastly reduce the number of partitions after a shuffle.

9.     You:  Shuffle. What’s that? A card game?
Ninja: Works like a card shuffle. Let’s see an example where a shuffle can happen.

        In the example of with derive in Q.3, the execution reads 1 partition, create a derived field and writes it to the sink. 16 Tasks are created which do this for their partition. In the dataflow monitor you will see this group of transformations as  1 stage.
      However, for the dataflow with aggregate, the tasks cannot independently work by themselves because they need data from other partitions! That’s where the shuffle comes in.
Let’s say you had group by “country”. All data for a single country is shuffled from different partitions onto one partition. Hence this dataflow runs in 2 stages.

10.  You:  What do I do to avoid skewed partitions?
Ninja: Understanding your data and your logic in the dataflow is a good start. Monitoring statistics show you how much data ran in a partition and how much time it took.

11.  You:  I don’t understand the partitioning schemes available. When do I use what?
Ninja: These are the partitioning methods on any transform and what they mean.

a.     Use current partitioning – Keep whatever the previous transformation had
b.     Single partition – Bring everything together into 1 single transformation. Do NOT use this unless your data is very small.
c.     Round Robin – This will equally distribute data across every transformation so there will be no skew at all. It works the fastest but is not advisable if you are going to be going to do some aggregation after. 
d.     Hash – It hashes on a set of columns and places data in partitions accordingly. This will cause a shuffle. Pick columns that give you best distribution
e.     Dynamic Range – Dynamic range partitions the data after sorting it based on the columns. If you have too much of data for one value, you will get skewed partitions. Date ranges are a good candidate for dynamic range
f.      Fixed Range – When you know the exact start and end range you can write an expression to split the data yourself. You write an expression for every partition. Any data that does not fall in the conditions specified is automatically placed in the last partition.
g.     Key partitioning – Every unique value of the key(s) specified will fall into a partition. Say you pick product code and there are 80 unique products. You will get 80 partitions. That’s why you don’t specify the number of partitions in this method. It is the most dynamic of them all. Do NOT use this with unique value columns since you will get a partition for every row!

12.  You:  If I create 1000s of partitions, I should be good right? 
Ninja: Not quite. Make sure your partitions are of adequate size not extremely tiny or extremely big. If you had 10s of rows in every partition, your overall dataflow will run much slower since too many tasks need to be scheduled. It gets worse when a shuffle needs to happen. On the contrary too large partitions makes it not fit in memory. 
This is where the IR compute you create matters. Large cluster sizes-256 core come with 64GB of RAM and are good to hold large partitions.

13.  You:  Any pointers on SQL source and sink? 
Ninja: Explicitly partition your SQL source.


Picking a column will generate a SQL query hash partitioning based on this column.
You can enter a SQL query condition to split data. Use this option to address a physical table partition. Bear in mind depending on how many partitions are active (4 in the example), you will have as much concurrent database connections. Size up your database to accept as many connections and keep your connection alive.
For the SQL Sink the same applies. If you are moving from files which create lots of partitions and if you are using a large compute, potentially lots of partitions can be active. This means too many concurrent sessions writing to database. This is one place where you can use hash partitioning to make fewer partitions.

14.  You:  Does this mean I am now a dataflow partitioning ninja?
Ninja: Not so fast. Now you know how to hold the sword right :-)


No comments:

Post a Comment