Data Chunking with Dask#

In this notebook we demonstrate:

  • Xarray + Dask

  • NetCDF file Chunks versus Dask Chunks

  • Chunk shapes

The following material uses Coupled Model Intercomparison Project Phase 6 (CMIP6) collections. Please see the data collection catalogue and the CMIP6 terms of use for more information.


A slightly adapted version of: https://github.com/NCI-data-analysis-platform/climate-cmip

  • Original Authors: NCI Virtual Research Environment Team

  • Keywords: CMIP6, Xarray, Dask, Chunks

  • Creation Date: 2019-June; Updated: 2020-May


  • Adaptation for DKRZ data pool: S. Kindermann

Load the required modules#

import xarray as xr
import netCDF4 as nc
import time
%matplotlib inline

Data#

We will use precipitation data from the SSP5-8.5 scenario of the ACCESS-CM2 model in this example. Let’s take a look at some of the data:

# On Gadi: netcdf module must be loaded

!ncdump -hst '/pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231.nc'
netcdf pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231 {
dimensions:
	time = UNLIMITED ; // (18263 currently)
	lat = 144 ;
	lon = 192 ;
	bnds = 2 ;
variables:
	double time(time) ;
		time:bounds = "time_bnds" ;
		time:units = "days since 1850-01-01" ;
		time:calendar = "proleptic_gregorian" ;
		time:axis = "T" ;
		time:long_name = "time" ;
		time:standard_name = "time" ;
		time:_Storage = "chunked" ;
		time:_ChunkSizes = 1 ;
		time:_Endianness = "little" ;
	double time_bnds(time, bnds) ;
		time_bnds:_Storage = "chunked" ;
		time_bnds:_ChunkSizes = 1, 2 ;
		time_bnds:_DeflateLevel = 1 ;
		time_bnds:_Endianness = "little" ;
	double lat(lat) ;
		lat:bounds = "lat_bnds" ;
		lat:units = "degrees_north" ;
		lat:axis = "Y" ;
		lat:long_name = "Latitude" ;
		lat:standard_name = "latitude" ;
		lat:_Storage = "contiguous" ;
		lat:_Endianness = "little" ;
	double lat_bnds(lat, bnds) ;
		lat_bnds:_Storage = "chunked" ;
		lat_bnds:_ChunkSizes = 144, 2 ;
		lat_bnds:_DeflateLevel = 1 ;
		lat_bnds:_Endianness = "little" ;
	double lon(lon) ;
		lon:bounds = "lon_bnds" ;
		lon:units = "degrees_east" ;
		lon:axis = "X" ;
		lon:long_name = "Longitude" ;
		lon:standard_name = "longitude" ;
		lon:_Storage = "contiguous" ;
		lon:_Endianness = "little" ;
	double lon_bnds(lon, bnds) ;
		lon_bnds:_Storage = "chunked" ;
		lon_bnds:_ChunkSizes = 192, 2 ;
		lon_bnds:_DeflateLevel = 1 ;
		lon_bnds:_Endianness = "little" ;
	float pr(time, lat, lon) ;
		pr:standard_name = "precipitation_flux" ;
		pr:long_name = "Precipitation" ;
		pr:comment = "includes both liquid and solid phases" ;
		pr:units = "kg m-2 s-1" ;
		pr:cell_methods = "area: time: mean" ;
		pr:cell_measures = "area: areacella" ;
		pr:history = "2019-11-08T10:45:49Z altered by CMOR: replaced missing value flag (-1.07374e+09) with standard missing value (1e+20)." ;
		pr:missing_value = 1.e+20f ;
		pr:_FillValue = 1.e+20f ;
		pr:_Storage = "chunked" ;
		pr:_ChunkSizes = 1, 144, 192 ;
		pr:_DeflateLevel = 1 ;
		pr:_Endianness = "little" ;

// global attributes:
		:Conventions = "CF-1.7 CMIP-6.2" ;
		:activity_id = "ScenarioMIP" ;
		:branch_method = "standard" ;
		:branch_time_in_child = 60265. ;
		:branch_time_in_parent = 60265. ;
		:creation_date = "2019-11-08T10:45:50Z" ;
		:data_specs_version = "01.00.30" ;
		:experiment = "update of RCP8.5 based on SSP5" ;
		:experiment_id = "ssp585" ;
		:external_variables = "areacella" ;
		:forcing_index = 1 ;
		:frequency = "day" ;
		:further_info_url = "https://furtherinfo.es-doc.org/CMIP6.CSIRO-ARCCSS.ACCESS-CM2.ssp585.none.r1i1p1f1" ;
		:grid = "native atmosphere N96 grid (144x192 latxlon)" ;
		:grid_label = "gn" ;
		:history = "2019-11-08T10:45:50Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards." ;
		:initialization_index = 1 ;
		:institution = "CSIRO (Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia), ARCCSS (Australian Research Council Centre of Excellence for Climate System Science)" ;
		:institution_id = "CSIRO-ARCCSS" ;
		:mip_era = "CMIP6" ;
		:nominal_resolution = "250 km" ;
		:notes = "Exp: CM2-ssp585; Local ID: bk786; Variable: pr ([\'fld_s05i216\'])" ;
		:parent_activity_id = "CMIP" ;
		:parent_experiment_id = "historical" ;
		:parent_mip_era = "CMIP6" ;
		:parent_source_id = "ACCESS-CM2" ;
		:parent_time_units = "days since 1850-01-01" ;
		:parent_variant_label = "r1i1p1f1" ;
		:physics_index = 1 ;
		:product = "model-output" ;
		:realization_index = 1 ;
		:realm = "atmos" ;
		:run_variant = "forcing: GHG, Oz, SA, Sl, Vl, BC, OC, (GHG = CO2, N2O, CH4, CFC11, CFC12, CFC113, HCFC22, HFC125, HFC134a)" ;
		:source = "ACCESS-CM2 (2019): \n",
			"aerosol: UKCA-GLOMAP-mode\n",
			"atmos: MetUM-HadGEM3-GA7.1 (N96; 192 x 144 longitude/latitude; 85 levels; top level 85 km)\n",
			"atmosChem: none\n",
			"land: CABLE2.5\n",
			"landIce: none\n",
			"ocean: ACCESS-OM2 (GFDL-MOM5, tripolar primarily 1deg; 360 x 300 longitude/latitude; 50 levels; top grid cell 0-10 m)\n",
			"ocnBgchem: none\n",
			"seaIce: CICE5.1.2 (same grid as ocean)" ;
		:source_id = "ACCESS-CM2" ;
		:source_type = "AOGCM" ;
		:sub_experiment = "none" ;
		:sub_experiment_id = "none" ;
		:table_id = "day" ;
		:table_info = "Creation Date:(30 April 2019) MD5:e14f55f257cceafb2523e41244962371" ;
		:title = "ACCESS-CM2 output prepared for CMIP6" ;
		:variable_id = "pr" ;
		:variant_label = "r1i1p1f1" ;
		:version = "v20191108" ;
		:cmor_version = "3.4.0" ;
		:tracking_id = "hdl:21.14100/1cade23c-cf5e-4d0e-96f9-4128cd729af7" ;
		:license = "CMIP6 model data produced by CSIRO is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License (https://creativecommons.org/licenses/).  Consult https://pcmdi.llnl.gov/CMIP6/TermsOfUse for terms of use governing CMIP6 output, including citation requirements and proper acknowledgment.  Further information about this data, including some limitations, can be found via the further_info_url (recorded as a global attribute in this file).  The data producers and data providers make no warranty, either express or implied, including, but not limited to, warranties of merchantability and fitness for a particular purpose. All liabilities arising from the supply of the information (including any liability arising in negligence) are excluded to the fullest extent permitted by law." ;
		:_NCProperties = "version=2,netcdf=4.6.2,hdf5=1.10.5" ;
		:_SuperblockVersion = 0 ;
		:_IsNetcdf4 = 0 ;
		:_Format = "netCDF-4 classic model" ;
}
# Outside of Gadi, access via THREDDS

!ncdump -hst 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231.nc'
netcdf pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231 {
dimensions:
	time = UNLIMITED ; // (18263 currently)
	bnds = 2 ;
	lat = 144 ;
	lon = 192 ;
variables:
	double time(time) ;
		time:bounds = "time_bnds" ;
		time:units = "days since 1850-01-01" ;
		time:calendar = "proleptic_gregorian" ;
		time:axis = "T" ;
		time:long_name = "time" ;
		time:standard_name = "time" ;
		time:_ChunkSizes = 1 ; // "1850-01-02"
	double time_bnds(time, bnds) ;
		time_bnds:_ChunkSizes = 1, 2 ;
	double lat(lat) ;
		lat:bounds = "lat_bnds" ;
		lat:units = "degrees_north" ;
		lat:axis = "Y" ;
		lat:long_name = "Latitude" ;
		lat:standard_name = "latitude" ;
	double lat_bnds(lat, bnds) ;
		lat_bnds:_ChunkSizes = 144, 2 ;
	double lon(lon) ;
		lon:bounds = "lon_bnds" ;
		lon:units = "degrees_east" ;
		lon:axis = "X" ;
		lon:long_name = "Longitude" ;
		lon:standard_name = "longitude" ;
	double lon_bnds(lon, bnds) ;
		lon_bnds:_ChunkSizes = 192, 2 ;
	float pr(time, lat, lon) ;
		pr:standard_name = "precipitation_flux" ;
		pr:long_name = "Precipitation" ;
		pr:comment = "includes both liquid and solid phases" ;
		pr:units = "kg m-2 s-1" ;
		pr:cell_methods = "area: time: mean" ;
		pr:cell_measures = "area: areacella" ;
		pr:history = "2019-11-08T10:45:49Z altered by CMOR: replaced missing value flag (-1.07374e+09) with standard missing value (1e+20)." ;
		pr:missing_value = 1.e+20f ;
		pr:_FillValue = 1.e+20f ;
		pr:_ChunkSizes = 1, 144, 192 ;

// global attributes:
		:Conventions = "CF-1.7 CMIP-6.2" ;
		:activity_id = "ScenarioMIP" ;
		:branch_method = "standard" ;
		:branch_time_in_child = 60265. ;
		:branch_time_in_parent = 60265. ;
		:creation_date = "2019-11-08T10:45:50Z" ;
		:data_specs_version = "01.00.30" ;
		:experiment = "update of RCP8.5 based on SSP5" ;
		:experiment_id = "ssp585" ;
		:external_variables = "areacella" ;
		:forcing_index = 1 ;
		:frequency = "day" ;
		:further_info_url = "https://furtherinfo.es-doc.org/CMIP6.CSIRO-ARCCSS.ACCESS-CM2.ssp585.none.r1i1p1f1" ;
		:grid = "native atmosphere N96 grid (144x192 latxlon)" ;
		:grid_label = "gn" ;
		:history = "2019-11-08T10:45:50Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards." ;
		:initialization_index = 1 ;
		:institution = "CSIRO (Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia), ARCCSS (Australian Research Council Centre of Excellence for Climate System Science)" ;
		:institution_id = "CSIRO-ARCCSS" ;
		:mip_era = "CMIP6" ;
		:nominal_resolution = "250 km" ;
		:notes = "Exp: CM2-ssp585; Local ID: bk786; Variable: pr ([\'fld_s05i216\'])" ;
		:parent_activity_id = "CMIP" ;
		:parent_experiment_id = "historical" ;
		:parent_mip_era = "CMIP6" ;
		:parent_source_id = "ACCESS-CM2" ;
		:parent_time_units = "days since 1850-01-01" ;
		:parent_variant_label = "r1i1p1f1" ;
		:physics_index = 1 ;
		:product = "model-output" ;
		:realization_index = 1 ;
		:realm = "atmos" ;
		:run_variant = "forcing: GHG, Oz, SA, Sl, Vl, BC, OC, (GHG = CO2, N2O, CH4, CFC11, CFC12, CFC113, HCFC22, HFC125, HFC134a)" ;
		:source = "ACCESS-CM2 (2019): \n",
			"aerosol: UKCA-GLOMAP-mode\n",
			"atmos: MetUM-HadGEM3-GA7.1 (N96; 192 x 144 longitude/latitude; 85 levels; top level 85 km)\n",
			"atmosChem: none\n",
			"land: CABLE2.5\n",
			"landIce: none\n",
			"ocean: ACCESS-OM2 (GFDL-MOM5, tripolar primarily 1deg; 360 x 300 longitude/latitude; 50 levels; top grid cell 0-10 m)\n",
			"ocnBgchem: none\n",
			"seaIce: CICE5.1.2 (same grid as ocean)" ;
		:source_id = "ACCESS-CM2" ;
		:source_type = "AOGCM" ;
		:sub_experiment = "none" ;
		:sub_experiment_id = "none" ;
		:table_id = "day" ;
		:table_info = "Creation Date:(30 April 2019) MD5:e14f55f257cceafb2523e41244962371" ;
		:title = "ACCESS-CM2 output prepared for CMIP6" ;
		:variable_id = "pr" ;
		:variant_label = "r1i1p1f1" ;
		:version = "v20191108" ;
		:cmor_version = "3.4.0" ;
		:tracking_id = "hdl:21.14100/1cade23c-cf5e-4d0e-96f9-4128cd729af7" ;
		:license = "CMIP6 model data produced by CSIRO is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License (https://creativecommons.org/licenses/).  Consult https://pcmdi.llnl.gov/CMIP6/TermsOfUse for terms of use governing CMIP6 output, including citation requirements and proper acknowledgment.  Further information about this data, including some limitations, can be found via the further_info_url (recorded as a global attribute in this file).  The data producers and data providers make no warranty, either express or implied, including, but not limited to, warranties of merchantability and fitness for a particular purpose. All liabilities arising from the supply of the information (including any liability arising in negligence) are excluded to the fullest extent permitted by law." ;
		:DODS_EXTRA.Unlimited_Dimension = "time" ;
		:_NCProperties = "version=2,netcdf=4.6.2,hdf5=1.10.5" ;
		:_Format = "classic" ;
}

Xarray + Dask#

Xarray can automatically wrap its data into Dask arrays. This capability turns Xarray into an extremely powerful tool when working with big earth science datasets.

To see this in action, we will open a fairly large dataset to analyse. We use Xarray’s open_mfdataset to open multiple files together as a single dataset.

# On DKRZ system

!ls /pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108
pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20150101-20641231.nc
pr_day_ACCESS-CM2_ssp585_r1i1p1f1_gn_20650101-21001231.nc
path = '/pool/data/CMIP6/data/ScenarioMIP/CSIRO-ARCCSS/ACCESS-CM2/ssp585/r1i1p1f1/day/pr/gn/v20191108/*'
f_ssp585 = xr.open_mfdataset(path, combine='by_coords')
f_ssp585
<xarray.Dataset>
Dimensions:    (time: 31411, bnds: 2, lat: 144, lon: 192)
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
  * lat        (lat) float64 -89.38 -88.12 -86.88 -85.62 ... 86.88 88.12 89.38
  * lon        (lon) float64 0.9375 2.812 4.688 6.562 ... 355.3 357.2 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(18263, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(18263, 144, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(18263, 192, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(18263, 144, 192), meta=np.ndarray>
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            ScenarioMIP
    branch_method:          standard
    branch_time_in_child:   60265.0
    branch_time_in_parent:  60265.0
    creation_date:          2019-11-08T10:45:50Z
    ...                     ...
    variable_id:            pr
    variant_label:          r1i1p1f1
    version:                v20191108
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/1cade23c-cf5e-4d0e-96f9-4128cd729af7
    license:                CMIP6 model data produced by CSIRO is licensed un...
NOTE: the values are not displayed, since that would trigger computation.

Chunks#

Notice that it says: pr (time, lat, lon) float32 dask.array<chunksize=(18263, 144, 192), meta=np.ndarray>. There is now a chunksize component, and the data array has become a Dask array.

The chunking of the array comes from the integration of Dask with Xarray. Dask divides the data array into small pieces called “chunks”, with each chunk designed to be small enough to fit into memory.

The file itself may already be chunked. File-level chunking is available in netCDF-4 and HDF5 datasets. The CMIP6 data should all be netCDF-4 and include some form of chunking for each file.

Looking at the file metadata in the “Data” section above, we see that in this case the file is chunked such that#

pr:_ChunkSizes = 1, 144, 192 ;#

Here we see that each chunk contains a single time step and all points in lat-lon, i.e. the file is split along the time dimension only, with each chunk holding one complete spatial field.
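We can also query the file chunking programmatically with the netCDF4 module imported earlier. A minimal sketch, reusing the path variable defined above:

import glob
import netCDF4 as nc

# Open the first file in the directory and report how 'pr' is chunked on disk.
# Variable.chunking() returns 'contiguous' or a list of chunk sizes per dimension.
first_file = sorted(glob.glob(path))[0]
with nc.Dataset(first_file) as ds:
    print(ds.variables['pr'].chunking())   # expected: [1, 144, 192]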

Image source: https://www.unidata.ucar.edu/blogs/developer/en/entry/chunking_data_why_it_matters

Consider two types of data access:

  1. Accessing a 2D lat-lon slice in time (RHS figure)

  2. Accessing a time series at a single lat-lon point (LHS figure)

With the chunking above, the first type of access only requires reading a single chunk, while the second type must read ALL of the chunks in the file. This dataset will therefore be fastest for 2D lat-lon, single time-step access.

In general, even without chunking, when data is accessed contiguously (in index order), time is the slowest dimension to access, then y, with x the fastest. With the chunking used in this CMIP6 dataset, time remains the slowest dimension. More uniform access speeds across dimensions would require chunk shapes that are more evenly balanced across the dimensions.

Exercise#

Time how long it takes to load the precipitation data at time='2015-01-01' and then time how long it takes to load the data at lat=0 and lon=180 (remember to use method='nearest' for the latter case). How much difference in time is there when using these different access methods?

%%time
f_ssp585.pr.sel(time='2015-01-01').load()
CPU times: user 5.82 ms, sys: 3.84 ms, total: 9.66 ms
Wall time: 22 ms
<xarray.DataArray 'pr' (time: 1, lat: 144, lon: 192)>
array([[[1.6359276e-05, 1.1489548e-05, 1.1842003e-05, ...,
         2.6086314e-05, 2.8186410e-05, 1.7582113e-05],
        [2.6601911e-05, 2.8185083e-05, 2.2405620e-05, ...,
         1.7252434e-05, 2.2461660e-05, 2.7394799e-05],
        [8.8953284e-06, 1.6366070e-05, 2.8468858e-05, ...,
         2.3836124e-06, 2.5670124e-06, 4.2845654e-06],
        ...,
        [1.6479444e-06, 1.4545669e-06, 1.0920535e-06, ...,
         1.7799719e-06, 1.7468499e-06, 1.7031343e-06],
        [3.6177703e-08, 3.3514027e-08, 2.8007335e-08, ...,
         2.8953947e-08, 3.2579656e-08, 3.5454985e-08],
        [4.3186881e-09, 4.3916062e-09, 4.4698378e-09, ...,
         4.2707637e-09, 4.2820667e-09, 4.2873936e-09]]], dtype=float32)
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01T12:00:00
  * lat      (lat) float64 -89.38 -88.12 -86.88 -85.62 ... 86.88 88.12 89.38
  * lon      (lon) float64 0.9375 2.812 4.688 6.562 ... 353.4 355.3 357.2 359.1
Attributes:
    standard_name:  precipitation_flux
    long_name:      Precipitation
    comment:        includes both liquid and solid phases
    units:          kg m-2 s-1
    cell_methods:   area: time: mean
    cell_measures:  area: areacella
    history:        2019-11-08T10:45:49Z altered by CMOR: replaced missing va...
%%time
f_ssp585.pr.sel(lat=0,lon=180,method='nearest').load()
CPU times: user 14.9 s, sys: 5.15 s, total: 20.1 s
Wall time: 20.2 s
<xarray.DataArray 'pr' (time: 31411)>
array([1.83213061e-07, 2.28843783e-06, 1.21034245e-05, ...,
       1.55756240e-07, 6.65439970e-08, 3.86067654e-07], dtype=float32)
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
    lat      float64 0.625
    lon      float64 180.9
Attributes:
    standard_name:  precipitation_flux
    long_name:      Precipitation
    comment:        includes both liquid and solid phases
    units:          kg m-2 s-1
    cell_methods:   area: time: mean
    cell_measures:  area: areacella
    history:        2019-11-08T10:45:49Z altered by CMOR: replaced missing va...

The same volume of data can take orders of magnitude longer to load#

The spatial slice contained 27,648 data points (144 × 192) and loaded in roughly 20 ms. The time series contained 31,411 data points, a similar volume of data, yet took roughly 20 s to load: about a thousand times longer.

NOTE: If you look at the dashboard, the task stream shows that the most time-consuming part is actually data concatenation.

Chunking and the way data is read are important considerations, both for how you access the data and for whether you wish to parallelise your code.
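A quick back-of-envelope check, a sketch based only on the chunk shape reported by ncdump above, makes the asymmetry concrete:

# File chunk shape is (1, 144, 192): one complete lat-lon map per time step.
n_time = 31411                 # time steps across both files
chunks_per_map = 1             # a single lat-lon slice touches one chunk
chunks_per_series = n_time     # a single-point time series touches every chunk
print(chunks_per_map, chunks_per_series)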

NetCDF file Chunks versus Dask Chunks#

Keep in mind, Dask chunking is different from the chunking of the stored data. As we saw in our example, the stored data was chunked with chunks of size (1, 144, 192), whereas the Dask array had a chunk size of (18263, 144, 192). It is possible to change the chunk size of the Dask array. In the example below, we specify Dask chunks of 730 time steps each (about two years of daily data).

f_ssp585 = xr.open_mfdataset(path,chunks={'time':730}, combine='by_coords')

f_ssp585
<xarray.Dataset>
Dimensions:    (time: 31411, bnds: 2, lat: 144, lon: 192)
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
  * lat        (lat) float64 -89.38 -88.12 -86.88 -85.62 ... 86.88 88.12 89.38
  * lon        (lon) float64 0.9375 2.812 4.688 6.562 ... 355.3 357.2 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(730, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(18263, 144, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(18263, 192, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(730, 144, 192), meta=np.ndarray>
Attributes: (12/47)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            ScenarioMIP
    branch_method:          standard
    branch_time_in_child:   60265.0
    branch_time_in_parent:  60265.0
    creation_date:          2019-11-08T10:45:50Z
    ...                     ...
    variable_id:            pr
    variant_label:          r1i1p1f1
    version:                v20191108
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/1cade23c-cf5e-4d0e-96f9-4128cd729af7
    license:                CMIP6 model data produced by CSIRO is licensed un...
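We can confirm the new chunk structure directly on the underlying Dask array (the printed values are illustrative):

# .chunks lists the chunk lengths along each dimension;
# .chunksize gives the shape of the largest chunk.
print(f_ssp585.pr.chunks)
print(f_ssp585.pr.data.chunksize)   # e.g. (730, 144, 192)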

How big do you make your chunks?#

The rule of thumb for Dask chunks is that you should “create arrays with a minimum chunksize of at least one million elements”: http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance

NetCDF-4 file chunks are often much smaller than Dask array chunks. The minimum chunk size exists because, with too many chunks, the queueing of operations when parallelising becomes slow. If the chunk sizes are too large, computation and memory can be wasted. The default chunks from Dask gave us chunks of size (18263, 144, 192), or around 500 million elements, so we could try reducing those chunks if needed. The larger the array, the larger the cost of queueing, and therefore larger chunks may be needed.
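To check a chunk against this rule of thumb, we can compute its size in elements and megabytes. A sketch using the re-chunked dataset from above (pr is float32, so 4 bytes per element):

import numpy as np

chunk_shape = f_ssp585.pr.data.chunksize        # e.g. (730, 144, 192)
n_elements = int(np.prod(chunk_shape))          # about 20 million elements
print(n_elements, n_elements * 4 / 1e6, 'MB')   # about 80 MB per chunk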

IMPORTANT: Whatever Dask array chunks you use, make sure they align with the netCDF4 file chunks!!#

So far our Dask chunks have split the time dimension while keeping the full lat-lon grid, which aligns with the netCDF-4 file chunks of shape (1, 144, 192). If we instead chose Dask chunks that split lat or lon in an attempt to optimise time-series loading, every Dask chunk would still have to read entire file chunks, so it would not help!
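One way to compare the two chunkings side by side, a sketch assuming the netCDF-4 backend (xarray records the on-disk chunking under the 'chunksizes' key of .encoding, which may be absent for other backends):

# File chunks as recorded by xarray versus the Dask chunks in use.
print(f_ssp585.pr.encoding.get('chunksizes'))   # file chunks, e.g. (1, 144, 192)
print(f_ssp585.pr.data.chunksize)               # Dask chunks, e.g. (730, 144, 192)
# Each Dask chunk of 730 time steps covers whole file chunks, so they align.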

Exercise#

Try loading the data with chunks of size (31411, 180, 1) (i.e. chunked in lon) and name the resulting dataset f_bad_chunk. Next, try reloading the time series of pr at lat=0 and lon=180 and time how long this takes.

f_bad_chunk = xr.open_mfdataset(path,chunks={'time':31411,'lat':180,'lon':1}, combine='by_coords')

Try running your previous code for f_bad_chunk again, loading the time series of pr at lat=0 and lon=180, and time how long it takes as you scale the number of workers up or down (see the sketch below for one way to set the worker count).

Do the same with the original chunking method of f_ssp585 and see if there is a difference.
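If you want explicit control over the number of workers for this exercise, you can pass a worker count when creating the client instead of using the default shown below. A sketch; n_workers and threads_per_worker are example values only:

from dask.distributed import Client

# Close any previous client first (client.close()), then start a new one
# with an explicit number of workers.
client = Client(n_workers=8, threads_per_worker=1)
print(client)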

We will use three different schedulers to read the data. First, let’s initiate a client.

# If you run this notebook on your local computer or NCI's VDI instance, you can create a cluster
from dask.distributed import Client
client = Client()
print(client)
<Client: 'tcp://127.0.0.1:41595' processes=4 threads=12, memory=20.00 GiB>
# If you run this notebook on Gadi under the pangeo environment, you can create a cluster using the scheduler.json file
from dask.distributed import Client, LocalCluster
client = Client()
print(client)
/home/k/k204199/.conda/envs/summerschool_2022/lib/python3.10/site-packages/distributed/node.py:183: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 35675 instead
  warnings.warn(
<Client: 'tcp://127.0.0.1:34531' processes=4 threads=12, memory=20.00 GiB>
Warning: Please make sure you specify the correct path to the scheduler.json file within your environment.

Starting the Dask Client provides a dashboard, which is useful for gaining insight into the computation. The link to the dashboard becomes visible when you create the Client. We recommend keeping the dashboard open on one side of your screen and your notebook on the other, which is useful for learning purposes.
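If you miss the link, the dashboard URL can also be printed at any time:

print(client.dashboard_link)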

%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='synchronous')
/home/k/k204199/.conda/envs/summerschool_2022/lib/python3.10/site-packages/dask/base.py:1365: UserWarning: Running on a single-machine scheduler when a distributed client is active might lead to unexpected results.
  warnings.warn(
CPU times: user 15.3 s, sys: 551 ms, total: 15.8 s
Wall time: 15.4 s
<xarray.DataArray 'pr' (time: 31411)>
array([1.83213061e-07, 2.28843783e-06, 1.21034245e-05, ...,
       1.55756240e-07, 6.65439970e-08, 3.86067654e-07], dtype=float32)
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
    lat      float64 0.625
    lon      float64 180.9
Attributes:
    standard_name:  precipitation_flux
    long_name:      Precipitation
    comment:        includes both liquid and solid phases
    units:          kg m-2 s-1
    cell_methods:   area: time: mean
    cell_measures:  area: areacella
    history:        2019-11-08T10:45:49Z altered by CMOR: replaced missing va...
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='threads')
CPU times: user 15.4 s, sys: 505 ms, total: 15.9 s
Wall time: 15.4 s
<xarray.DataArray 'pr' (time: 31411)>
array([1.83213061e-07, 2.28843783e-06, 1.21034245e-05, ...,
       1.55756240e-07, 6.65439970e-08, 3.86067654e-07], dtype=float32)
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
    lat      float64 0.625
    lon      float64 180.9
Attributes:
    standard_name:  precipitation_flux
    long_name:      Precipitation
    comment:        includes both liquid and solid phases
    units:          kg m-2 s-1
    cell_methods:   area: time: mean
    cell_measures:  area: areacella
    history:        2019-11-08T10:45:49Z altered by CMOR: replaced missing va...
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load(scheduler='processes')
CPU times: user 495 ms, sys: 134 ms, total: 629 ms
Wall time: 18.4 s
<xarray.DataArray 'pr' (time: 31411)>
array([1.83213061e-07, 2.28843783e-06, 1.21034245e-05, ...,
       1.55756240e-07, 6.65439970e-08, 3.86067654e-07], dtype=float32)
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01T12:00:00 ... 2100-12-31T12:00:00
    lat      float64 0.625
    lon      float64 180.9
Attributes:
    standard_name:  precipitation_flux
    long_name:      Precipitation
    comment:        includes both liquid and solid phases
    units:          kg m-2 s-1
    cell_methods:   area: time: mean
    cell_measures:  area: areacella
    history:        2019-11-08T10:45:49Z altered by CMOR: replaced missing va...

Poor chunking with dask can make your performance worse!#

As you can see, badly shaped chunks and chunks that do not align with the file chunking slow down I/O performance significantly. Both are important to keep in mind when creating Dask chunks.

Close the client#

Before moving on to the next exercise, make sure to close your client or stop this kernel.

client.close()

Summary#

This example showed how data chunking works with Dask and why chunk shape and alignment matter for performance.

For further information regarding Dask, please see: https://docs.dask.org/en/latest/