Skip to content

Commit

Permalink
updates to high-performance Python
Browse files Browse the repository at this point in the history
  • Loading branch information
kah3f committed May 10, 2024
1 parent f89447b commit 94d7eaa
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 125 deletions.
7 changes: 4 additions & 3 deletions content/courses/python-high-performance/codes/pimpi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

echo Running on `hostname`

module purge

#Load the versions of gcc, MPI, and Python/Anaconda in which you installed mpi4py
module load anaconda
module load gcc
module load openmpi
module load anaconda

#Activate your environment. Edit to specify whatever name you chose.
conda activate mpienv

srun python MonteCarloPiMPI.py 100000000
1 change: 0 additions & 1 deletion content/courses/python-high-performance/compiled_code.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ weight: 24
menu:
hp-python:
parent: High-Performance Python
weight: 24
---

Broadly speaking, interpreted languages tend to be slow, but are relatively easy to learn and use. Compiled languages generally deliver the maximum speed, but are more complex to learn and use effectively. Python can utilize libraries of compiled code that are appropriately prepared, or can invoke a compiler (standard or "just in time") to compile snippets of code and incorporate it directly into the execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ weight: 45
menu:
hp-python:
parent: High-Performance Python
weight: 45
---

Nearly all recent computers, including personal laptops, are multicore systems. The central-processing units (CPUs) of these machines are divided into multiple processor cores. These cores share the main memory (RAM) of the computer and may share at least some of the faster memory (cache). This type of system is called a **shared-memory processing** or **symmetric multiprocessing** (SMP) computer.
Expand Down Expand Up @@ -214,7 +213,7 @@ Xarray is particularly well suited to geophysical data in the form of NetCDF fil

### Dask and Machine Learning

Machine learning is beyond our scope here, but we will make a few comments. Dask can integrate with scikit-learn in the form of `Dask-ML`.
Machine learning is beyond our scope here, but we will make a few comments. Dask can integrate with scikit-learn in the form of `Dask-ML`. You may need to use pip to install dask-ml.
```python
>>>import numpy as np
>>>import dask.array as da
Expand All @@ -238,118 +237,3 @@ Machine learning is beyond our scope here, but we will make a few comments. Das
>>>clf.score(X_large, y_large)
```
Dask can also be used with Pytorch. See the [documentation](https://examples.dask.org/machine-learning/torch-prediction.html) for an example.

## MPI

The most widely used general-purpose communications library for distributed parallelization is MPI, the Message Passing Interface.

MPI works on multicore systems as well as multinode, but the programming model is still different from threads.

In MPI each process has an ID called its _rank_. Ranks are numbered from 0 to _n-1_, for _n_ processes. Each process runs completely independently. No process shares memory with any other process whether running on the same node or not. All communications occur over the network.
To use MPI the programmer must manage the distribution of the data to different processes and the communication among the processes.

MPI messages are identified by an "envelope" of metadata. This consists of the _destination_, the _source_ (the "return address"), a _communicator_ (a group of processes that will be exchanging information), and optionally a _tag_. A communicator consisting of all processes, called COMM_WORLD, is set up when the MPI program is initiated.

The process with rank 0 is usually called the **root process**. Since the mimumum number of processes is 1, the root process is the only one that is guaranteed to be present. For this reason it is usually used to manage various bookkeeping tasks as well as input/output.

## The mpi4py Package

The most popular direct way to use MPI with Python is the `mpi4py` package. It is not included in the base Anaconda distribution. To install it into your environment in an HPC cluster such as Rivanna, load the appropriate modules for compiler and an MPI distribution. It is important that a command `mpicc` provided by your HPC site be first in your path, since that should have been set up to communicate properly with your resource manager (SLURM etc.)
```
module load gcc/11.4.0
module load openmpi
module load anaconda
pip install --user mpi4py
```
You must use `pip` rather than `conda` because conda will install precompiled binaries and you must compile `mpi4py` explicitly.

MPI consists of dozens of functions, though most programmers need only a fraction of the total. The mpi4py package has implemented most of them using a "Pythonic" syntax, rather than the more C-like syntax used by other languages. One peculiarity of mpi4py is that only particular types may be communicated; in particular, only NumPy NDArrays or pickled objects are supported. To simplify our discussion, we will ignore the versions for pickled objects. The requirement that NumPy arrays be sent means that even single values must be represented as one-element NumPy arrays.

MPI requires more advanced programming skills so we will just show an example here. Our Monte Carlo pi program is well suited to MPI so we can use that.

{{% code-download file="/courses/python-high-performance/codes/MonteCarloPiMPI.py" lang="python" %}}

The first invocation of MPI is the call to Get_rank. This returns the rank of the process that calls it. Remember that each MPI process runs as a separate executable; the only way their behaviors can be controlled individually is through the rank. This call also initializes MPI; a separate MPI.Init is not required. The next line allows us to find out how many processes are in COMM_WORLD. The number of processes for MPI programs is always set outside the program, and should never be hardcoded into the source code.

Next we divide up the number of "throws" into roughly equal chunks, just as we did in the corresponding Multiprocessing example. The same list is generated, but we use _myrank_ to select the element of the list that each rank will use.

After this each process invokes the `pi` routine using `myNumPoints` for its rank. We need to collect all the estimates and average them. The _reduction_ collects the results from each rank (in rank order) and applies the "op" (operation). A reduction must be a binary operation that returns another object of the same type; the operation is applied along the sequence. In this case we want the sum so we add the result from rank 0 to that from rank 1, then we take that sum and add the result from rank 2, and so forth. Dividing by the number of processes provides the average.

The Reduce function returns each result to the process regarded as the root, which would normally be 0 as it is in this case, and root carries out the operation. Thus _only_ the root process knows the final result. We then select root to print out the value.

This example reruns the problem without distributing the throws in order to obtain a serial time for comparison purposes. Of course, a real application would not do that.

### Running the MPI Python Program on Rivanna

In order to launch multiple tasks (or processes) of our program, we run this program through the MPI executor. On our HPC cluster this is srun.

```
srun python MonteCarloPiMPI.py 1000000000
```

**Note: you cannot launch the MPI program with `srun` on the Rivanna login nodes.** In order to execute our program on designated compute node(s), we need to write a simple bash script that defines the compute resources we need. We call this our job script. For our example, the job script `pimpi.sh` looks like this:

{{% code-download file="/courses/python-high-performance/codes/pimpi.sh" lang="bash" %}}

The `#SBATCH` directives define the compute resources (`-N`, `--ntasks-per-node`, `-p`), compute wall time (`-t`), and the allocation account (`--account`) to be used. `-N 1` specifies that all MPI tasks should run on a single node. We are limiting the number of nodes for this workshop so that everyone gets a chance to run their code on the shared resources.

**Submitting the job:**

Open a terminal window and execute this command:
```
sbatch pimpi.sh
```

**Checking the job status:**

Check the job status with the `squeue -u` or `sacct` commands as described in the [Multiprocessing](#multiprocessing) section.

**Checking the output file:**

Open the `slurm-XXXXXX.out` files in a text editor and record the total run time for the job.

**Exercise**

Rerun the MPI job using 1 or 4 cpu cores by changing the `--ntasks-per-node` option.

### Scaling
On the cluster, the timings are very similar to Multiprocessing on the workstation.

{{< table >}}
| CPU Cores | Run Time |
| --- | --- |
| 1 (serial) | 400 sec|
| 4 | 102 sec |
| 8 | 60 sec |
| 16 | 31 sec |
{{< /table >}}

## Dask-MPI

Dask can use `mpi4py` on a high-performance cluster. First install mpi4py according to the instructions in the previous section, then `pip install --user dask-mpi`.

### Schedulers

We have not discussed Dask [_schedulers_](https://docs.dask.org/en/latest/scheduling.html) previously. The scheduler is a process that managers the workers that carry out the tasks.
We have been implicitly using the _single-machine_ scheduler, which is the default. Within the single-machine scheduler are two options, _threaded_ and _processes_. The threaded single-machine scheduler is the default for Dask Arrays, Dask Dataframes, and Dask Delayed. However, as we discussed with [Multiprocessing](/courses/python-high-performance/multiprocessing), the GIL (Global Interpreter Lock) inhibits threading in general. Most of NumPy and Pandas release the GIL so threading works well with them. If you cannot use NumPy and Pandas then the processes scheduler is preferred. It is much like Multiprocessing.

To use Dask-MPI we must introduce the Dask `distributed` scheduler. The `distributed` scheduler may be preferable to `processes` even on a single machine, and it is required for use across multiple nodes.

### Running Dask-MPI

We will discuss here only the batch interface for Dask MPI. Dask-MPI provides an `initialize` function that initializes MPI and sets up communications. You must then start a Client cluster to connect the workers. Dask-MPI uses rank 0 for the manager process and rank 1 to mediate between manager and workers, so you must request _at least_ 3 processes. Close the cluster at the end of your script to avoid error messages about terminated processes.

**Example**
Convert the "timeseries" example to Dask MPI.

{{< code-download file="/courses/python-high-performance/codes/dask_df_mpi.py" lang="python" >}}

Run this simple example with

{{< code-download file="/courses/python-high-performance/codes/run_dask_mpi.slurm" lang="bash" >}}

The OMPI_MCA environment variable suppresses a warning message that is seldom relevant.

Using Dask-MPI is not difficult, especially in batch mode, but users interested in trying it should be sure to first understand the distributed scheduler, then study the online examples carefully. Dask-MPI does not require explicit calls to MPI but with the convenience comes some loss of control; the algorithm must be suited to the distributed scheduler. More information may be found in the [documentation](https://mpi.dask.org/en/latest/batch.html).

Full documentation for Dask-MPI is [here](http://mpi.dask.org/en/latest/).
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ weight: 60
menu:
hp-python:
parent: High-Performance Python
weight: 60
---

Certain tasks can be greatly accelerated if run on a graphics processing unit (GPU). A GPU can be regarded as a device that runs hundreds or thousands of threads. The memory per thread is usually fairly limited but has a very high bandwidth. Data must be moved to and from the host computer's memory to the GPU's memory.
Expand Down
Loading

0 comments on commit 94d7eaa

Please sign in to comment.