Skip to content

Commit

Permalink
Merge pull request #681 from NatLibFi/issue678-refactor-suggestionresult-sparse-array
Browse files Browse the repository at this point in the history

Refactor: Represent suggestion results as sparse arrays
  • Loading branch information
osma authored Apr 14, 2023
2 parents 693ab21 + 3edab48 commit 415d944
Show file tree
Hide file tree
Showing 37 changed files with 654 additions and 762 deletions.
7 changes: 6 additions & 1 deletion annif/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from glob import glob

from annif import logger
from annif.suggestion import SuggestionBatch


class AnnifBackend(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -83,7 +84,11 @@ def _suggest_batch(self, texts, params):
"""This method can be implemented by backends to use batching of documents in
their operations. This default implementation uses the regular suggest
functionality."""
return [self._suggest(text, params) for text in texts]
return SuggestionBatch.from_sequence(
[self._suggest(text, params) for text in texts],
self.project.subjects,
limit=int(params.get("limit")),
)

def suggest(self, texts, params=None):
"""Suggest subjects for the input documents and return a list of subject sets
Expand Down
8 changes: 3 additions & 5 deletions annif/backend/dummy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Dummy backend for testing basic interaction of projects and backends"""


from annif.suggestion import ListSuggestionResult, SubjectSuggestion
from annif.suggestion import SubjectSuggestion

from . import backend

Expand All @@ -27,17 +27,15 @@ def _suggest(self, text, params):

# Give no hits for no text
if len(text) == 0:
return ListSuggestionResult([])
return []

# allow overriding returned subject via uri parameter
if "uri" in params:
subject_id = self.project.subjects.by_uri(params["uri"])
else:
subject_id = self.subject_id

return ListSuggestionResult(
[SubjectSuggestion(subject_id=subject_id, score=score)]
)
return [SubjectSuggestion(subject_id=subject_id, score=score)]

def _learn(self, corpus, params):
# in this dummy backend we "learn" by picking up the subject ID
Expand Down
94 changes: 32 additions & 62 deletions annif/backend/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import annif.eval
import annif.parallel
import annif.suggestion
import annif.util
from annif.exception import NotSupportedException
from annif.suggestion import SuggestionBatch

from . import backend, hyperopt

Expand All @@ -28,37 +28,28 @@ def initialize(self, parallel=False):
project = self.project.registry.get_project(project_id)
project.initialize(parallel)

def _normalize_hits(self, hits, source_project):
"""Hook for processing hits from backends. Intended to be overridden
by subclasses."""
return hits

def _suggest_with_sources(self, texts, sources):
hit_sets_from_sources = []
for project_id, weight in sources:
source_project = self.project.registry.get_project(project_id)
hit_sets = source_project.suggest(texts)
norm_hit_sets = [
self._normalize_hits(hits, source_project) for hits in hit_sets
]
hit_sets_from_sources.append(
annif.suggestion.WeightedSuggestionsBatch(
hit_sets=norm_hit_sets,
weight=weight,
subjects=source_project.subjects,
)
)
return hit_sets_from_sources
return {
project_id: self.project.registry.get_project(project_id).suggest(texts)
for project_id, _ in sources
}

def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params):
"""Hook for merging hit sets from sources. Can be overridden by
subclasses."""
return annif.util.merge_hits(hit_sets_from_sources, len(self.project.subjects))
def _merge_source_batches(self, batch_by_source, sources, params):
"""Merge the given SuggestionBatches from each source into a single
SuggestionBatch. The default implementation computes a weighted
average based on the weights given in the sources tuple. Intended
to be overridden in subclasses."""

batches = [batch_by_source[project_id] for project_id, _ in sources]
weights = [weight for _, weight in sources]
return SuggestionBatch.from_averaged(batches, weights).filter(
limit=int(params["limit"])
)

def _suggest_batch(self, texts, params):
sources = annif.util.parse_sources(params["sources"])
hit_sets_from_sources = self._suggest_with_sources(texts, sources)
return self._merge_hit_sets_from_sources(hit_sets_from_sources, params)
batch_by_source = self._suggest_with_sources(texts, sources)
return self._merge_source_batches(batch_by_source, sources, params)


class EnsembleOptimizer(hyperopt.HyperparameterOptimizer):
Expand All @@ -74,8 +65,8 @@ def __init__(self, backend, corpus, metric):
]

def _prepare(self, n_jobs=1):
self._gold_subjects = []
self._source_hits = []
self._gold_batches = []
self._source_batches = []

for project_id in self._sources:
project = self._backend.project.registry.get_project(project_id)
Expand All @@ -92,23 +83,11 @@ def _prepare(self, n_jobs=1):
jobs, pool_class = annif.parallel.get_pool(n_jobs)

with pool_class(jobs) as pool:
for hit_sets, subject_sets in pool.imap_unordered(
for suggestions, gold_batch in pool.imap_unordered(
psmap.suggest_batch, self._corpus.doc_batches
):
self._gold_subjects.extend(subject_sets)
self._source_hits.extend(self._hit_sets_to_list(hit_sets))

def _hit_sets_to_list(self, hit_sets):
"""Convert a dict of lists of hits to a list of dicts of hits, e.g.
{"proj-1": [p-1-doc-1-hits, p-1-doc-2-hits]
"proj-2": [p-2-doc-1-hits, p-2-doc-2-hits]}
to
[{"proj-1": p-1-doc-1-hits, "proj-2": p-2-doc-1-hits},
{"proj-1": p-1-doc-2-hits, "proj-2": p-2-doc-2-hits}]
"""
return [
dict(zip(hit_sets.keys(), doc_hits)) for doc_hits in zip(*hit_sets.values())
]
self._source_batches.append(suggestions)
self._gold_batches.append(gold_batch)

def _normalize(self, hps):
total = sum(hps.values())
Expand All @@ -120,28 +99,19 @@ def _format_cfg_line(self, hps):
)

def _objective(self, trial):
batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
weights = {
eval_batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
proj_weights = {
project_id: trial.suggest_uniform(project_id, 0.0, 1.0)
for project_id in self._sources
}
for goldsubj, srchits in zip(self._gold_subjects, self._source_hits):
weighted_hits = []
for project_id, hits in srchits.items():
weighted_hits.append(
annif.suggestion.WeightedSuggestionsBatch(
hit_sets=[hits],
weight=weights[project_id],
subjects=self._backend.project.subjects,
)
)
batch.evaluate(
annif.util.merge_hits(
weighted_hits, len(self._backend.project.subjects)
)[0],
goldsubj,
for gold_batch, src_batches in zip(self._gold_batches, self._source_batches):
batches = [src_batches[project_id] for project_id in self._sources]
weights = [proj_weights[project_id] for project_id in self._sources]
avg_batch = SuggestionBatch.from_averaged(batches, weights).filter(
limit=int(self._backend.params["limit"])
)
results = batch.results(metrics=[self._metric])
eval_batch.evaluate_many(avg_batch, gold_batch)
results = eval_batch.results(metrics=[self._metric])
return results[self._metric]

def _postprocess(self, study):
Expand Down
4 changes: 2 additions & 2 deletions annif/backend/fasttext.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import ListSuggestionResult, SubjectSuggestion
from annif.suggestion import SubjectSuggestion

from . import backend, mixins

Expand Down Expand Up @@ -163,4 +163,4 @@ def _suggest_chunks(self, chunktexts, params):
score=score / len(chunktexts),
)
)
return ListSuggestionResult(results)
return results
10 changes: 5 additions & 5 deletions annif/backend/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests.exceptions

from annif.exception import OperationFailedException
from annif.suggestion import ListSuggestionResult, SubjectSuggestion
from annif.suggestion import SubjectSuggestion

from . import backend

Expand Down Expand Up @@ -69,13 +69,13 @@ def _suggest(self, text, params):
req.raise_for_status()
except requests.exceptions.RequestException as err:
self.warning("HTTP request failed: {}".format(err))
return ListSuggestionResult([])
return []

try:
response = req.json()
except ValueError as err:
self.warning("JSON decode failed: {}".format(err))
return ListSuggestionResult([])
return []

if "results" in response:
results = response["results"]
Expand All @@ -93,6 +93,6 @@ def _suggest(self, text, params):
]
except (TypeError, ValueError) as err:
self.warning("Problem interpreting JSON data: {}".format(err))
return ListSuggestionResult([])
return []

return ListSuggestionResult(subject_suggestions)
return subject_suggestions
3 changes: 1 addition & 2 deletions annif/backend/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import ListSuggestionResult


class ChunkingBackend(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -39,7 +38,7 @@ def _suggest(self, text, params):
chunktexts.append(" ".join(sentences[i : i + chunksize]))
self.debug("Split sentences into {} chunks".format(len(chunktexts)))
if len(chunktexts) == 0: # no input, empty result
return ListSuggestionResult([])
return []
return self._suggest_chunks(chunktexts, params)


Expand Down
7 changes: 3 additions & 4 deletions annif/backend/mllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.lexical.mllm import MLLMModel
from annif.suggestion import VectorSuggestionResult
from annif.suggestion import vector_to_suggestions

from . import backend, hyperopt

Expand Down Expand Up @@ -48,7 +48,7 @@ def _objective(self, trial):
else:
ranking = []
results = self._backend._prediction_to_result(ranking, params)
batch.evaluate(results, goldsubj)
batch.evaluate_many([results], [goldsubj])
results = batch.results(metrics=[self._metric])
return results[self._metric]

Expand Down Expand Up @@ -144,8 +144,7 @@ def _prediction_to_result(self, prediction, params):
vector = np.zeros(len(self.project.subjects), dtype=np.float32)
for score, subject_id in prediction:
vector[subject_id] = score
result = VectorSuggestionResult(vector)
return result.filter(self.project.subjects, limit=int(params["limit"]))
return vector_to_suggestions(vector, int(params["limit"]))

def _suggest(self, text, params):
candidates = self._generate_candidates(text)
Expand Down
27 changes: 17 additions & 10 deletions annif/backend/nn_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import annif.parallel
import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import VectorSuggestionResult
from annif.suggestion import SuggestionBatch, vector_to_suggestions

from . import backend, ensemble

Expand Down Expand Up @@ -130,21 +130,28 @@ def initialize(self, parallel=False):
model_filename, custom_objects={"MeanLayer": MeanLayer}
)

def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params):
def _merge_source_batches(self, batch_by_source, sources, params):
src_weight = dict(sources)
score_vectors = np.array(
[
[
np.sqrt(hits.as_vector(len(subjects)))
* weight
* len(hit_sets_from_sources)
for hits in proj_hit_set
np.sqrt(suggestions.as_vector())
* src_weight[project_id]
* len(batch_by_source)
for suggestions in batch
]
for proj_hit_set, weight, subjects in hit_sets_from_sources
for project_id, batch in batch_by_source.items()
],
dtype=np.float32,
).transpose(1, 2, 0)
results = self._model(score_vectors).numpy()
return [VectorSuggestionResult(res) for res in results]
prediction = self._model(score_vectors).numpy()
return SuggestionBatch.from_sequence(
[
vector_to_suggestions(row, limit=int(params["limit"]))
for row in prediction
],
self.project.subjects,
)

def _create_model(self, sources):
self.info("creating NN ensemble model")
Expand Down Expand Up @@ -215,7 +222,7 @@ def _corpus_to_vectors(self, corpus, seq, n_jobs):
):
doc_scores = []
for project_id, p_hits in hits.items():
vector = p_hits.as_vector(len(self.project.subjects))
vector = p_hits.as_vector()
doc_scores.append(
np.sqrt(vector) * sources[project_id] * len(sources)
)
Expand Down
8 changes: 4 additions & 4 deletions annif/backend/omikuji.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
NotSupportedException,
OperationFailedException,
)
from annif.suggestion import ListSuggestionResult, SubjectSuggestion
from annif.suggestion import SubjectSuggestion, SuggestionBatch

from . import backend, mixins

Expand Down Expand Up @@ -129,11 +129,11 @@ def _suggest_batch(self, texts, params):
batch_results = []
for row in vector:
if row.nnz == 0: # All zero vector, empty result
batch_results.append(ListSuggestionResult([]))
batch_results.append([])
continue
feature_values = [(col, row[0, col]) for col in row.nonzero()[1]]
results = []
for subj_id, score in self._model.predict(feature_values, top_k=limit):
results.append(SubjectSuggestion(subject_id=subj_id, score=score))
batch_results.append(ListSuggestionResult(results))
return batch_results
batch_results.append(results)
return SuggestionBatch.from_sequence(batch_results, self.project.subjects)
Loading

0 comments on commit 415d944

Please sign in to comment.