# Data Chunking with Dask

In this notebook we demonstrate:

* Xarray + Dask
* NetCDF file Chunks versus Dask Chunks
* chunk shapes

The following material uses Coupled Model Intercomparison Project (CMIP6) collections. Please see the data collection [catalogue](https://geonetwork.nci.org.au/geonetwork/srv/eng/catalog.search#/metadata/f6600_2266_8675_3563) and [CMIP6 terms of use](https://pcmdi.llnl.gov/CMIP6/TermsOfUse/TermsOfUse6-1.html) for more information. 


---

slightly adapted version from: https://github.com/NCI-data-analysis-platform/climate-cmip 
- Original Authors: NCI Virtual Research Environment Team
- Keywords: CMIP6, Xarray, Dask, Chunks
- Creation Date: 2019-June; Updated: 2020-May
---
- Adaptation for DKRZ data pool: S. Kindermann

### Load the required modules

In [None]:
import xarray as xr
import netCDF4 as nc
import time
%matplotlib inline

### Data

We will use precipitation data from SSP5-85 from the ACCESS-CM2 model in this example. Let's take a look at some of the data:

In [None]:
# On Gadi: netcdf module must be loaded

!ncdump -hst '/pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231.nc'

In [None]:
# Outside of Gadi, access via THREDDS

!ncdump -hst 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231.nc'

### Xarray + Dask

Xarray can automatically wrap its data into Dask arrays. This capability turns Xarray into an extremely powerful tool when working with big earth science datasets. 

To see this in action, we will download a fairly large dataset to analyze. We use Xarray's `open_mfdataset` to allow multiple files to be opened simultaneously.

In [None]:
# On DKRZ system

!ls /pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108
path = '/pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/*'

In [None]:
f_ssp585 = xr.open_mfdataset(path, combine='by_coords')
f_ssp585

<div class="alert alert-warning">
<b>NOTE:</b> the values are not displayed, since that would trigger computation.
</div>

### Chunks

Notice that it says:`pr(time, lat, lon) float32 dask.array<chunksize=(18263, 144, 192), meta=np.ndarray>`. There is now the `chunksize` component. The data array also becomes a Dask array.

The chunking of the array comes from the integration of Dask with Xarray. Dask divides the data array into small pieces called "chunks", with each chunk designed to be small enough to fit into memory. 

The file itself may be already chunked. Filesystem chunking is available in netCDF-4 and HDF5 datasets. The CMIP6 data should all be netCDF-4 and include some form of chunking for each file.

#### Looking at the file metadata in the "Data" section above, we see in this case the file is chunked such that  
#### `pr:_ChunkSizes = 1, 144, 192 ;`

Here we see that the data is chunked in space but not time, where one chunk is one time-step and all points in lat-lon.

![](../../data/chunks.png)
image source: https://www.unidata.ucar.edu/blogs/developer/en/entry/chunking_data_why_it_matters

Consider 2 types of data access
1. Accessing a 2D lat-lon slice in time (RHS figure)
2. Accessing a time series at a single lat-lon point (LHS figure)

With the chunking above, the first type of data access only requires access to a single chunk, while the second type needs to access ALL the chunks of the data array regardless. This dataset will be fastest for 2D lat-lon single time-step data access.

In general, even without chunking - when the data is accessed contiguously (by index order) - time is the slowest variable to access, then y, with x being the fastest. With the chunking method of this CMIP6 dataset, time still remains the slowest variable. More uniform variable access speeds would require more evenly spaced chunks.

### Exercise

Time how long it takes to load the precipitation data at `time='2015-01-01'` and then time how long it takes to load the data at `lat=0` and `lon=180` (remember to use `method='nearest'` for the latter case). How much difference in time is there when using these different access methods?


In [None]:
%%time
f_ssp585.pr.sel(time='2015-01-01').load()

In [None]:
%%time
f_ssp585.pr.sel(lat=0,lon=180,method='nearest').load()

### The same volume of data can take orders of magnitude longer to load

The spatial dataset contained 27648 data-points and took in the order of 100ms to load. The time-series dataset had 31411 data-points and took order 10,000 ms to load.

<div class="alert alert-info">
<b>NOTE:</b> If you look at the dashboard, the task stream actually shows that the most time consuming part is data concatenation. 
</div>

Chunking and the ways in which data is read is important when considering both how you access the data and if you wish to parallelise your code.

### NetCDF file Chunks versus Dask Chunks

Keep in mind, Dask chunking is different to chunking of the stored data. As we saw in our example, the stored data was chunked with chunks of size (1,144,192) whereas the Dask array had a chunk size of (18263, 144, 192). It's possible to change the chunking size in the Dask array. In the example below, we are specifying that there are 730 chunks in time.

In [None]:
f_ssp585 = xr.open_mfdataset(path,chunks={'time':730}, combine='by_coords')

f_ssp585

### How big do you make your chunks?

The rule of thumb for Dask chunks is that you should "create arrays with a minimum chunksize of at least one million elements":  http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance

NetCDF4 chunks are often a lot smaller than Dask array chunks. The minimum chunksize exists because if you have too many chunks, queuing of operations when parallelising will be slow. If the chunk sizes are too large, computation and memory can be wasted. The default chunks from dask gave us chunks of size (18263, 144, 192) or around 500 million elements so we could try reducing those chunks if needed. The larger the array, the larger the cost of queueing and therefore larger chunks may be needed.

#### IMPORTANT: Whatever Dask array chunks you use, make sure they align with the netCDF4 file chunks!!

So far our chunks have been in time, and the netCDF4 file is also chunked in time. If we tried to use dask chunks to optimise the time-series loading of data, it would not help! 

#### Exercise

Try to load the data in with chunks size `(31411,180,1)` (i.e. chunked in lon) and name that file `f_bad_chunk`. Next, try reloading the time series of pr at `lat=0` and `lon=180` and time how long this takes.

In [None]:
f_bad_chunk = xr.open_mfdataset(path,chunks={'time':31411,'lat':180,'lon':1}, combine='by_coords')

Try running your previous code for `f_bad_chunk` again loading the time series of pr at `lat=0` and `lon=180` and time how long it takes if you scale up or down the number of workers. 

Do the same with the original chunking method of `f_ssp585` and see if there is a difference.

We will use three different schedulers to read the data. First, let's initiate a client.

In [None]:
# If you run this notebook on your local computer or NCI's VDI instance, you can create cluster
from dask.distributed import Client
client = Client()
print(client)

In [None]:
# If you run this notebook on Gadi under pangeo environment, you can create cluster using scheduler.json file
from dask.distributed import Client, LocalCluster
client = Client()
print(client)

<div class="alert alert-info">
<b>Warning: Please make sure you specify the correct path to the scheduler.json file within your environment.</b>  
</div>

Starting the Dask Client will provide a dashboard which is useful to gain insight into the computation. The link to the dashboard will become visible when you create the Client. We recommend having the Client open on one side of your screen and your notebook open on the other side, which will be useful for learning purposes.

In [None]:
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='synchronous')

In [None]:
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='threads')

In [None]:
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='processes')

### Poor chunking with dask can make your performance worse!

As you can see, bad chunks and the alignment of the chunks slow down the I/O performance significantly. They are both  important to keep in mind when creating Dask chunks. 

### Close the client

Before moving on to the next exercise, make sure to close your client or stop this kernel.

In [None]:
client.close()

### Summary

This example shows how to make data chunking with Dask. 

For further information regarding Dask, please see: https://docs.dask.org/en/latest/