{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "e2d8b054", "metadata": {}, "source": [ "# Dealing with large output files\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eb5792d0", "metadata": {}, "source": [ "You might imagine that if you followed the instructions [on the making of parallel runs](https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html) and the loading of the resulting dataset, you could just use the `dataset.to_zarr()` function to save the data to a single `zarr` datastore. This is true for small enough datasets. However, there is a bug in `xarray` which makes this inefficient for large data sets.\n", "\n", "At the same time, it will often improve performance if large datasets are saved as a single `zarr` store, chunked appropriately, and the type of the variables in them modified. It is often also useful to add other variables to the dataset. This document describes how to do all this.\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4010c8d0", "metadata": {}, "source": [ "## Why are we doing this? And what chunk sizes should we choose?\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "46cb22d5", "metadata": {}, "source": [ "If you are running a relatively small case (perhaps 1/10 the size of the memory of your machine), nearly anything you do will work. However, as your problems get larger, it can help to write the data into a single zarr datastore, and to chunk that store appropriately.\n", "\n", "To illustrate this, here is the time it takes to retrieve all the results (with `ds['variableName'].values`) of some common data structures with different chunk sizes. (What is a chunk size? More on that below). The data in this example has 39 million trajectories starting over 120 times, and there are 250 observations, resulting in a directory size of 88Gb in double precision and 39 in single. In this table, \"trajectory:5e4, obs:10\" indicates that each chunk extends over 50,000 trajectories and 10 obs. The chunking in the original data is roughly a few thousand observations and 10 obs.\n", "\n", "| File type | open [s] | read 1 obs, all traj [s] | read 23 obs, all traj [s] | read 8000 contiguous traj, all obs [s] | read traj that start at a given time, all obs [s] |\n", "| ----------------------- | -------- | ------------------------ | ------------------------- | ------------------------------------- | ------------------------------------------------- |\n", "| Straigth from parcels | 2.9 | 8.4 | 59.9 | 1.5 | 17.4 |\n", "| trajectory:5e4, obs:10 | 0.48 | 2.5 | 19.5 | 0.4 | 10.33 |\n", "| trajectory:5e4, obs:100 | 0.55 | 20.5 | 13.8 | 0.5 | 3.88 |\n", "| trajectory:5e5, obs:10 | 0.54 | 2.2 | 16.3 | 0.85 | 18.5 |\n", "| trajectory:5e5, obs:100 | 0.46 | 19.9 | 40.0 | 0.62 | 49.36 |\n", "\n", "You can see several things in this. It is always quicker to open a single file, and for all data access patterns, there is are chunksizes that are more efficient than the default output. Why is this?\n", "\n", "The chunksize determines how data is stored on disk. For the default zarr datastore, each chunk of data is stored as a single compressed file. In netCDF, chunking is similar except that the compressed data is stored within a single file. In either case, if you must access any data from within a chunk, you must read the entire chunk from disk.\n", "\n", "So when we access one obs dimension and many trajectories, the chunking scheme that is elongated in the trajectory direction is fastest. When we get all the observation for a scattered set of trajectories, the chunking that is elongated in observations is the best. In general, the product of the two chunksizes (the number of data points in a chunk) should be hundreds of thousands to 10s of millions. A suboptimal chunking scheme is usually not tragic, but if you know how you will most often access the data, you can save considerable time.\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0d5d3385", "metadata": {}, "source": [ "## How to save the output of an MPI ocean parcels run to a single zarr dataset\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b5001615", "metadata": {}, "source": [ "First, we need to import the necessary modules, specify the directory `inputDir` which contains the output of the parcels run (the directory that has proc01, proc02 and so forth), the location of the output zarr file `outputDir` and a dictionary giving the chunk size for the `trajectory` and `obs` coordinates, `chunksize`.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2622a91d", "metadata": {}, "outputs": [], "source": [ "import time\n", "from glob import glob\n", "from os import path\n", "\n", "import dask\n", "import xarray as xr\n", "from dask.diagnostics import ProgressBar\n", "from numpy import *\n", "from pylab import *\n", "\n", "# first specify the directory in which the MPI code wrote its output\n", "inputDir = (\n", " \"dataPathsTemp/\"\n", " + \"theAmericas_wholeGlobe_range100km_depthFrom_1m_to_500m_habitatTree_months01_to_02_fixed_1m/\"\n", " + \"2007/tracks.zarr\"\n", ")\n", "\n", "\n", "# specify chunksize and where the output zarr file should go; also set chunksize of output file\n", "chunksize = {\"trajectory\": 5 * int(1e4), \"obs\": 10}\n", "outputDir = \"/home/pringle/jnkData/singleFile_5e4_X_10_example.zarr\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "33383cbe", "metadata": {}, "source": [ "Now for large datasets, this code can take a while to run; for 36 million trajectories and 250 observations, it can take an hour and a half. I prefer not to accidentally destroy data that takes more than an hour to create, so I put in a safety check and only let the code run if the output directory does not exist.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "51a1414d", "metadata": {}, "outputs": [], "source": [ "# do not overwrite existing data sets\n", "if path.exists(outputDir):\n", " print(\"the ouput path\", outputDir, \"exists\")\n", " print(\"please delete if you want to replace it\")\n", " assert False, \"stopping execution\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b8818397", "metadata": {}, "source": [ "It will often be useful to change the [dtype](https://numpy.org/doc/stable/reference/generated/numpy.dtype.html) of the output data. Doing so can save a great deal of disk space. For example, the input data for this example is 88Gb in size, but by changing lat, lon and z to single precision, I can make the file about half as big.\n", "\n", "This comes at the cost of some accuracy. Float64 has 14 digits of accuracy, float32 has 7. For latitude and longitude, going from float64 to float32 increases the error by the circumference of the Earth divided 1e7, or about 1m. This is good enough for what I am doing. However, a year of time has about 3.15e7 seconds, and we often want to know within a second when a particle is released (to avoid floating point issues when picking out particles that start at a specific time). So the 3.15e7/1e7 error (a few seconds) in the time coordinate could cause problems. So I don't want to reduce the precision of time.\n", "\n", "There is one other important issue. Due to a bug in xarray, it is much slower to save datasets with a datetime64 variable in them. So time here will be given as float64. If (as we do below) the attribute data is preserved, it will still appear as a datetime type when the data file is loaded\n", "\n", "To change precision, put an entry into the dictionary `varType` whose key is the name of the variable, and whose value is the type you wish the variable to be cast to:\n" ] }, { "cell_type": "code", "execution_count": 14, "id": "9ca2bb15", "metadata": {}, "outputs": [], "source": [ "varType = {\n", " \"lat\": dtype(\"float32\"),\n", " \"lon\": dtype(\"float32\"),\n", " \"time\": dtype(\"float64\"), # to avoid bug in xarray\n", " \"z\": dtype(\"float32\"),\n", "}" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c26e9ba7", "metadata": {}, "source": [ "Now we need to read in the data as discussed in the section on making an MPI run. However, note that `xr.open_zarr()` is given the `decode_times=False` option, which prevents the time variable from being converted into a datetime64[ns] object. This is necessary due to a bug in xarray. As discussed above, when the data set is read back in, time will again be interpreted as a datetime.\n" ] }, { "cell_type": "code", "execution_count": 15, "id": "e7dd9f61", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "opening data from multiple process files\n", " done opening in 6.37\n" ] } ], "source": [ "print(\"opening data from multiple process files\")\n", "tic = time.time()\n", "files = glob(path.join(inputDir, \"proc*\"))\n", "dataIn = xr.concat(\n", " [xr.open_zarr(f, decode_times=False) for f in files],\n", " dim=\"trajectory\",\n", " compat=\"no_conflicts\",\n", " coords=\"minimal\",\n", ")\n", "print(\" done opening in %5.2f\" % (time.time() - tic))" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f93a60ff", "metadata": {}, "source": [ "Now we can take advantage of the `.astype` operator to change the type of the variables. This is a lazy operator, and it will only be applied to the data when the data values are requested below, when the data is written to a new zarr store.\n" ] }, { "cell_type": "code", "execution_count": 16, "id": "6819cd84", "metadata": {}, "outputs": [], "source": [ "for v in varType.keys():\n", " dataIn[v] = dataIn[v].astype(varType[v])" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f8410589", "metadata": {}, "source": [ "The dataset is then rechunked to our desired shape. This does not actually do anything right now, but will when the data is written below. Before doing this, it is useful to remove the per-variable chunking metadata, because of inconsistencies which arise due to (I think) each MPI process output having a different chunking. This is explained in more detail in https://github.com/dcs4cop/xcube/issues/347\n" ] }, { "cell_type": "code", "execution_count": 17, "id": "9a56c3cc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "re-chunking\n", " done in 9.15590238571167\n" ] } ], "source": [ "print(\"re-chunking\")\n", "tic = time.time()\n", "for v in dataIn.variables:\n", " if \"chunks\" in dataIn[v].encoding:\n", " del dataIn[v].encoding[\"chunks\"]\n", "dataIn = dataIn.chunk(chunksize)\n", "print(\" done in\", time.time() - tic)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6f59018b", "metadata": {}, "source": [ "The dataset `dataIn` is now ready to be written back out with dataIn.to_zarr(). Because this can take a while, it is nice to delay computation and then compute() the resulting object with a progress bar, so we know how long we have to get a cup of coffee or tea.\n" ] }, { "cell_type": "code", "execution_count": 19, "id": "de5415ed", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[########################################] | 100% Completed | 33m 9ss\n" ] } ], "source": [ "delayedObj = dataIn.to_zarr(outputDir, compute=False)\n", "with ProgressBar():\n", " results = delayedObj.compute()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9080025f", "metadata": {}, "source": [ "We can now load the zarr data set we have created, and see what is in it, compared to what was in the input dataset. Note that since we have not used \"decode_times=False\", the time coordinate appears as a datetime object.\n" ] }, { "cell_type": "code", "execution_count": 20, "id": "3157592c", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The original data\n", " \n", "Dimensions: (trajectory: 39363539, obs: 250)\n", "Coordinates:\n", " * obs (obs) int32 0 1 2 3 4 5 6 7 ... 242 243 244 245 246 247 248 249\n", " * trajectory (trajectory) int64 0 22 32 40 ... 39363210 39363255 39363379\n", "Data variables:\n", " age (trajectory, obs) float32 dask.array\n", " lat (trajectory, obs) float64 dask.array\n", " lon (trajectory, obs) float64 dask.array\n", " time (trajectory, obs) datetime64[ns] dask.array\n", " z (trajectory, obs) float64 dask.array\n", "Attributes:\n", " Conventions: CF-1.6/CF-1.7\n", " feature_type: trajectory\n", " ncei_template_version: NCEI_NetCDF_Trajectory_Template_v2.0\n", " parcels_mesh: spherical\n", " parcels_version: 2.3.2.dev137 \n", "\n", "The new dataSet\n", " \n", "Dimensions: (trajectory: 39363539, obs: 250)\n", "Coordinates:\n", " * obs (obs) int32 0 1 2 3 4 5 6 7 ... 242 243 244 245 246 247 248 249\n", " * trajectory (trajectory) int64 0 22 32 40 ... 39363210 39363255 39363379\n", "Data variables:\n", " age (trajectory, obs) float32 dask.array\n", " lat (trajectory, obs) float32 dask.array\n", " lon (trajectory, obs) float32 dask.array\n", " time (trajectory, obs) datetime64[ns] dask.array\n", " z (trajectory, obs) float32 dask.array\n", "Attributes:\n", " Conventions: CF-1.6/CF-1.7\n", " feature_type: trajectory\n", " ncei_template_version: NCEI_NetCDF_Trajectory_Template_v2.0\n", " parcels_mesh: spherical\n", " parcels_version: 2.3.2.dev137\n" ] } ], "source": [ "dataOriginal = xr.concat(\n", " [xr.open_zarr(f) for f in files],\n", " dim=\"trajectory\",\n", " compat=\"no_conflicts\",\n", " coords=\"minimal\",\n", ")\n", "dataProcessed = xr.open_zarr(outputDir)\n", "print(\"The original data\\n\", dataOriginal, \"\\n\\nThe new dataSet\\n\", dataProcessed)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 5 }