Skip to content

Commit

Permalink
[add:lib] Add multi-processed predictions for HMMClassifier (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
eonu authored Jan 6, 2021
1 parent 05a07cf commit 73c5834
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ The following algorithms provided within Sequentia support the use of multivaria
- [x] Hidden Markov Models (via [`hmmlearn`](https://github.com/hmmlearn/hmmlearn))<br/><em>Learning with the Baum-Welch algorithm</em> [[1]](#references)
- [x] Gaussian Mixture Model emissions
- [x] Linear, left-right and ergodic topologies
- [x] Multi-processed predictions
- [x] Dynamic Time Warping k-Nearest Neighbors (via [`dtaidistance`](https://github.com/wannesm/dtaidistance))
- [x] Sakoe–Chiba band global warping constraint
- [x] Dependent and independent feature warping (DTWD & DTWI)
Expand Down
53 changes: 47 additions & 6 deletions lib/sequentia/classifiers/hmm/hmm_classifier.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import numpy as np, pickle
import tqdm, tqdm.auto, numpy as np, pickle
from joblib import Parallel, delayed
from multiprocessing import cpu_count
from .gmmhmm import GMMHMM
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
Expand Down Expand Up @@ -43,7 +45,7 @@ def fit(self, models):
self._encoder = LabelEncoder()
self._encoder.fit([model.label for model in models])

def predict(self, X, prior='frequency', return_scores=False, original_labels=True):
def predict(self, X, prior='frequency', verbose=True, return_scores=False, original_labels=True, n_jobs=1):
"""Predicts the label for an observation sequence (or multiple sequences) according to maximum likelihood or posterior scores.
Parameters
Expand All @@ -60,12 +62,24 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.
verbose: bool
Whether to display a progress bar or not.
.. note::
If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process
are always displayed in the console, regardless of where you are running this function from
(e.g. a Jupyter notebook).
return_scores: bool
Whether to return the scores of each model on the observation sequence(s).
original_labels: bool
Whether to inverse-transform the labels to their original encoding.
n_jobs: int > 0 or -1
| The number of jobs to run in parallel.
| Setting this to -1 will use all available CPU cores.
Returns
-------
prediction(s): str/numeric or :class:`numpy:numpy.ndarray` (str/numeric)
Expand All @@ -91,7 +105,10 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
assert np.isclose(sum(prior), 1.), 'Class priors must form a probability distribution by summing to one'
else:
self._val.one_of(prior, ['frequency', 'uniform'], desc='prior')
self._val.boolean(verbose, desc='verbose')
self._val.boolean(return_scores, desc='return_scores')
self._val.boolean(original_labels, desc='original_labels')
self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')

# Create look-up for prior probabilities
if prior == 'frequency':
Expand All @@ -105,10 +122,15 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
# Convert single observation sequence to a singleton list
X = [X] if isinstance(X, np.ndarray) else X

# Lambda for calculating the log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])

# Calculate log un-normalized posteriors as a sum of the log forward probabilities (likelihoods) and log priors
# Perform the MAP classification rule and return labels to original encoding if necessary
posteriors = lambda x: np.array([model.forward(x) + np.log(prior[model.label]) for model in self._models])
scores = np.array([posteriors(x) for x in X])
n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
scores = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, posteriors, chunk, verbose) for i, chunk in enumerate(X_chunks))
scores = np.concatenate(scores)
best_idxs = np.atleast_1d(scores.argmax(axis=1))
labels = self._encoder.inverse_transform(best_idxs) if original_labels else best_idxs

Expand All @@ -117,7 +139,7 @@ def predict(self, X, prior='frequency', return_scores=False, original_labels=Tru
else:
return (labels, scores) if return_scores else labels

def evaluate(self, X, y, prior='frequency'):
def evaluate(self, X, y, prior='frequency', verbose=True, n_jobs=1):
"""Evaluates the performance of the classifier on a batch of observation sequences and their labels.
Parameters
Expand All @@ -137,6 +159,18 @@ def evaluate(self, X, y, prior='frequency'):
Alternatively, class prior probabilities can be specified in an iterable of floats, e.g. `[0.1, 0.3, 0.6]`.
verbose: bool
Whether to display a progress bar or not.
.. note::
If both ``verbose=True`` and ``n_jobs > 1``, then the progress bars for each process
are always displayed in the console, regardless of where you are running this function from
(e.g. a Jupyter notebook).
n_jobs: int > 0 or -1
| The number of jobs to run in parallel.
| Setting this to -1 will use all available CPU cores.
Returns
-------
accuracy: float
Expand All @@ -146,7 +180,7 @@ def evaluate(self, X, y, prior='frequency'):
The confusion matrix representing the discrepancy between predicted and actual labels.
"""
X, y = self._val.observation_sequences_and_labels(X, y)
predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False)
predictions = self.predict(X, prior=prior, return_scores=False, original_labels=False, verbose=verbose, n_jobs=n_jobs)
cm = confusion_matrix(self._encoder.transform(y), predictions, labels=self._encoder.transform(self._encoder.classes_))
return np.sum(np.diag(cm)) / np.sum(cm), cm

Expand Down Expand Up @@ -183,6 +217,13 @@ def load(cls, path):
with open(path, 'rb') as file:
return pickle.load(file)

def _chunk_predict(self, process, posteriors, chunk, verbose): # Requires fit
"""Makes predictions (scores) for a chunk of the observation sequences, for a given subprocess."""
return np.array([posteriors(x) for x in tqdm.auto.tqdm(
chunk, desc='Classifying examples (process {})'.format(process),
disable=not(verbose), position=process-1
)])

@property
def models(self):
try:
Expand Down
5 changes: 3 additions & 2 deletions lib/sequentia/classifiers/knn/knn_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,16 @@ def predict(self, X, verbose=True, original_labels=True, n_jobs=1):

X = self._val.observation_sequences(X, allow_single=True)
self._val.boolean(verbose, desc='verbose')
self._val.boolean(original_labels, desc='original_labels')
self._val.restricted_integer(n_jobs, lambda x: x == -1 or x > 0, 'number of jobs', '-1 or greater than zero')

if isinstance(X, np.ndarray):
distances = np.array([self._dtw(X, x) for x in tqdm.auto.tqdm(self._X, desc='Calculating distances', disable=not(verbose))])
return self._output(self._find_nearest(distances), original_labels)
else:
n_jobs = cpu_count() if n_jobs == -1 else n_jobs
n_jobs = min(cpu_count() if n_jobs == -1 else n_jobs, len(X))
X_chunks = [list(chunk) for chunk in np.array_split(np.array(X, dtype=object), n_jobs)]
labels = Parallel(n_jobs=min(n_jobs, len(X)))(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
labels = Parallel(n_jobs=n_jobs)(delayed(self._chunk_predict)(i+1, chunk, verbose) for i, chunk in enumerate(X_chunks))
return self._output(np.concatenate(labels), original_labels) # Flatten the resulting array

def evaluate(self, X, y, verbose=True, n_jobs=1):
Expand Down
26 changes: 13 additions & 13 deletions notebooks/Pen-Tip Trajectories (Example).ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "9036fa23214f4727b2ad661a19a549c4",
"model_id": "3ddd5bf499294bf0bc0963ab4dbb6ece",
"version_major": 2,
"version_minor": 0
},
Expand Down Expand Up @@ -444,7 +444,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1a3610f874c4400cab88d38292c49468",
"model_id": "c2f3a92e26114f1fbb4d45058698b747",
"version_major": 2,
"version_minor": 0
},
Expand All @@ -461,8 +461,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
"CPU times: user 1.75 s, sys: 108 ms, total: 1.86 s\n",
"Wall time: 2.2 s\n"
"CPU times: user 1.68 s, sys: 195 ms, total: 1.88 s\n",
"Wall time: 1.98 s\n"
]
}
],
Expand Down Expand Up @@ -491,8 +491,8 @@
"text": [
"w c d e a e b h s v c y w e v v w v v b o e l c d c p n h p y p m h d a y d b n m m a g o g c n l y\n",
"\n",
"CPU times: user 699 ms, sys: 80.5 ms, total: 779 ms\n",
"Wall time: 3.73 s\n"
"CPU times: user 721 ms, sys: 90.5 ms, total: 811 ms\n",
"Wall time: 3.24 s\n"
]
}
],
Expand Down Expand Up @@ -521,8 +521,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 542 ms, sys: 17.1 ms, total: 559 ms\n",
"Wall time: 21.9 s\n"
"CPU times: user 556 ms, sys: 17.3 ms, total: 573 ms\n",
"Wall time: 10.1 s\n"
]
}
],
Expand Down Expand Up @@ -568,7 +568,7 @@
"\n",
"While the fast C compiled functions in the [`dtaidistance`](https://github.com/wannesm/dtaidistance) package (along with the multiprocessing capabilities of Sequentia's `KNNClassifier`) help to speed up classification **a lot**, the practical use of $k$-NN becomes more limited as the dataset grows larger. \n",
"\n",
"In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx22s$, which is even faster than the HMM classifier that we show below. "
"In this case, since our dataset is relatively small, classifying all test examples was completed in $\\approx10s$, which is even faster than the HMM classifier that we show below. "
]
},
{
Expand Down Expand Up @@ -605,7 +605,7 @@
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0fbffb1d3bbc44b5b6356b54a89a61b2",
"model_id": "30052beed780408db9e7b1f1212b6404",
"version_major": 2,
"version_minor": 0
},
Expand Down Expand Up @@ -655,14 +655,14 @@
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3min 33s, sys: 18 s, total: 3min 51s\n",
"Wall time: 2min 34s\n"
"CPU times: user 197 ms, sys: 13.5 ms, total: 210 ms\n",
"Wall time: 55.4 s\n"
]
}
],
"source": [
"%%time\n",
"acc, cm = clf.evaluate(X_test, y_test)"
"acc, cm = clf.evaluate(X_test, y_test, n_jobs=-1)"
]
},
{
Expand Down

0 comments on commit 73c5834

Please sign in to comment.