import os
import glog as log
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.nn import BCEWithLogitsLoss
import numpy as np
from recoder import __version__
from recoder.data import RecommendationDataset, RecommendationDataLoader, BatchCollator
from recoder.metrics import RecommenderEvaluator
from recoder.nn import FactorizationModel
from recoder.recommender import InferenceRecommender
from recoder.losses import MSELoss, MultinomialNLLLoss
from tqdm import tqdm
class Recoder(object):
"""
Module to train/evaluate a recommendation :class:`recoder.nn.FactorizationModel`.
Args:
model (FactorizationModel): the factorization model to train.
num_items (int, optional): the number of items to represent. If None, it will
be computed from the first training dataset passed to ``train()``.
    num_users (int, optional): the number of users to represent. If None, it will
      be computed from the first training dataset passed to ``train()``.
optimizer_type (str, optional): optimizer type (one of 'sgd', 'adam', 'adagrad', 'rmsprop').
loss (str or torch.nn.Module, optional): loss function used to train the model.
      If ``loss`` is a ``str``, it should be `mse` for ``recoder.losses.MSELoss``, `logistic` for
      ``torch.nn.BCEWithLogitsLoss``, or `logloss` for ``recoder.losses.MultinomialNLLLoss``. If ``loss``
      is a ``torch.nn.Module``, that module is used directly as the loss function; make sure its
      reduction is a sum reduction and not an elementwise mean.
loss_params (dict, optional): loss function extra params based on loss module if ``loss`` is a ``str``.
Ignored if ``loss`` is a ``torch.nn.Module``.
use_cuda (bool, optional): use GPU when training/evaluating the model.
    user_based (bool, optional): whether your model is user-based. If True, an exception
      will be raised when there are inconsistencies between the users represented in the model
      and the users in the training datasets.
    item_based (bool, optional): whether your model is item-based. If True, an exception
      will be raised when there are inconsistencies between the items represented in the model
      and the items in the training datasets.
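
  Example (a minimal sketch; ``DynamicAutoencoder`` and the hyperparameter values
  are illustrative, not prescriptive)::

    from recoder.nn import DynamicAutoencoder

    model = DynamicAutoencoder(hidden_layers=[200], activation_type='tanh')
    recoder = Recoder(model=model, optimizer_type='adam',
                      loss='logloss', use_cuda=False)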
"""
def __init__(self, model: FactorizationModel,
num_items=None, num_users=None,
optimizer_type='sgd', loss='mse',
loss_params=None, use_cuda=False,
user_based=True, item_based=True):
self.model = model
self.num_items = num_items
self.num_users = num_users
self.optimizer_type = optimizer_type
self.loss = loss
self.loss_params = loss_params if loss_params else {}
self.use_cuda = use_cuda
self.user_based = user_based
self.item_based = item_based
if self.use_cuda:
self.device = torch.device('cuda')
else:
self.device = torch.device('cpu')
self.optimizer = None
self.sparse_optimizer = None
self.current_epoch = 1
self.items = None
self.users = None
self.__model_initialized = False
self.__optimizer_state_dict = None
self.__sparse_optimizer_state_dict = None
def __init_model(self):
if self.__model_initialized:
return
self.model.init_model(self.num_items, self.num_users)
self.model = self.model.to(device=self.device)
self.__model_initialized = True
def __init_loss_module(self):
    if isinstance(self.loss, torch.nn.Module):
self.loss_module = self.loss
elif self.loss == 'logistic':
self.loss_module = BCEWithLogitsLoss(reduction='sum', **self.loss_params)
elif self.loss == 'mse':
self.loss_module = MSELoss(reduction='sum', **self.loss_params)
elif self.loss == 'logloss':
self.loss_module = MultinomialNLLLoss(reduction='sum')
elif self.loss is None:
raise ValueError('No loss function defined')
else:
raise ValueError('Unknown loss function {}'.format(self.loss))
def __init_optimizer(self, lr, weight_decay):
# When continuing training on the same Recoder instance
if self.optimizer is not None:
self.__optimizer_state_dict = self.optimizer.state_dict()
if self.sparse_optimizer is not None:
self.__sparse_optimizer_state_dict = self.sparse_optimizer.state_dict()
# Collecting sparse parameter names
sparse_params_names = []
sparse_modules = [torch.nn.Embedding, torch.nn.EmbeddingBag]
for module_name, module in self.model.named_modules():
if type(module) in sparse_modules and module.sparse:
sparse_params_names.extend([module_name + '.' + param_name
for param_name, param in module.named_parameters()])
# Initializing optimizer params
params = []
sparse_params = []
for param_name, param in self.model.named_parameters():
_weight_decay = weight_decay
if 'bias' in param_name:
_weight_decay = 0
if param_name in sparse_params_names:
# If module is an embedding layer with sparse gradients
# then add its parameters to sparse optimizer
sparse_params.append({'params': param, 'weight_decay': _weight_decay})
else:
params.append({'params': param, 'weight_decay': _weight_decay})
if self.optimizer_type == "adam":
if len(params) > 0:
self.optimizer = optim.Adam(params, lr=lr)
if len(sparse_params) > 0:
self.sparse_optimizer = optim.SparseAdam(sparse_params, lr=lr)
elif self.optimizer_type == "adagrad":
if len(sparse_params) > 0:
raise ValueError('Sparse gradients optimization not supported with adagrad')
self.optimizer = optim.Adagrad(params, lr=lr)
elif self.optimizer_type == "sgd":
if len(sparse_params) > 0:
raise ValueError('Sparse gradients optimization not supported with sgd')
self.optimizer = optim.SGD(params, lr=lr, momentum=0.9)
elif self.optimizer_type == "rmsprop":
if len(sparse_params) > 0:
raise ValueError('Sparse gradients optimization not supported with rmsprop')
self.optimizer = optim.RMSprop(params, lr=lr, momentum=0.9)
else:
      raise ValueError('Unknown optimizer type {}'.format(self.optimizer_type))
if self.__optimizer_state_dict is not None:
self.optimizer.load_state_dict(self.__optimizer_state_dict)
self.__optimizer_state_dict = None # no need for this anymore
if self.__sparse_optimizer_state_dict is not None and self.sparse_optimizer is not None:
self.sparse_optimizer.load_state_dict(self.__sparse_optimizer_state_dict)
self.__sparse_optimizer_state_dict = None
  def init_from_model_file(self, model_file):
"""
    Initializes the model from a pre-trained model file.
Args:
model_file (str): the pre-trained model file path
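
    Example (a sketch; the file path is illustrative)::

      recoder = Recoder(model=DynamicAutoencoder())
      recoder.init_from_model_file('/tmp/models/movielens_epoch_100.model')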
"""
log.info('Loading model from: {}'.format(model_file))
if not os.path.isfile(model_file):
raise Exception('No state file found in {}'.format(model_file))
model_saved_state = torch.load(model_file, map_location='cpu')
model_params = model_saved_state['model_params']
self.current_epoch = model_saved_state['last_epoch']
self.loss = model_saved_state.get('loss', self.loss)
self.loss_params = model_saved_state.get('loss_params', self.loss_params)
self.optimizer_type = model_saved_state['optimizer_type']
self.items = model_saved_state.get('items', None)
self.users = model_saved_state.get('users', None)
self.num_items = model_saved_state.get('num_items', None)
self.num_users = model_saved_state.get('num_users', None)
self.__optimizer_state_dict = model_saved_state['optimizer']
self.__sparse_optimizer_state_dict = model_saved_state.get('sparse_optimizer', None)
self.model.load_model_params(model_params)
self.__init_model()
self.model.load_state_dict(model_saved_state['model'])
  def save_state(self, model_checkpoint_prefix):
"""
    Saves the model state to a path starting with ``model_checkpoint_prefix``, suffixed
    with the model's current training epoch.
Args:
model_checkpoint_prefix (str): the model save path prefix
Returns:
the model state file path
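
    Example (the prefix is illustrative)::

      checkpoint_file = recoder.save_state('/tmp/models/movielens')
      # e.g. '/tmp/models/movielens_epoch_100.model'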
"""
checkpoint_file = "{}_epoch_{}.model".format(model_checkpoint_prefix, self.current_epoch)
log.info("Saving model to {}".format(checkpoint_file))
current_state = {
'recoder_version': __version__,
'model_params': self.model.model_params(),
'last_epoch': self.current_epoch,
'model': self.model.state_dict(),
'optimizer_type': self.optimizer_type,
'optimizer': self.optimizer.state_dict(),
'items': self.items,
'users': self.users,
'num_items': self.num_items,
'num_users': self.num_users
}
    if isinstance(self.loss, str):
current_state['loss'] = self.loss
current_state['loss_params'] = self.loss_params
torch.save(current_state, checkpoint_file)
return checkpoint_file
def __init_training(self, train_dataset, lr,
weight_decay):
if self.items is None:
self.items = train_dataset.items
else:
self.items = np.unique(np.append(self.items, train_dataset.items))
if self.users is None:
self.users = train_dataset.users
else:
self.users = np.unique(np.append(self.users, train_dataset.users))
if self.item_based and self.num_items is None:
self.num_items = int(np.max(self.items)) + 1
elif self.item_based:
      assert self.num_items >= int(np.max(self.items)) + 1, \
        'The largest item id should be smaller than the number of items. ' \
        'If your model is not based on items, set item_based to False in the Recoder constructor.'
if self.user_based and self.num_users is None:
self.num_users = int(np.max(self.users)) + 1
elif self.user_based:
      assert self.num_users >= int(np.max(self.users)) + 1, \
        'The largest user id should be smaller than the number of users. ' \
        'If your model is not based on users, set user_based to False in the Recoder constructor.'
self.__init_model()
self.__init_optimizer(lr=lr, weight_decay=weight_decay)
self.__init_loss_module()
  def train(self, train_dataset, val_dataset=None,
lr=0.001, weight_decay=0, num_epochs=1,
iters_per_epoch=None, batch_size=64, lr_milestones=None,
negative_sampling=False, num_sampling_users=0, num_data_workers=0,
model_checkpoint_prefix=None, checkpoint_freq=0,
eval_freq=0, eval_num_recommendations=None,
eval_num_users=None, metrics=None, eval_batch_size=None):
"""
Trains the model
Args:
train_dataset (RecommendationDataset): train dataset.
val_dataset (RecommendationDataset, optional): validation dataset.
lr (float, optional): learning rate.
weight_decay (float, optional): weight decay (L2 normalization).
num_epochs (int, optional): number of epochs to train the model.
      iters_per_epoch (int, optional): number of training iterations per epoch. If None,
        one epoch is a full pass over the training dataset.
      batch_size (int, optional): batch size.
lr_milestones (list, optional): optimizer learning rate epochs milestones (0.1 decay).
negative_sampling (bool, optional): whether to apply mini-batch based negative sampling or not.
num_sampling_users (int, optional): number of users to consider for sampling items.
This is useful for increasing the number of negative samples in mini-batch based negative
sampling while keeping the batch-size small. If 0, then num_sampling_users will
be equal to batch_size.
num_data_workers (int, optional): number of data workers to use for building the mini-batches.
      model_checkpoint_prefix (str, optional): model checkpoint save path prefix.
      checkpoint_freq (int, optional): epochs frequency of saving a checkpoint of the model.
      eval_freq (int, optional): epochs frequency of evaluating the model.
      eval_num_recommendations (int, optional): number of recommendations to generate on evaluation.
      eval_num_users (int, optional): number of users from the validation dataset to use for evaluation.
        If None, all users in the validation dataset are used.
      metrics (list[Metric], optional): list of ``Metric`` used to evaluate the model.
      eval_batch_size (int, optional): the size of the evaluation batch.
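
    Example (a sketch; the datasets are assumed to be pre-built
    ``RecommendationDataset`` instances, and the hyperparameter values are
    illustrative)::

      recoder.train(train_dataset=train_dataset, val_dataset=val_dataset,
                    batch_size=500, lr=1e-3, weight_decay=2e-5,
                    num_epochs=100, negative_sampling=True,
                    lr_milestones=[60, 80])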
"""
log.info('{} Mode'.format('CPU' if self.device.type == 'cpu' else 'GPU'))
model_params = self.model.model_params()
for param in model_params:
log.info('Model {}: {}'.format(param, model_params[param]))
log.info('Initial Learning Rate: {}'.format(lr))
log.info('Weight decay: {}'.format(weight_decay))
log.info('Batch Size: {}'.format(batch_size))
log.info('Optimizer: {}'.format(self.optimizer_type))
log.info('LR milestones: {}'.format(lr_milestones))
log.info('Loss Function: {}'.format(self.loss))
for param in self.loss_params:
log.info('Loss {}: {}'.format(param, self.loss_params[param]))
if num_sampling_users == 0:
num_sampling_users = batch_size
if eval_batch_size is None:
eval_batch_size = batch_size
    assert num_sampling_users >= batch_size and num_sampling_users % batch_size == 0, \
      'num_sampling_users should be a positive multiple of batch_size'
self.__init_training(train_dataset=train_dataset, lr=lr, weight_decay=weight_decay)
train_dataloader = RecommendationDataLoader(train_dataset, batch_size=batch_size,
negative_sampling=negative_sampling,
num_sampling_users=num_sampling_users,
num_workers=num_data_workers)
if val_dataset is not None:
val_dataloader = RecommendationDataLoader(val_dataset, batch_size=batch_size,
negative_sampling=negative_sampling,
num_sampling_users=num_sampling_users,
num_workers=num_data_workers)
else:
val_dataloader = None
if lr_milestones is not None:
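      # MultiStepLR's last_epoch is 0-indexed while current_epoch is 1-indexed,
      # hence the offset of 2 when resuming from a checkpoint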
_last_epoch = -1 if self.current_epoch == 1 else (self.current_epoch - 2)
lr_scheduler = MultiStepLR(self.optimizer, milestones=lr_milestones,
gamma=0.1, last_epoch=_last_epoch)
else:
lr_scheduler = None
self._train(train_dataloader=train_dataloader,
val_dataloader=val_dataloader,
num_epochs=num_epochs,
current_epoch=self.current_epoch,
lr_scheduler=lr_scheduler,
batch_size=batch_size,
model_checkpoint_prefix=model_checkpoint_prefix,
checkpoint_freq=checkpoint_freq,
eval_freq=eval_freq,
metrics=metrics,
eval_num_recommendations=eval_num_recommendations,
iters_per_epoch=iters_per_epoch,
eval_num_users=eval_num_users,
eval_batch_size=eval_batch_size)
def _train(self, train_dataloader, val_dataloader,
num_epochs, current_epoch, lr_scheduler,
batch_size, model_checkpoint_prefix, checkpoint_freq,
eval_freq, metrics, eval_num_recommendations, iters_per_epoch,
eval_num_users, eval_batch_size):
num_batches = len(train_dataloader)
iters_processed = 0
if iters_per_epoch is None:
iters_per_epoch = num_batches
for epoch in range(current_epoch, num_epochs + 1):
self.current_epoch = epoch
self.model.train()
aggregated_losses = []
if lr_scheduler is not None:
lr_scheduler.step()
lr = lr_scheduler.get_lr()[0]
else:
lr = self.optimizer.defaults['lr']
description = 'Epoch {}/{} (lr={})'.format(epoch, num_epochs, lr)
if iters_processed == 0 or iters_processed == num_batches:
# If we are starting from scratch,
# or we iterated through the whole dataloader,
# reset and reinitialize the iterator
iters_processed = 0
iterator = enumerate(train_dataloader, 1)
iters_to_process = min(iters_per_epoch, num_batches - iters_processed)
iters_processed += iters_to_process
progress_bar = tqdm(range(iters_to_process), desc=description)
for batch_itr, (input, target) in iterator:
if self.optimizer is not None:
self.optimizer.zero_grad()
if self.sparse_optimizer is not None:
self.sparse_optimizer.zero_grad()
if target is None:
target_items = input.items
else:
target_items = target.items
loss = self.__compute_loss(input, target)
loss.backward()
if self.optimizer is not None:
self.optimizer.step()
if self.sparse_optimizer is not None:
self.sparse_optimizer.step()
aggregated_losses.append(loss.item())
# Number of items in the batch
if target_items is not None:
num_items = target_items.size(0)
else:
num_items = len(self.items)
        progress_bar.set_postfix(loss=np.mean(aggregated_losses),
                                 num_items=num_items,
                                 refresh=False)
progress_bar.update()
if batch_itr % iters_per_epoch == 0:
break
      postfix = {'loss': np.mean(aggregated_losses)}
if eval_freq > 0 and epoch % eval_freq == 0 and val_dataloader is not None:
val_loss = self._validate(val_dataloader)
postfix['val_loss'] = val_loss
if metrics is not None and eval_num_recommendations is not None:
results = self._evaluate(val_dataloader.dataset,
num_recommendations=eval_num_recommendations,
metrics=metrics, batch_size=eval_batch_size,
num_users=eval_num_users)
for metric in results:
postfix[str(metric)] = np.mean(results[metric])
progress_bar.set_postfix(postfix)
progress_bar.close()
if model_checkpoint_prefix and \
((checkpoint_freq > 0 and epoch % checkpoint_freq == 0) or epoch == num_epochs):
self.save_state(model_checkpoint_prefix)
def _validate(self, val_dataloader):
self.model.eval()
total_loss = 0.0
num_batches = 1
for itr, (input, target) in enumerate(val_dataloader):
loss = self.__compute_loss(input, target)
total_loss += loss.item()
num_batches = itr + 1
avg_loss = total_loss / num_batches
return avg_loss
def __compute_loss(self, input, target):
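    # Batches arrive as sparse (indices, values, size) triplets; densify them
    # on the target device for the model's dense forward pass.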
input_items = input.items
input_users = input.users
input_dense = torch.sparse.FloatTensor(input.indices, input.values, input.size) \
.to(device=self.device).to_dense()
if input_items is not None:
input_items = input_items.to(device=self.device)
if input_users is not None:
input_users = input_users.to(device=self.device)
if target is not None:
target_items = target.items
target_users = target.users
target_dense = torch.sparse.FloatTensor(target.indices, target.values, target.size) \
.to(device=self.device).to_dense()
if target_items is not None:
target_items = target_items.to(device=self.device)
if target_users is not None:
target_users = target_users.to(device=self.device)
else:
target_dense = input_dense
target_items = input_items
target_users = input_users
output = self.model(input_dense, input_items=input_items,
input_users=input_users, target_items=target_items,
target_users=target_users)
# Average loss over samples in a batch
normalization = torch.FloatTensor([target_dense.size(0)]).to(device=self.device)
loss = self.loss_module(output, target_dense) / normalization
return loss
  def predict(self, users_interactions, return_input=False):
"""
    Predicts the users' interactions with all items.
Args:
users_interactions (UsersInteractions): A batch of users' history consisting of list of ``Interaction``
return_input (bool, optional): whether to return the dense input batch
Returns:
if ``return_input`` is ``True`` a tuple of the predictions and the input batch
is returned, otherwise only the predictions are returned
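
    Example (a sketch; ``users_interactions`` is assumed to be a pre-built
    ``UsersInteractions`` batch)::

      predictions = recoder.predict(users_interactions)
      predictions, input_dense = recoder.predict(users_interactions, return_input=True)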
"""
if self.model is None:
raise Exception('Model not initialized.')
self.model.eval()
batch_collator = BatchCollator(batch_size=len(users_interactions.users), negative_sampling=False)
input = batch_collator.collate(users_interactions)
batch = input[0]
input_dense = torch.sparse.FloatTensor(batch.indices, batch.values, batch.size) \
.to(device=self.device).to_dense()
output = self.model(input_dense, input_users=batch.users.to(device=self.device))
    return (output, input_dense) if return_input else output
def _evaluate(self, eval_dataset, num_recommendations, metrics, batch_size=1, num_users=None):
if self.model is None:
raise Exception('Model not initialized')
self.model.eval()
recommender = InferenceRecommender(self, num_recommendations)
evaluator = RecommenderEvaluator(recommender, metrics)
results = evaluator.evaluate(eval_dataset, batch_size=batch_size, num_users=num_users)
return results
  def recommend(self, users_interactions, num_recommendations):
"""
    Generates a list of recommendations for each user in ``users_interactions``.
Args:
users_interactions (UsersInteractions): list of users interactions.
num_recommendations (int): number of recommendations to generate.
Returns:
list: list of recommended items for each user in users_interactions.
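
    Example (a sketch; ``users_interactions`` is assumed to be a pre-built
    ``UsersInteractions`` batch)::

      # top-10 item ids per user, excluding items already interacted with
      recommendations = recoder.recommend(users_interactions, num_recommendations=10)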
"""
output, input = self.predict(users_interactions, return_input=True)
# Set input items output to -inf so that they don't get recommended
    output[input > 0] = -float('inf')
top_output, top_ind = torch.topk(output, num_recommendations, dim=1, sorted=True)
recommendations = top_ind.tolist()
return recommendations
  def evaluate(self, eval_dataset, num_recommendations, metrics, batch_size=1, num_users=None):
"""
Evaluates the current model given an evaluation dataset.
Args:
      eval_dataset (RecommendationDataset): evaluation dataset.
      num_recommendations (int): number of top recommendations to consider.
      metrics (list): list of ``Metric`` to use for evaluation.
      batch_size (int, optional): batch size of computations.
      num_users (int, optional): number of users from the evaluation dataset to use for
        evaluation. If None, all users are used.
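
    Example (a sketch; the metrics and their cut-offs are illustrative)::

      from recoder.metrics import Recall, NDCG

      metrics = [Recall(k=20), Recall(k=50), NDCG(k=100)]
      recoder.evaluate(eval_dataset, num_recommendations=100,
                       metrics=metrics, batch_size=500)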
"""
results = self._evaluate(eval_dataset, num_recommendations, metrics,
batch_size=batch_size, num_users=num_users)
for metric in results:
log.info('{}: {}'.format(metric, np.mean(results[metric])))