Source code for recoder.nn

import torch
from torch import nn
import torch.nn.functional as F


def activation(x, act):
  """
  Applies the activation function named ``act`` to ``x``.

  The name is resolved on the ``torch`` namespace first and, when no callable
  of that name exists there, on ``torch.nn.functional``. This makes
  activations that only live in ``torch.nn.functional`` (e.g. ``leaky_relu``,
  ``softsign``) usable by name as well.

  Args:
    x (torch.Tensor): the tensor to apply the activation on.
    act (str): name of the activation function. ``'none'`` (or ``None``)
      returns ``x`` unchanged.

  Returns:
    torch.Tensor: the activated tensor, or ``x`` itself when no activation
    is requested.

  Raises:
    AttributeError: if ``act`` names a function found neither in ``torch``
      nor in ``torch.nn.functional``.
  """
  if act is None or act == 'none':
    return x

  # Prefer the ``torch`` namespace so names available in both places keep
  # resolving exactly as before.
  func = getattr(torch, act, None)
  if not callable(func):
    func = getattr(F, act)
  return func(x)


class FactorizationModel(nn.Module):
  """
  Abstract base for factorization models.

  Concrete models are expected to override every method declared here; the
  base implementations only raise ``NotImplementedError``.
  """

  def init_model(self, num_items=None, num_users=None):
    """
    Builds the model for a given number of users and items.

    Args:
      num_items (int): number of items to be represented in the model
      num_users (int): number of users to be represented in the model
    """
    raise NotImplementedError

  def model_params(self):
    """
    Returns the model hyper-parameters (e.g. hidden layers, activation).

    Mainly used by :class:`recoder.model.Recoder` when it stores the model
    hyper-parameters in a snapshot file.

    Returns:
      dict: Model parameters.
    """
    raise NotImplementedError

  def load_model_params(self, model_params):
    """
    Restores the model hyper-parameters from ``model_params``.

    Mainly used when loading the hyper-parameters (e.g. hidden layers,
    activation) from a model snapshot file stored by
    :class:`recoder.model.Recoder`.

    Args:
      model_params (dict): model parameters
    """
    raise NotImplementedError

  def forward(self, input, input_users=None,
              input_items=None, target_users=None,
              target_items=None):
    """
    Runs a forward pass of ``input`` through the latent factor model.

    Args:
      input (torch.FloatTensor): dense matrix of user item interactions.
      input_users (torch.LongTensor): users represented in the input batch;
        each user corresponds to the row of ``input`` at the same index.
      input_items (torch.LongTensor): items represented in the input batch;
        each item corresponds to the column of ``input`` at the same index.
      target_users (torch.LongTensor): target users to predict. Typically
        unused, but kept for consistency.
      target_items (torch.LongTensor): target items to predict.
    """
    raise NotImplementedError
class DynamicAutoencoder(FactorizationModel):
  """
  An Autoencoder module that processes variable size vectors. This is
  particularly efficient for cases where we only want to reconstruct
  sub-samples of a large sparse vector and not the whole vector,
  i.e negative sampling.

  Let `F` be a `DynamicAutoencoder` function that reconstructs vectors of
  size `d`, let `X` be a matrix of size `Bxd` where `B` is the batch size,
  and let `Z` be a sub-matrix of `X` and `I` be a vector of any length, such
  that `0 <= I[i] < d` and `Z = X[:, I]`. The reconstruction of `Z` is
  `F(Z, I)`. See `Examples`.

  The layers are only created by :func:`init_model`, which has to be called
  before the first forward pass.

  Args:
    hidden_layers (list): autoencoder hidden layers sizes. only the encoder
      layers. The first size is also the embedding size of the items.
    activation_type (str, optional): name of the activation function to use
      for hidden layers, as resolved by the module-level ``activation``
      helper. ``'none'`` disables the activation.
    is_constrained (bool, optional): constraining model by using the encoder
      weights in the decoder (tying the weights).
    dropout_prob (float, optional): dropout probability at the bottleneck layer
    noise_prob (float, optional): dropout (noise) probability at the input layer
    sparse (bool, optional): if True, gradients w.r.t. to the embedding layers
      weight matrices will be sparse tensors. Currently, sparse gradients are
      only fully supported by ``torch.optim.SparseAdam``. Note that this flag
      is not part of :func:`model_params`.

  Examples::

    >>> autoencoder = DynamicAutoencoder([500, 100])
    >>> autoencoder.init_model(num_items=500)
    >>> batch_size = 32
    >>> input = torch.rand(batch_size, 5)
    >>> input_items = torch.LongTensor([10, 126, 452, 29, 34])
    >>> output = autoencoder(input, input_items=input_items, target_items=input_items)
    >>> output.shape
    torch.Size([32, 5])
    >>>
    >>> # predicting a different target of items
    >>> target_items = torch.LongTensor([31, 14, 95, 49, 10, 36, 239])
    >>> output = autoencoder(input, input_items=input_items, target_items=target_items)
    >>> output.shape
    torch.Size([32, 7])
    >>>
    >>> # reconstructing the whole vector
    >>> input = torch.rand(batch_size, 500)
    >>> output = autoencoder(input)
    >>> output.shape
    torch.Size([32, 500])
  """

  def __init__(self, hidden_layers=None, activation_type='tanh',
               is_constrained=False, dropout_prob=0.0,
               noise_prob=0.0, sparse=False):
    super().__init__()
    self.activation_type = activation_type
    self.is_constrained = is_constrained
    self.hidden_layers = hidden_layers
    self.dropout_prob = dropout_prob
    self.noise_prob = noise_prob
    self.sparse = sparse

    # Populated by init_model()
    self.num_items = None
    self.num_embeddings = None
    self.noise_layer = None
    self.dropout_layer = None

  def init_model(self, num_items=None, num_users=None):
    """
    Creates the encoding/decoding layers for ``num_items`` items.

    Args:
      num_items (int): number of items to be represented in the model
      num_users (int): unused by this model

    Raises:
      ValueError: if ``hidden_layers`` has not been set or is empty.
    """
    # Fail early with a clear message instead of an opaque TypeError/IndexError
    # raised half-way through the layers construction.
    if self.hidden_layers is None or len(self.hidden_layers) == 0:
      raise ValueError('hidden_layers must contain at least one layer size '
                       'before calling init_model')

    self.num_items = num_items
    self.num_embeddings = num_items

    self.__create_encoding_layers()
    self.__create_decoding_layers()

    self.noise_layer = None
    if self.noise_prob > 0.0:
      self.noise_layer = nn.Dropout(p=self.noise_prob)

    self.dropout_layer = None
    if self.dropout_prob > 0.0:
      self.dropout_layer = nn.Dropout(p=self.dropout_prob)

    if self.is_constrained:
      self.__tie_weights()

  def model_params(self):
    """
    Returns the model hyper-parameters.

    Returns:
      dict: hidden layers, activation type, constraint flag, dropout and
      noise probabilities.
    """
    return {
      'hidden_layers': self.hidden_layers,
      'activation_type': self.activation_type,
      'is_constrained': self.is_constrained,
      'dropout_prob': self.dropout_prob,
      'noise_prob': self.noise_prob
    }

  def load_model_params(self, model_params):
    """
    Loads the hyper-parameters in ``model_params`` into the model.

    This only sets the attributes; no layer is created or rebuilt here.

    Args:
      model_params (dict): model parameters as returned by :func:`model_params`
    """
    self.hidden_layers = model_params['hidden_layers']
    self.activation_type = model_params['activation_type']
    self.is_constrained = model_params['is_constrained']
    self.dropout_prob = model_params['dropout_prob']
    self.noise_prob = model_params['noise_prob']

  def __create_encoding_layers(self):
    # The input layer is an embedding so that only the rows of the items
    # present in a batch are used.
    self.en_embedding_layer = nn.Embedding(self.num_embeddings, self.hidden_layers[0],
                                           sparse=self.sparse)
    self.__en_linear_embedding_layer = LinearEmbedding(self.en_embedding_layer, input_based=True)
    self.encoding_layers = nn.Sequential(*self.__create_coding_layers(self.hidden_layers))

    nn.init.xavier_uniform_(self.en_embedding_layer.weight)
    nn.init.constant_(self.__en_linear_embedding_layer.bias, 0)

  def __create_decoding_layers(self):
    _decoding_layers = self.__create_coding_layers(list(reversed(self.hidden_layers)))

    if self.is_constrained:
      for decoding_layer in _decoding_layers:
        # Deleting layer weight to unregister it as a parameter
        # Only register decoding layers biases as parameters
        del decoding_layer.weight

      # Reset the decoding layers weights as encoding layers weights transpose
      # These won't be registered as model parameters
      for el, dl in zip(self.encoding_layers, reversed(_decoding_layers)):
        dl.weight = el.weight.t()

      self.de_embedding_layer = self.en_embedding_layer
    else:
      self.de_embedding_layer = nn.Embedding(self.num_embeddings, self.hidden_layers[0],
                                             sparse=self.sparse)

    self.decoding_layers = nn.Sequential(*_decoding_layers)
    self.__de_linear_embedding_layer = LinearEmbedding(self.de_embedding_layer, input_based=False)

    # NOTE: when constrained this re-initializes the embedding shared with the
    # encoder; kept as is to preserve the initialization sequence.
    nn.init.xavier_uniform_(self.de_embedding_layer.weight)
    nn.init.constant_(self.__de_linear_embedding_layer.bias, 0)

  def __create_coding_layers(self, layer_sizes):
    # One linear layer per consecutive pair of sizes
    layers = []
    for in_size, out_size in zip(layer_sizes, layer_sizes[1:]):
      layer = nn.Linear(in_size, out_size)
      layers.append(layer)
      torch.nn.init.xavier_uniform_(layer.weight)
      torch.nn.init.constant_(layer.bias, 0)

    return layers

  def __tie_weights(self):
    # The i-th encoding layer shares its (transposed) weight with the
    # i-th decoding layer counted from the end.
    for el, dl in zip(self.encoding_layers, reversed(self.decoding_layers)):
      dl.weight = el.weight.t()

  def forward(self, input, input_users=None,
              input_items=None, target_users=None,
              target_items=None):
    """
    Encodes ``input`` and reconstructs the ``target_items`` columns.

    Args:
      input (torch.FloatTensor): the input dense matrix of user item
        interactions.
      input_users (torch.LongTensor): unused by this model.
      input_items (torch.LongTensor): the items represented by the columns of
        ``input``. If None, the columns are taken to be all the items.
      target_users (torch.LongTensor): unused by this model.
      target_items (torch.LongTensor): the items to reconstruct. If None, all
        the items are reconstructed.

    Returns:
      torch.FloatTensor: the reconstruction, one row per row of ``input`` and
      one column per target item.
    """
    if self.is_constrained:
      # Re-tie on every pass so the decoder works on transposes of the
      # current encoder weights.
      self.__tie_weights()

    # Normalize the input
    z = F.normalize(input, p=2, dim=1)
    if self.noise_prob > 0.0:
      z = self.noise_layer(z)

    z = self.__en_linear_embedding_layer(input_items, z)
    z = activation(z, self.activation_type)

    for encoding_layer in self.encoding_layers:
      z = activation(encoding_layer(z), self.activation_type)

    if self.dropout_prob > 0.0:
      z = self.dropout_layer(z)

    for decoding_layer in self.decoding_layers:
      z = activation(decoding_layer(z), self.activation_type)

    # No activation on the output layer
    z = self.__de_linear_embedding_layer(target_items, z)

    return z
class LinearEmbedding(nn.Module):
  """
  A linear layer whose weight matrix is backed by an embedding layer, so it
  can be applied on a subset of the embedding rows only.

  With ``input_based=True`` the embedding rows are the input features
  (``num_embeddings`` -> ``embedding_dim``); with ``input_based=False`` they
  are the output features (``embedding_dim`` -> ``num_embeddings``).

  Args:
    embedding_layer (nn.Embedding): the embedding layer holding the weights.
    input_based (bool, optional): whether the embedding rows index the input
      features (True) or the output features (False).
    bias (bool, optional): if False, the layer has no bias.
  """

  def __init__(self, embedding_layer: nn.Embedding, input_based=True, bias=True):
    super().__init__()
    self.embedding_layer = embedding_layer
    self.input_based = input_based
    self.in_features = embedding_layer.num_embeddings if input_based else embedding_layer.embedding_dim
    self.out_features = embedding_layer.embedding_dim if input_based else embedding_layer.num_embeddings
    if bias:
      # Zero-initialized rather than left as uninitialized memory
      self.bias = nn.Parameter(torch.zeros(self.out_features))
    else:
      self.register_parameter('bias', None)

  def forward(self, x, y):
    """
    Applies the linear transformation on ``y``.

    Args:
      x (torch.LongTensor): indices of the embedding rows to use as weights.
        If None, the whole embedding weight matrix is used.
      y (torch.FloatTensor): the input of the linear transformation.

    Returns:
      torch.FloatTensor: ``y`` multiplied by the selected weights, plus the
      bias when the layer has one.
    """
    if x is None:
      _weight = self.embedding_layer.weight
      _bias = self.bias
    else:
      _weight = self.embedding_layer(x)
      if self.input_based or self.bias is None:
        # The bias is over the embedding dimension (or absent): nothing to select
        _bias = self.bias
      else:
        # The bias is per embedding row: keep only the selected rows
        _bias = self.bias.index_select(0, x)

    if self.input_based:
      _weight = _weight.t()

    return F.linear(y, _weight, _bias)
class MatrixFactorization(FactorizationModel):
  """
  A Matrix Factorization model for collaborative filtering. Scores can be
  computed for a subset of the items only, which makes it efficient when
  reconstructing sub-samples of a large sparse vector rather than the whole
  vector, i.e negative sampling.

  Args:
    embedding_size (int): embedding size (rank) of the latent factors of
      users and items
    activation_type (str, optional): name of the activation function applied
      on the user embedding, as resolved by the module-level ``activation``
      helper.
    dropout_prob (float, optional): dropout probability to be applied on the
      user embedding
    sparse (bool, optional): if True, gradients w.r.t. to the embedding layers
      weight matrices will be sparse tensors. Currently, sparse gradients are
      only fully supported by ``torch.optim.SparseAdam``.
  """

  # Hyper-parameters exchanged through model_params / load_model_params
  _PARAM_NAMES = ('embedding_size', 'activation_type', 'dropout_prob')

  def __init__(self, embedding_size, activation_type='none',
               dropout_prob=0, sparse=False):
    super().__init__()

    # Hyper-parameters
    self.embedding_size = embedding_size
    self.activation_type = activation_type
    self.dropout_prob = dropout_prob
    self.sparse = sparse

    # Sizes and layers, all populated by init_model()
    self.num_users = None
    self.num_items = None
    self.user_embedding_layer = None
    self.item_embedding_layer = None
    self.bias = None
    self.dropout_layer = None

  def init_model(self, num_items=None, num_users=None):
    """
    Creates the user/item embeddings and the item bias.

    Args:
      num_items (int): number of items to be represented in the model
      num_users (int): number of users to be represented in the model
    """
    self.num_users, self.num_items = num_users, num_items

    self.user_embedding_layer = nn.Embedding(
      self.num_users, self.embedding_size, sparse=self.sparse)
    self.item_embedding_layer = nn.Embedding(
      self.num_items, self.embedding_size, sparse=self.sparse)

    # One bias term per item
    self.bias = nn.Parameter(torch.Tensor(self.num_items))

    use_dropout = self.dropout_prob > 0.0
    self.dropout_layer = nn.Dropout(p=self.dropout_prob) if use_dropout else None

    for embedding_layer in (self.user_embedding_layer, self.item_embedding_layer):
      nn.init.xavier_uniform_(embedding_layer.weight)
    nn.init.constant_(self.bias, 0)

  def model_params(self):
    """
    Returns the model hyper-parameters.

    Returns:
      dict: embedding size, activation type and dropout probability.
    """
    return {name: getattr(self, name) for name in self._PARAM_NAMES}

  def load_model_params(self, model_params):
    """
    Loads the hyper-parameters in ``model_params`` into the model.

    Args:
      model_params (dict): model parameters as returned by :func:`model_params`
    """
    for name in self._PARAM_NAMES:
      setattr(self, name, model_params[name])

  def forward(self, input, input_users=None,
              input_items=None, target_users=None,
              target_items=None):
    """
    Scores the target items for the users in ``input_users``.

    Args:
      input (torch.FloatTensor): not read by this model.
      input_users (torch.LongTensor): the users to compute the scores for.
      input_items (torch.LongTensor): not read by this model.
      target_users (torch.LongTensor): not read by this model.
      target_items (torch.LongTensor): the items to score. If None, all the
        items are scored.

    Returns:
      torch.FloatTensor: the scores, one row per user and one column per
      target item.
    """
    user_factors = activation(self.user_embedding_layer(input_users),
                              self.activation_type)
    if self.dropout_prob > 0:
      user_factors = self.dropout_layer(user_factors)

    if target_items is not None:
      # Only the requested items, along with their bias terms
      item_factors = self.item_embedding_layer(target_items)
      item_bias = self.bias.index_select(0, target_items)
    else:
      item_factors, item_bias = self.item_embedding_layer.weight, self.bias

    return F.linear(user_factors, item_factors, item_bias)