From 73c5834cc486f7166934deca3c4a72115d8c5309 Mon Sep 17 00:00:00 2001 From: Edwin Onuonga Date: Thu, 7 Jan 2021 03:21:11 +0400 Subject: [PATCH] [add:lib] Add multi-processed predictions for HMMClassifier (#136) --- README.md | 1 + .../classifiers/hmm/hmm_classifier.py | 53 ++++++++++++++++--- .../classifiers/knn/knn_classifier.py | 5 +- .../Pen-Tip Trajectories (Example).ipynb | 26 ++++----- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 7372c7c2..2b096231 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ The following algorithms provided within Sequentia support the use of multivaria - [x] Hidden Markov Models (via [`hmmlearn`](https://github.com/hmmlearn/hmmlearn))
Learning with the Baum-Welch algorithm [[1]](#references) - [x] Gaussian Mixture Model emissions - [x] Linear, left-right and ergodic topologies + - [x] Multi-processed predictions - [x] Dynamic Time Warping k-Nearest Neighbors (via [`dtaidistance`](https://github.com/wannesm/dtaidistance)) - [x] Sakoe–Chiba band global warping constraint - [x] Dependent and independent feature warping (DTWD & DTWI) diff --git a/lib/sequentia/classifiers/hmm/hmm_classifier.py b/lib/sequentia/classifiers/hmm/hmm_classifier.py index 8f2ef552..08f2cfa3 100644 --- a/lib/sequentia/classifiers/hmm/hmm_classifier.py +++ b/lib/sequentia/classifiers/hmm/hmm_classifier.py @@ -1,4 +1,6 @@ -import numpy as np, pickle +import tqdm, tqdm.auto, numpy as np, pickle +from joblib import Parallel, delayed +from multiprocessing import cpu_count from .gmmhmm import GMMHMM from sklearn.metrics import confusion_matrix from sklearn.preprocessing import LabelEncoder @@ -43,7 +45,7 @@ def fit(self, models): self._encoder = LabelEncoder() self._encoder.fit([model.label for model in models]) - def predict(self, X, prior='frequency', return_scores=False, original_labels=True): + def predict(self, X, prior='frequency', verbose=True, return_scores=False, original_labels=True, n_jobs=1): """Predicts the label for an observation sequence (or multiple sequences) according to maximum likelihood or posterior scores. Parameters @@ -60,12 +62,24 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`. + verbose: bool + Whether to display a progress bar or not. + + .. note:: + If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process + are always displayed in the console, regardless of where you are running this function from + (e.g. a Jupyter notebook). + return_scores: bool Whether to return the scores of each model on the observation sequence(s). original_labels: bool Whether to inverse-transform the labels to their original encoding. + n_jobs: int > 0 or -1 + | The number of jobs to run in parallel. + | Setting this to -1 will use all available CPU cores. + Returns ------- prediction(s): str/numeric or :class:`numpy:numpy.ndarray` (str/numeric) @@ -91,7 +105,10 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru assert np.isclose(sum(prior), 1.), 'Class priors must form a probability distribution by summing to one' else: self._val.one_of(prior, ['frequency', 'uniform'], desc='prior') + self._val.boolean(verbose, desc='verbose') self._val.boolean(return_scores, desc='return_scores') + self._val.boolean(original_labels, desc='original_labels') + self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero') # Create look-up for prior probabilities if prior == 'frequency': @@ -105,10 +122,15 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru # Convert single observation sequence to a singleton list X = [X] if isinstance(X, np.ndarray) else X + # Lambda for calculating the log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors + posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models]) + # Calculate log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors # Perform the MAP classification rule and return labels to original encoding if necessary - posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models]) - scores = np.array([posteriors(x) for x in X]) + n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X)) + X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)] + scores = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, posteriors, chunk, verbose) for i, chunk in enumerate(X_chunks)) + scores = np.concatenate(scores) best_idxs = np.atleast_1d(scores.argmax(axis=1)) labels = self._encoder.inverse_transform(best_idxs) if original_labels else best_idxs @@ -117,7 +139,7 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru else: return (labels, scores) if return_scores else labels - def evaluate(self, X, y, prior='frequency'): + def evaluate(self, X, y, prior='frequency', verbose=True, n_jobs=1): """Evaluates the performance of the classifier on a batch of observation sequences and their labels. Parameters @@ -137,6 +159,18 @@ def evaluate(self, X, y, prior='frequency'): Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`. + verbose: bool + Whether to display a progress bar or not. + + .. note:: + If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process + are always displayed in the console, regardless of where you are running this function from + (e.g. a Jupyter notebook). + + n_jobs: int > 0 or -1 + | The number of jobs to run in parallel. + | Setting this to -1 will use all available CPU cores. + Returns ------- accuracy: float @@ -146,7 +180,7 @@ def evaluate(self, X, y, prior='frequency'): The confusion matrix representing the discrepancy between predicted and actual labels. """ X, y = self._val.observation_sequences_and_labels(X, y) - predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False) + predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False, verbose=verbose, n_jobs=n_jobs) cm = confusion_matrix(self._encoder.transform(y), predictions, labels=self._encoder.transform(self._encoder.classes_)) return np.sum(np.diag(cm)) / np.sum(cm), cm @@ -183,6 +217,13 @@ def load(cls, path): with open(path, 'rb') as file: return pickle.load(file) + def _chunk_predict(self, process, posteriors, chunk, verbose): # Requires fit + """Makes predictions (scores) for a chunk of the observation sequences, for a given subprocess.""" + return np.array([posteriors(x) for x in tqdm.auto.tqdm( + chunk, desc='Classifying examples (process {})'.format(process), + disable=not(verbose), position=process-1 + )]) + @property def models(self): try: diff --git a/lib/sequentia/classifiers/knn/knn_classifier.py b/lib/sequentia/classifiers/knn/knn_classifier.py index cc4cf934..7ac38e2b 100644 --- a/lib/sequentia/classifiers/knn/knn_classifier.py +++ b/lib/sequentia/classifiers/knn/knn_classifier.py @@ -175,15 +175,16 @@ def predict(self, X, verbose=True, original_labels=True, n_jobs=1): X = self._val.observation_sequences(X, allow_single=True) self._val.boolean(verbose, desc='verbose') + self._val.boolean(original_labels, desc='original_labels') self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero') if isinstance(X, np.ndarray): distances = np.array([self._dtw(X, x) for x in tqdm.auto.tqdm(self._X, desc='Calculating distances', disable=not(verbose))]) return self._output(self._find_nearest(distances), original_labels) else: - n_jobs = cpu_count() if n_jobs == -1 else n_jobs + n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X)) X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)] - labels = Parallel(n_jobs=min(n_jobs, len(X)))(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks)) + labels = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks)) return self._output(np.concatenate(labels), original_labels) # Flatten the resulting array def evaluate(self, X, y, verbose=True, n_jobs=1): diff --git a/notebooks/Pen-Tip Trajectories (Example).ipynb b/notebooks/Pen-Tip Trajectories (Example).ipynb index 19e97d15..cf1e023a 100644 --- a/notebooks/Pen-Tip Trajectories (Example).ipynb +++ b/notebooks/Pen-Tip Trajectories (Example).ipynb @@ -409,7 +409,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "9036fa23214f4727b2ad661a19a549c4", + "model_id": "3ddd5bf499294bf0bc0963ab4dbb6ece", "version_major": 2, "version_minor": 0 }, @@ -444,7 +444,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "1a3610f874c4400cab88d38292c49468", + "model_id": "c2f3a92e26114f1fbb4d45058698b747", "version_major": 2, "version_minor": 0 }, @@ -461,8 +461,8 @@ "text": [ "w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n", "\n", - "CPU times: user 1.75 s, sys: 108 ms, total: 1.86 s\n", - "Wall time: 2.2 s\n" + "CPU times: user 1.68 s, sys: 195 ms, total: 1.88 s\n", + "Wall time: 1.98 s\n" ] } ], @@ -491,8 +491,8 @@ "text": [ "w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n", "\n", - "CPU times: user 699 ms, sys: 80.5 ms, total: 779 ms\n", - "Wall time: 3.73 s\n" + "CPU times: user 721 ms, sys: 90.5 ms, total: 811 ms\n", + "Wall time: 3.24 s\n" ] } ], @@ -521,8 +521,8 @@ "name": "stdout", "output_type": "stream", "text": [ - "CPU times: user 542 ms, sys: 17.1 ms, total: 559 ms\n", - "Wall time: 21.9 s\n" + "CPU times: user 556 ms, sys: 17.3 ms, total: 573 ms\n", + "Wall time: 10.1 s\n" ] } ], @@ -568,7 +568,7 @@ "\n", "While the fast C compiled functions in the [`dtaidistance`](https://github.com/wannesm/dtaidistance) package (along with the multiprocessing capabilities of Sequentia's `KNNClassifier`) help to speed up classification **a lot**, the practical use of $k$-NN becomes more limited as the dataset grows larger. \n", "\n", - "In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx22s$, which is even faster than the HMM classifier that we show below. " + "In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx10s$, which is even faster than the HMM classifier that we show below. " ] }, { @@ -605,7 +605,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "0fbffb1d3bbc44b5b6356b54a89a61b2", + "model_id": "30052beed780408db9e7b1f1212b6404", "version_major": 2, "version_minor": 0 }, @@ -655,14 +655,14 @@ "name": "stdout", "output_type": "stream", "text": [ - "CPU times: user 3min 33s, sys: 18 s, total: 3min 51s\n", - "Wall time: 2min 34s\n" + "CPU times: user 197 ms, sys: 13.5 ms, total: 210 ms\n", + "Wall time: 55.4 s\n" ] } ], "source": [ "%%time\n", - "acc, cm = clf.evaluate(X_test, y_test)" + "acc, cm = clf.evaluate(X_test, y_test, n_jobs=-1)" ] }, {