diff --git a/README.md b/README.md
index f0dbcb1..4b329fc 100644
--- a/README.md
+++ b/README.md
@@ -4,3 +4,131 @@
 [![image](https://img.shields.io/badge/discourse-forum-blue.svg)](https://forum.reana.io)
 [![image](https://img.shields.io/github/license/reanahub/reana-demo-dask-coffea.svg)](https://github.com/reanahub/reana-demo-dask-coffea/blob/master/LICENSE)
 [![image](https://www.reana.io/static/img/badges/launch-on-reana-at-cern.svg)](https://reana.cern.ch/launch?url=https%3A%2F%2Fgithub.com%2Freanahub%2Freana-demo-dask-coffea&specification=reana.yaml&name=reana-demo-dask-coffea)
+
+## About
+
+This [REANA](http://www.reana.io/) reproducible analysis example provides a
+simple example of how to run Dask workflows using Coffea. The example was
+adapted from the
+[Coffea Casa tutorials](https://github.com/CoffeaTeam/coffea-casa-tutorials/blob/master/examples/example1.ipynb)
+repository.
+
+## Analysis structure
+
+Making a research data analysis reproducible means providing "runnable
+recipes" that address (1) where the input data is, (2) what software was used
+to analyse the data, (3) which computing environments were used to run the
+software, and (4) which computational workflow steps were taken to run the
+analysis. This allows the analysis to be instantiated and run on a
+computational cloud to obtain (5) the output results.
+
+### 1. Input data
+
+In this example, we are using a single CMS open data file,
+`Run2012B_SingleMu.root`, which is hosted on the EOSPUBLIC XRootD server.
+
+### 2. Analysis code
+
+The analysis code consists of a single Python file, `analysis.py`, which
+connects to a Dask cluster, runs the analysis, and plots the missing
+transverse energy (MET) histogram.
+
+### 3. Compute environment
+
+In order to be able to rerun the analysis even several years in the future, we
+need to "encapsulate the current compute environment". We shall achieve this by
+preparing a [Docker](https://www.docker.com/) container image for our analysis
+steps.
+
+This example uses the Coffea platform image with the specific version 0.7.22.
+The container image can be found on Docker Hub at
+[docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049](https://hub.docker.com/r/coffeateam/coffea-dask-cc7).
+
+### 4. Analysis workflow
+
+The analysis workflow is simple and consists of a single command: we run
+`python analysis.py` to execute the example. The script uses Dask behind the
+scenes to launch parallel computations. As users, we do not have to specify
+the computational graph ourselves; the Dask library takes care of dispatching
+the computations.
+
+### 5. Output results
+
+The example produces the following event-level MET histogram as its output.
+
+![](https://github.com/user-attachments/assets/e52c2391-626d-4556-90ca-75248516cc95)
+
+## Running the example on REANA cloud
+
+There are two ways to execute this analysis example on REANA.
+
+If you would like to simply launch this analysis example on the REANA instance
+at CERN and inspect its results using the web interface, please click on the
+following badge:
+
+[![Launch on REANA@CERN badge](https://www.reana.io/static/img/badges/launch-on-reana-at-cern.svg)](https://reana.cern.ch/launch?url=https://github.com/reanahub/reana-demo-dask-coffea&specification=reana.yaml&name=reana-demo-dask-coffea)
+
+If you would like a step-by-step guide on how to use the REANA command-line
+client to launch this analysis example, please read on.
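+
+Before submitting to REANA, you may first want to smoke-test the analysis
+locally by starting a throw-away Dask cluster on your own machine and pointing
+the script at it. The following is a minimal sketch, assuming you have
+`dask[distributed]` and the Coffea 0.7 stack installed locally (for example,
+from the same container image); it is not part of the REANA workflow itself:
+
+```python
+import os
+import subprocess
+
+from dask.distributed import LocalCluster
+
+# Start a small local cluster; port 8080 matches the default scheduler
+# address that analysis.py falls back to when DASK_SCHEDULER_URI is unset.
+cluster = LocalCluster(scheduler_port=8080, n_workers=2)
+os.environ["DASK_SCHEDULER_URI"] = cluster.scheduler_address
+subprocess.run(["python", "analysis.py"], check=True)
+cluster.close()
+```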
+
+We start by creating a [reana.yaml](reana.yaml) file describing the above
+analysis structure with its inputs, code, runtime environment, computational
+workflow steps and expected outputs:
+
+```yaml
+inputs:
+  files:
+    - analysis.py
+workflow:
+  type: serial
+  resources:
+    dask:
+      image: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
+  specification:
+    steps:
+      - name: process
+        environment: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
+        commands:
+          - python analysis.py
+outputs:
+  files:
+    - histogram.png
+tests:
+  files:
+    - tests/log-messages.feature
+    - tests/workspace-files.feature
+```
+
+In this example we are using the simple Serial workflow engine to launch our
+Dask-based computations.
+
+We can now install the REANA command-line client, run the analysis and
+download the resulting plot:
+
+```console
+$ # create new virtual environment
+$ virtualenv ~/.virtualenvs/reana
+$ source ~/.virtualenvs/reana/bin/activate
+$ # install REANA client
+$ pip install reana-client
+$ # connect to some REANA cloud instance
+$ export REANA_SERVER_URL=https://reana.cern.ch/
+$ export REANA_ACCESS_TOKEN=XXXXXXX
+$ # create new workflow
+$ reana-client create -n myanalysis
+$ export REANA_WORKON=myanalysis
+$ # upload input code, data and workflow to the workspace
+$ reana-client upload
+$ # start computational workflow
+$ reana-client start
+$ # ... should be finished in about 5 minutes
+$ reana-client status
+$ # list workspace files
+$ reana-client ls
+$ # download output results
+$ reana-client download
+```
+
+Please see the [REANA-Client](https://reana-client.readthedocs.io/)
+documentation for a more detailed explanation of typical `reana-client` usage
+scenarios.
diff --git a/analysis.py b/analysis.py
new file mode 100644
index 0000000..893f444
--- /dev/null
+++ b/analysis.py
@@ -0,0 +1,76 @@
+import os
+
+import awkward as ak
+import coffea.processor as processor
+import hist
+import matplotlib.pyplot as plt
+from coffea.nanoevents import schemas
+from dask.distributed import Client
+
+
+# This program plots an event-level variable (in this case MET, but switching
+# it is as easy as changing a dictionary key). It also demonstrates the
+# book-keeping "cutflow" tool, used to keep track of the number of events
+# processed.
+
+
+# The processor class bundles our data analysis together while giving us some
+# helpful tools. It also leaves looping and chunking to the framework instead
+# of us.
+class Processor(processor.ProcessorABC):
+    def __init__(self):
+        # Bins and categories for the histogram are defined here, using the
+        # `hist` library (see https://hist.readthedocs.io/).
+        dataset_axis = hist.axis.StrCategory(
+            name="dataset", label="", categories=[], growth=True
+        )
+        MET_axis = hist.axis.Regular(
+            name="MET", label="MET [GeV]", bins=50, start=0, stop=100
+        )
+
+        # The accumulator keeps our data chunks together for histogramming.
+        # It also gives us the cutflow, which can be used to keep track of
+        # the data.
+        self.output = processor.dict_accumulator(
+            {
+                "MET": hist.Hist(dataset_axis, MET_axis),
+                "cutflow": processor.defaultdict_accumulator(int),
+            }
+        )
+
+    def process(self, events):
+        # This is where we do our actual analysis. The events object exposes
+        # columns much like a TTree: `events.fields` lists them, and e.g.
+        # `events.MET.fields` goes one level deeper.
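+        # As an illustration (hypothetical interactive exploration, not
+        # executed as part of this workflow), one could inspect the input
+        # with:
+        #     print(events.fields)      # top-level objects, e.g. "MET"
+        #     print(events.MET.fields)  # per-object fields, e.g. "pt"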
+        dataset = events.metadata["dataset"]
+        MET = events.MET.pt
+
+        # We can define new cutflow keys (here 'all events' and 'number of
+        # chunks') and accumulate counts into them. We need += because
+        # process() is called once per chunk of data.
+        self.output["cutflow"]["all events"] += ak.size(MET)
+        self.output["cutflow"]["number of chunks"] += 1
+
+        # Fill the histogram with this chunk's data. The 'MET' keyword
+        # matches the axis name defined in __init__.
+        self.output["MET"].fill(dataset=dataset, MET=MET)
+        return self.output
+
+    def postprocess(self, accumulator):
+        return accumulator
+
+
+# Connect to the Dask cluster. REANA exposes the cluster address to the job
+# via the DASK_SCHEDULER_URI environment variable; we fall back to a local
+# scheduler address otherwise.
+DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
+client = Client(DASK_SCHEDULER_URI)
+
+fileset = {
+    "SingleMu": [
+        "root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root"
+    ]
+}
+
+executor = processor.DaskExecutor(client=client)
+
+run = processor.Runner(
+    executor=executor, schema=schemas.NanoAODSchema, savemetrics=True
+)
+
+output, metrics = run(fileset, "Events", processor_instance=Processor())
+
+# Draw a 1D histogram of the accumulated 'MET' output.
+output["MET"].plot1d()
+
+# Print all cutflow counters; use print(output["cutflow"][key]) for a
+# single one.
+for key, value in output["cutflow"].items():
+    print(key, value)
+
+# Save the histogram plot to the output file expected by the workflow.
+plt.savefig("histogram.png")
diff --git a/reana.yaml b/reana.yaml
new file mode 100644
index 0000000..340d6d0
--- /dev/null
+++ b/reana.yaml
@@ -0,0 +1,21 @@
+inputs:
+  files:
+    - analysis.py
+workflow:
+  type: serial
+  resources:
+    dask:
+      image: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
+  specification:
+    steps:
+      - name: process
+        environment: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
+        commands:
+          - python analysis.py
+outputs:
+  files:
+    - histogram.png
+tests:
+  files:
+    - tests/log-messages.feature
+    - tests/workspace-files.feature
diff --git a/tests/log-messages.feature b/tests/log-messages.feature
new file mode 100644
index 0000000..74473bb
--- /dev/null
+++ b/tests/log-messages.feature
@@ -0,0 +1,15 @@
+# Tests for the expected workflow log messages
+
+Feature: Log messages
+
+    As a researcher,
+    I want to be able to see the log messages of my workflow execution,
+    so that I can verify that the workflow ran correctly.
+
+    Scenario: The workflow run has produced the expected messages
+        When the workflow is finished
+        Then the job logs for the "process" step should contain
+            """
+            all events 53446198
+            number of chunks 534
+            """
\ No newline at end of file
diff --git a/tests/workspace-files.feature b/tests/workspace-files.feature
new file mode 100644
index 0000000..670421a
--- /dev/null
+++ b/tests/workspace-files.feature
@@ -0,0 +1,17 @@
+# Tests for the presence of the expected workflow files
+
+Feature: Workspace files
+
+    As a researcher,
+    I want to make sure that my workflow produces the expected files,
+    so that I can be sure that the workflow outputs are correct.
+
+    Scenario: The workspace contains the expected input files
+        When the workflow is finished
+        Then the workspace should include "analysis.py"
+
+    Scenario: The workflow generates the final plot
+        When the workflow is finished
+        Then the workspace should contain "histogram.png"
+        And the sha256 checksum of the file "histogram.png" should be "c8f87114530c049d587f355cef07280fa5d760910c32638136a713eab1aa72e1"
+        And all the outputs should be included in the workspace
\ No newline at end of file
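
A note on the checksum asserted in `tests/workspace-files.feature`: after
downloading the workflow outputs with `reana-client download`, the expected
value can be recomputed locally. A minimal sketch, assuming `histogram.png`
was downloaded into the current directory:

```python
import hashlib

# Recompute the SHA-256 digest asserted in tests/workspace-files.feature.
with open("histogram.png", "rb") as f:
    print(hashlib.sha256(f.read()).hexdigest())
# Expected: c8f87114530c049d587f355cef07280fa5d760910c32638136a713eab1aa72e1
```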