import numpy as np
import recoder.utils as utils
from recoder.data import RecommendationDataLoader
from multiprocessing import Process, Queue
def average_precision(x, y, k, normalize=True):
    """Compute the average precision at ``k`` (AP@k) of a recommendation list.

    Args:
      x (list or np.array): ranked list of recommended items.
      y (list or np.array): items the user interacted with (the relevant set).
        Assumed to contain unique items (``np.isin`` is called with
        ``assume_unique=True``).
      k (int): cut position of the recommended list.
      normalize (bool, optional): if True, divide by ``min(k, len(y))`` so a
        perfect ranking can score 1 even when fewer than ``k`` relevant items
        exist; otherwise divide by ``len(y)``.

    Returns:
      float: the average precision at ``k``. NaN if ``y`` is empty (division
      by a zero normalization, NumPy semantics).
    """
    x = x[:k]
    # Fix: ``np.int`` was removed in NumPy 1.24 — use the builtin ``int``.
    x_in_y = np.isin(x, y, assume_unique=True).astype(int)
    tp = x_in_y.cumsum()  # true positives at every position in x
    precision = tp / (1 + np.arange(len(x)))  # precision at every position
    # precision * delta_recall at every position: only positions holding a
    # relevant item contribute to the sum.
    precision_drecall = np.multiply(precision, x_in_y)
    normalization = min(k, len(y)) if normalize else len(y)
    ap = precision_drecall.sum() / normalization
    return ap
def recall(x, y, k, normalize=True):
    """Compute the recall at ``k`` of a recommendation list.

    Args:
      x (list or np.array): ranked list of recommended items.
      y (list or np.array): items the user interacted with (the relevant set).
        Assumed to contain unique items (``np.isin`` is called with
        ``assume_unique=True``).
      k (int): cut position of the recommended list.
      normalize (bool, optional): if True, divide by ``min(k, len(y))`` so a
        perfect top-``k`` can score 1 even when more than ``k`` relevant items
        exist; otherwise divide by ``len(y)``.

    Returns:
      float: the recall at ``k``. NaN if ``y`` is empty (division by a zero
      normalization, NumPy semantics).
    """
    x = x[:k]
    # Fix: ``np.int`` was removed in NumPy 1.24 — use the builtin ``int``.
    x_in_y = np.isin(x, y, assume_unique=True).astype(int)
    normalization = min(k, len(y)) if normalize else len(y)
    _recall = x_in_y.sum() / normalization
    return _recall
def dcg(x, y, k):
    """Compute the discounted cumulative gain at ``k`` of a recommendation list.

    Uses binary relevance (an item gains 1 if it is in ``y``, else 0) with the
    standard ``log2(position + 1)`` discount.

    Args:
      x (list or np.array): ranked list of recommended items.
      y (list or np.array): items the user interacted with (the relevant set).
        Assumed to contain unique items (``np.isin`` is called with
        ``assume_unique=True``).
      k (int): cut position of the recommended list.

    Returns:
      float: the DCG at ``k``.
    """
    x = x[:k]
    # Fix: ``np.int`` was removed in NumPy 1.24 — use the builtin ``int``.
    x_in_y = np.isin(x, y, assume_unique=True).astype(int)
    # gain discounted by log2(rank + 1), rank being 1-based
    cg = x_in_y / np.log2(2 + np.arange(len(x)))
    _dcg = cg.sum()
    return _dcg
def ndcg(x, y, k):
    """Compute the normalized discounted cumulative gain at ``k``.

    The DCG of the recommendations ``x`` is divided by the ideal DCG, i.e.
    the DCG obtained when the relevant items ``y`` are ranked perfectly.

    Args:
      x (list or np.array): ranked list of recommended items.
      y (list or np.array): items the user interacted with.
      k (int): cut position of the recommended list.

    Returns:
      float: the NDCG at ``k``. NaN if ``y`` is empty, since the ideal DCG
      is then zero (NumPy division semantics).
    """
    return dcg(x, y, k) / dcg(y, y, k)
class Metric(object):
    """
    A Base class for metrics. All metrics should implement the ``evaluate``
    function.

    Args:
      metric_name (str): metric name. useful for representing it as string
        (printing) and hashing.
    """

    def __init__(self, metric_name):
        self.metric_name = metric_name

    def __str__(self):
        return self.metric_name

    def __hash__(self):
        # Two metrics with the same name hash identically, so either a
        # Metric instance or its name string can key the same dict.
        return hash(self.metric_name)

    def evaluate(self, x, y):
        """
        Evaluates the recommendations with respect to the items the user
        interacted with.

        Args:
          x (list): items recommended for the user
          y (list): items the user interacted with

        Returns:
          float: metric value
        """
        raise NotImplementedError
class AveragePrecision(Metric):
    """
    Computes the average precision @ K of the recommended items.

    Args:
      k (int): the cut position of the recommended list
      normalize (bool, optional): if True, normalize the value to 1 (divide
        by k) if k is less than the number of items the user interacted with,
        otherwise normalize only by number of items the user interacted with.
    """

    def __init__(self, k, normalize=True):
        super().__init__(metric_name='AveragePrecision@{}'.format(k))
        self.k = k
        self.normalize = normalize

    def evaluate(self, x, y):
        # Delegate to the module-level implementation with this metric's
        # configured cut-off and normalization.
        return average_precision(x, y, k=self.k, normalize=self.normalize)
class Recall(Metric):
    """
    Computes the recall @ K of the recommended items.

    Args:
      k (int): the cut position of the recommended list
      normalize (bool, optional): if True, normalize the value to 1 (divide
        by k) if k is less than the number of items in the user interactions,
        otherwise normalize only by number of items in the user interactions.
    """

    def __init__(self, k, normalize=True):
        super().__init__(metric_name='Recall@{}'.format(k))
        self.k = k
        self.normalize = normalize

    def evaluate(self, x, y):
        # Delegate to the module-level implementation with this metric's
        # configured cut-off and normalization.
        return recall(x, y, k=self.k, normalize=self.normalize)
class NDCG(Metric):
    """
    Computes the normalized discounted cumulative gain @ K of the
    recommended items.

    Args:
      k (int): the cut position of the recommended list
    """

    def __init__(self, k):
        super().__init__(metric_name='NDCG@{}'.format(k))
        self.k = k

    def evaluate(self, x, y):
        # Delegate to the module-level implementation at this metric's cut-off.
        return ndcg(x, y, k=self.k)
class RecommenderEvaluator(object):
    """
    Evaluates a :class:`recoder.recommender.Recommender` given a set of
    :class:`Metric`.

    Args:
      recommender (Recommender): the Recommender to evaluate
      metrics (list): list of metrics used to evaluate the recommender
    """

    def __init__(self, recommender, metrics):
        self.recommender = recommender
        self.metrics = metrics

    def evaluate(self, eval_dataset, batch_size=1,
                 num_users=None, num_workers=0):
        """
        Evaluates the recommender with an evaluation dataset.

        Args:
          eval_dataset (RecommendationDataset): the dataset to use
            in evaluating the model
          batch_size (int): the size of the users batch passed to the recommender
          num_users (int, optional): the number of users from the dataset to
            evaluate on. If None, evaluate on all users
          num_workers (int, optional): the number of workers to use on evaluating
            the recommended items. This is useful if the recommender runs on GPU,
            so the evaluation can run in parallel.

        Returns:
          dict: A dict mapping each metric to the list of the metric values on
          each user in the dataset.
        """
        dataloader = RecommendationDataLoader(eval_dataset, batch_size=batch_size,
                                              collate_fn=lambda _: _)

        results = {metric: [] for metric in self.metrics}

        if num_workers > 0:
            input_queue = Queue()
            results_queues = [Queue() for _ in range(num_workers)]

            def evaluate_worker(input_queue, results_queue, metrics):
                # Workers key results by metric name (a plain string) rather
                # than the Metric object so the payload pickles cheaply across
                # the process boundary; Metric hashes by name, so the parent
                # can map names back to its own Metric keys.
                # Fix: iterate the ``metrics`` parameter (not ``self.metrics``)
                # so the worker function does not capture ``self`` — the
                # explicit argument was already being passed for this purpose.
                worker_results = {metric.metric_name: [] for metric in metrics}
                while True:
                    x, y = input_queue.get(block=True)
                    if x is None:  # sentinel: no more work for this worker
                        break
                    for metric in metrics:
                        worker_results[metric.metric_name].append(metric.evaluate(x, y))
                results_queue.put(worker_results)

            workers = [Process(target=evaluate_worker,
                               args=(input_queue, results_queues[p_idx], self.metrics))
                       for p_idx in range(num_workers)]
            for worker in workers:
                worker.start()

        processed_num_users = 0
        for batch in dataloader:
            batch_input, target = batch
            recommendations = self.recommender.recommend(batch_input)
            # Column indices of the non-zero entries = the items each user
            # actually interacted with.
            relevant_items = [target.interactions_matrix[i].nonzero()[1]
                              for i in range(len(target.users))]
            for x, y in zip(recommendations, relevant_items):
                if num_workers > 0:
                    input_queue.put((x, y))
                else:
                    for metric in self.metrics:
                        results[metric].append(metric.evaluate(x, y))
            processed_num_users += len(target.users)
            if num_users is not None and processed_num_users >= num_users:
                break

        if num_workers > 0:
            # One sentinel per worker terminates its loop, then each worker
            # pushes its accumulated results exactly once. Drain the result
            # queues before joining to avoid deadlock on large payloads.
            for _ in range(num_workers):
                input_queue.put((None, None))
            for results_queue in results_queues:
                queue_results = results_queue.get()
                for metric in self.metrics:
                    results[metric].extend(queue_results[metric.metric_name])
            for worker in workers:
                worker.join()

        return results