A Deeper Explanation Of Recurrent Neural Networks
Part I
Motivation:
I have been trying to implement a recurrent neural network in Python but I encountered many complications:
- Knowledge is scattered across many articles, papers and code bases
- The mathematical notations are different and confusing
- The equations do not provide a straightforward way to implement RNNs from scratch, especially the backpropagation part
Contents:
In this article, we will:
- Follow the forward pass of a vanilla RNN step by step
- Explain backpropagation in an RNN by calculating the gradients
- Implement an RNN from scratch
Problem:
Given a text: The cat plays
We would like to predict the next character given all the previous characters in the sentence:
- Given input=T predict output=h
- Given input=Th predict output=e
- …
We could in the end represent this as follows:
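As a minimal sketch (the helper name `next_char_pairs` is hypothetical and not part of the original article), the (input, next character) pairs for this sentence could be enumerated as:

```python
def next_char_pairs(text):
    """Return (prefix, next_character) pairs for every position in text."""
    return [(text[:i], text[i]) for i in range(1, len(text))]

pairs = next_char_pairs("The cat plays")
for prefix, target in pairs[:3]:
    print(f"input={prefix!r} -> output={target!r}")
```

Each pair uses all the characters seen so far as input and the character that follows as the expected output.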
Recurrent Neural Networks:
Toy Example:
Recurrent neural networks in their simplest form are models that take a sequence of a given length as input and output a sequence of the same length. These are called vanilla recurrent neural networks.
Let’s consider the case of a very simple text:
text="<BOS>aaba<EOS>"
In this case, we have a simple sequence (sentence). To train our RNN model to predict the next character, we can use our text to form an input and a target (output)
Input="<BOS>aaba"
Target="aaba<EOS>"
Our input and target sequence length is 5 in this case.
Each one of the characters can be represented as a one-hot encoded vector:
- Our text has the unique characters:
a,b,<BOS>,<EOS>
- The size of this vector would be then 4
Which means that both the inputs and targets can be represented as follows:
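A minimal numpy sketch of this encoding, assuming the vocabulary order a, b, <BOS>, <EOS> listed above and one column per timestep (the layout used by the code later in the article). The helper `one_hot` is a hypothetical name:

```python
import numpy as np

vocabulary = ["a", "b", "<BOS>", "<EOS>"]  # order taken from the text above
char_to_index = {ch: i for i, ch in enumerate(vocabulary)}

def one_hot(tokens):
    """Encode a list of tokens as a (vocabulary_size, sequence_length) matrix."""
    encoded = np.zeros((len(vocabulary), len(tokens)), dtype=np.float32)
    for t, token in enumerate(tokens):
        encoded[char_to_index[token], t] = 1.0
    return encoded

tokens = ["<BOS>", "a", "a", "b", "a", "<EOS>"]
X = one_hot(tokens[:-1])  # input:  <BOS> a a b a
Y = one_hot(tokens[1:])   # target: a a b a <EOS>
print(X.shape, Y.shape)
```

Both matrices have 4 rows (one per vocabulary entry) and 5 columns (one per timestep), and every column contains a single 1.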
The output of our trained model would be a list of probability vectors:
- Each of these vectors has the same size as our vocabulary, which is 4
- The probability vector shows the likelihood of each character as a potential output.
- For instance, if the output vector is
y_hat=[0.9,0.05,0.025,0.025]
Then our model is telling us that the most likely next character is a (the first entry of the vocabulary a,b,<BOS>,<EOS>)
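Reading the prediction off such a vector amounts to taking the index of its largest entry. A short sketch, assuming the same vocabulary order as above:

```python
import numpy as np

vocabulary = ["a", "b", "<BOS>", "<EOS>"]
y_hat = np.array([0.9, 0.05, 0.025, 0.025])

# The predicted character is the vocabulary entry with the highest probability
predicted = vocabulary[int(np.argmax(y_hat))]
print(predicted)  # a
```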
Mathematical Formulation:
Let X and Y be the input and target sequences:
Where:
- X is the input matrix and Y is the output or target matrix
- x_t is the input character encoding at timestep t and y_t is the output character encoding at timestep t
- T is the sequence length (in our toy example it was 5)
- R is the vocabulary size (in our case it was 4)
- h_t: is the hidden state at timestep t. This entity is intended to transport information from one timestep to another
- z_t: represents how we combine the previous hidden state and the input character at timestep t
- tanh: is the activation function. In theory we could omit it, but tanh lets the network account for non-linear patterns
- o_t: is the output vector
- softmax: will transform the output vector o_t into a vector of probabilities, outputted as y_hat_t
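Written out, the quantities in this list are related as follows. This is a reconstruction from the implementation later in the article; the weight and bias names W_xh, W_hh, W_hy, b_h and b_y are those used in the code:

```latex
\begin{aligned}
z_t &= W_{hh}\, h_{t-1} + W_{xh}\, x_t + b_h \\
h_t &= \tanh(z_t) \\
o_t &= W_{hy}\, h_t + b_y \\
\hat{y}_t &= \mathrm{softmax}(o_t), \qquad
\hat{y}_{t,i} = \frac{e^{o_{t,i}}}{\sum_{j=1}^{R} e^{o_{t,j}}}
\end{aligned}
```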
Dimensions:
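The shapes below are read off the implementation that follows. The symbol H for the number of hidden units (rnn_units in the code) is an assumption of this sketch, since the text only names R and T:

```latex
\begin{aligned}
x_t,\ y_t,\ o_t,\ \hat{y}_t &\in \mathbb{R}^{R \times 1} & X,\ Y &\in \mathbb{R}^{R \times T} \\
h_t,\ z_t,\ b_h &\in \mathbb{R}^{H \times 1} & b_y &\in \mathbb{R}^{R \times 1} \\
W_{xh} &\in \mathbb{R}^{H \times R} & W_{hh} &\in \mathbb{R}^{H \times H} \\
W_{hy} &\in \mathbb{R}^{R \times H}
\end{aligned}
```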
Forward Pass:
In a forward pass:
- We process each character (timestep) in turn, following the order of the sequence
- h_0 is the initial hidden state of the network and can be initialized with zeros
- All the weights are the same and shared across the different cells of the network
- Each cell passes its hidden state to the next cell
- At each timestep, the input x_t and the previous hidden state h_{t-1} are combined into h_t, which is then used to predict y_hat_t
Implementation:
Note: We will be using tensorflow, but only for tensor and matrix operations; the same can be done with numpy or any other similar library
Let’s start by building a simple RNN cell class:
import tensorflow as tf

class SimpleRNNCell(object):
    def __init__(self):
        self.h_curr = None ##h at timestep t
        self.h_prev = None ##h at timestep t-1
        self.y_hat = None ##y_hat: prediction at timestep t

    def forward(self, x, h_prev, W_xh, W_hh, W_hy, b_h, b_y):
        self.h_prev = h_prev
        ## Combine the previous hidden state and the current input
        z_curr = W_hh@self.h_prev + W_xh@x + b_h
        self.h_curr = tf.math.tanh(z_curr)
        ## Turn the hidden state into probabilities over the vocabulary
        o_curr = W_hy@self.h_curr + b_y
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr
- The set of weights will be passed on as arguments as they are not properties of the RNN cell but properties of the network that connects these cells.
- The class will only keep track of h_curr, y_hat and h_prev
To test this class:
rnn_units = 5
vocabulary_size = 6
W_hh=tf.random.normal(shape=(rnn_units,rnn_units),dtype=tf.float32)
W_xh=tf.random.normal(shape=(rnn_units,vocabulary_size),dtype=tf.float32)
W_hy=tf.random.normal(shape=(vocabulary_size,rnn_units),dtype=tf.float32)
b_h=tf.random.normal(shape=(rnn_units,1),dtype=tf.float32)
b_y=tf.random.normal(shape=(vocabulary_size,1),dtype=tf.float32)
x=tf.constant([0,1,0,0,0,0],shape=(vocabulary_size,1),dtype=tf.float32)
h_prev=tf.zeros(shape=(rnn_units,1),dtype=tf.float32)
rnn_cell=SimpleRNNCell()
y_hat, h_curr=rnn_cell.forward(
x=x,
h_prev=h_prev,
W_xh=W_xh,
W_hh=W_hh,
W_hy=W_hy,
b_h=b_h,
b_y=b_y)
##[output]> y_hat is a tensor of shape (6,1)
##[output]> h_curr is a tensor of shape (5,1)
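Since only tensor and matrix operations are involved, the same single step can be written with numpy. A minimal sketch, assuming the same shapes as the test above (the function names `softmax` and `rnn_step` are hypothetical):

```python
import numpy as np

def softmax(o):
    """Column-wise softmax, shifted by the max for numerical stability."""
    e = np.exp(o - o.max(axis=0, keepdims=True))
    return e / e.sum(axis=0, keepdims=True)

def rnn_step(x, h_prev, W_xh, W_hh, W_hy, b_h, b_y):
    """One vanilla RNN step: returns (y_hat, h_curr)."""
    h_curr = np.tanh(W_hh @ h_prev + W_xh @ x + b_h)
    y_hat = softmax(W_hy @ h_curr + b_y)
    return y_hat, h_curr

rng = np.random.default_rng(0)
rnn_units, vocabulary_size = 5, 6
W_hh = rng.normal(size=(rnn_units, rnn_units))
W_xh = rng.normal(size=(rnn_units, vocabulary_size))
W_hy = rng.normal(size=(vocabulary_size, rnn_units))
b_h = rng.normal(size=(rnn_units, 1))
b_y = rng.normal(size=(vocabulary_size, 1))

x = np.zeros((vocabulary_size, 1))
x[1, 0] = 1.0                       # one hot encoded input character
h_prev = np.zeros((rnn_units, 1))   # initial hidden state

y_hat, h_curr = rnn_step(x, h_prev, W_xh, W_hh, W_hy, b_h, b_y)
print(y_hat.shape, h_curr.shape)
```

The entries of y_hat sum to 1 because of the softmax, and every entry of h_curr lies between -1 and 1 because of the tanh.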
Let’s now implement a recurrent neural network that handles whole input and output sequences:
class RNN(object):
    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):
        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units
        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))
        ## Initializing Weights
        self.W_hh=tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32)
        self.W_xh=tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32)
        self.W_hy=tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32)
        self.b_h=tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32)
        self.b_y=tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        y_hat=[]
        if h is None:
            h=self.h_init
        for i in range(0,self.sequence_length):
            ## Column i of x is the one hot encoded character at timestep i
            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))
            ## The same weights are shared by all the cells
            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                W_xh=self.W_xh,
                W_hh=self.W_hh,
                W_hy=self.W_hy,
                b_h=self.b_h,
                b_y=self.b_y)
            y_hat.append(y_hat_t)
        return tf.concat(y_hat,axis=1),h
As you can see, this class is a collection of sequence_length instances of the previous class (SimpleRNNCell).
To test this class:
sequence_length=3
rnn_units = 5
vocabulary_size = 6
x=tf.constant(
[
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0],
[0,0,0],
[0,0,1]
],
dtype=tf.float32)
## x is a sequence of character one hot encoded through a vocabulary size of 6
rnn=RNN(
input_dim=vocabulary_size,
output_dim=vocabulary_size,
sequence_length=sequence_length,
rnn_units=rnn_units
)
y_hat,h = rnn.forward(x=x)
##[output]> y_hat is a tensor of shape (6,3) which is the same shape of x
## it shows in each column the set of probabilities of producing one of the characters of the vocabulary
## h: hidden state of the last RNN cell
Backward Pass:
To train our RNN, we need to update the weights so that the predictions get as close as possible to the true targets. To do this, we need to:
- Have a loss function that assesses the quality of our predictions
- Compute the gradient of the loss with respect to the different weights
- Use the gradients to update the weights in the correct direction
Loss Function:
As the loss function at each timestep, we will use the cross-entropy loss, defined as follows:
And the total loss of the network is:
Where:
- R is the vocabulary size
- T is the sequence length
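A reconstruction of the two formulas, assuming the standard cross-entropy for one-hot targets and a sum over timesteps. This choice is what yields the gradient y_hat_t - y_t used in the next section. Note that the loss method implemented later in the article averages over the vocabulary instead of summing, which rescales the reported value by 1/R:

```latex
L_t = -\sum_{i=1}^{R} y_{t,i}\,\log \hat{y}_{t,i},
\qquad
L = \sum_{t=1}^{T} L_t
```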
Computing The Gradients:
To update the weights we have to figure out the following gradients:
Please refer to the Appendix at the end of this article on how we computed the gradients
After calculating the gradients, we have the following equations:
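A reconstruction of these equations from the forward pass (h_t = tanh(W_hh h_{t-1} + W_xh x_t + b_h), y_hat_t = softmax(W_hy h_t + b_y)) and a cross-entropy loss summed over timesteps, with the convention that no gradient flows into the last hidden state from beyond the end of the sequence:

```latex
\begin{aligned}
\frac{\partial L}{\partial W_{hy}} &= \sum_{t=1}^{T} (\hat{y}_t - y_t)\, h_t^{\top} \\
\frac{\partial L}{\partial b_y} &= \sum_{t=1}^{T} (\hat{y}_t - y_t) \\
\frac{\partial L}{\partial W_{hh}} &= \sum_{t=1}^{T} \mathrm{diag}(1 - h_t^2)\, \frac{\partial L}{\partial h_t}\, h_{t-1}^{\top} \\
\frac{\partial L}{\partial W_{xh}} &= \sum_{t=1}^{T} \mathrm{diag}(1 - h_t^2)\, \frac{\partial L}{\partial h_t}\, x_t^{\top} \\
\frac{\partial L}{\partial b_h} &= \sum_{t=1}^{T} \mathrm{diag}(1 - h_t^2)\, \frac{\partial L}{\partial h_t} \\
\frac{\partial L}{\partial h_t} &= W_{hy}^{\top} (\hat{y}_t - y_t)
  + W_{hh}^{\top}\, \mathrm{diag}(1 - h_{t+1}^2)\, \frac{\partial L}{\partial h_{t+1}},
\qquad \frac{\partial L}{\partial h_{T+1}} := 0
\end{aligned}
```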
Now let’s follow, step by step, how we calculate the different gradients. Since the last relationship is a recurrence between t and t+1, we need to start from the last timestep and move backwards. Also, since all gradients are sums over the timesteps, we can write:
- At timestep t=T:
- At time step t=T-1
- At time step t=T-2
- And so on until we reach the first time step. In the end the gradient of each weight would be the sum of the partial gradients computed at each timestep.
- Once the gradients are computed, we can update the weights
# H: number of hidden units (rnn_units), R: vocabulary size, T: sequence length
d_W_hh = zeros(H,H)
d_W_xh = zeros(H,R)
d_b_h = zeros(H,1)
d_W_hy = zeros(R,H)
d_b_y = zeros(R,1)
# Initial d_t (gradient flowing back from timestep t+1):
d_t = zeros(H,1)
For t from T to 1
    d_L_h_t = transpose(W_hy).(y_hat[t]-y[t]) + d_t
    d_W_hh = diag(1-h[t]^2).d_L_h_t.transpose(h[t-1])+d_W_hh
    d_W_xh = diag(1-h[t]^2).d_L_h_t.transpose(x[t])+d_W_xh
    d_b_h = diag(1-h[t]^2).d_L_h_t+d_b_h
    d_W_hy = (y_hat[t]-y[t]).transpose(h[t])+d_W_hy
    d_b_y = y_hat[t]-y[t]+d_b_y
    ### d_t to be passed to the next step of the loop (timestep t-1)
    d_t = transpose(W_hh).diag(1-h[t]^2).d_L_h_t
# Once all the timesteps are processed, update the weights
W_hh = W_hh - learning_rate * d_W_hh
W_xh = W_xh - learning_rate * d_W_xh
W_hy = W_hy - learning_rate * d_W_hy
b_h = b_h - learning_rate * d_b_h
b_y = b_y - learning_rate * d_b_y
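One way to sanity-check recurrences like these is a finite-difference gradient check. The numpy sketch below is an illustration rather than the article's implementation: all function names are hypothetical, the loss is the cross-entropy summed over vocabulary entries and timesteps, and the gradient passed back to timestep t-1 is written as transpose(W_hh).diag(1-h[t]^2).d_L_h_t.

```python
import numpy as np

def softmax(o):
    e = np.exp(o - o.max(axis=0, keepdims=True))
    return e / e.sum(axis=0, keepdims=True)

def forward(X, w):
    """Run the RNN over X (R x T). h[0] is the initial state, h[t+1] the state after column t."""
    h = [np.zeros((w["W_hh"].shape[0], 1))]
    y_hat = []
    for t in range(X.shape[1]):
        h.append(np.tanh(w["W_hh"] @ h[-1] + w["W_xh"] @ X[:, [t]] + w["b_h"]))
        y_hat.append(softmax(w["W_hy"] @ h[-1] + w["b_y"]))
    return h, y_hat

def loss(X, Y, w):
    """Cross-entropy summed over vocabulary entries and timesteps."""
    _, y_hat = forward(X, w)
    return -sum(float(np.sum(Y[:, [t]] * np.log(y_hat[t]))) for t in range(X.shape[1]))

def backward(X, Y, w):
    """Backpropagation through time, accumulating gradients from the last timestep to the first."""
    h, y_hat = forward(X, w)
    grads = {name: np.zeros_like(value) for name, value in w.items()}
    d_t = np.zeros_like(h[0])
    for t in reversed(range(X.shape[1])):
        d_o = y_hat[t] - Y[:, [t]]
        d_h = w["W_hy"].T @ d_o + d_t
        d_z = (1 - h[t + 1] ** 2) * d_h   # same as diag(1-h^2) @ d_h
        grads["W_hh"] += d_z @ h[t].T
        grads["W_xh"] += d_z @ X[:, [t]].T
        grads["b_h"] += d_z
        grads["W_hy"] += d_o @ h[t + 1].T
        grads["b_y"] += d_o
        d_t = w["W_hh"].T @ d_z           # gradient passed to the previous timestep
    return grads

rng = np.random.default_rng(0)
R, H, T = 4, 3, 5
w = {"W_hh": rng.normal(size=(H, H)), "W_xh": rng.normal(size=(H, R)),
     "W_hy": rng.normal(size=(R, H)), "b_h": rng.normal(size=(H, 1)),
     "b_y": rng.normal(size=(R, 1))}
X = np.eye(R)[:, rng.integers(0, R, size=T)]  # random one hot inputs
Y = np.eye(R)[:, rng.integers(0, R, size=T)]  # random one hot targets

analytic = backward(X, Y, w)
eps = 1e-6
max_error = 0.0
for name, W in w.items():
    for idx in np.ndindex(W.shape):
        original = W[idx]
        W[idx] = original + eps
        loss_plus = loss(X, Y, w)
        W[idx] = original - eps
        loss_minus = loss(X, Y, w)
        W[idx] = original
        numeric = (loss_plus - loss_minus) / (2 * eps)
        max_error = max(max_error, abs(numeric - analytic[name][idx]))
print(f"max |numeric - analytic| = {max_error:.2e}")
```

If the analytic gradients are right, the largest discrepancy should be tiny compared with the gradients themselves; a large value usually points to a wrong transpose or a missing accumulation.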
We can now implement the above algorithm:
- We will keep the previous code but we will replace the weights with a dictionary of weights
- We will update the two classes SimpleRNNCell and RNN
- We will define a loss method that computes the cross-entropy loss
import tensorflow as tf

class SimpleRNNCell(object):
    def __init__(self):
        self.h_curr = None ##h at timestep t
        self.h_prev = None ##h at timestep t-1
        self.y_hat = None ##y_hat: prediction at timestep t

    def forward(self, x, h_prev, weights):
        self.h_prev = h_prev
        z_curr = weights["W_hh"]@self.h_prev + weights["W_xh"]@x + weights["b_h"]
        self.h_curr = tf.math.tanh(z_curr)
        o_curr = weights["W_hy"]@self.h_curr + weights["b_y"]
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr

    def backward(self,y,x,next_gradients,weights):
        ## Derivative of tanh at this timestep as a diagonal matrix: diag(1-h_t^2)
        d_tanh=tf.linalg.diag(1-tf.reshape(self.h_curr**2,shape=[-1]))
        ## Gradient of the loss by h_t: output at timestep t plus what flows back from t+1
        d_L_by_h_curr=tf.transpose(weights["W_hy"])@(self.y_hat-y)+next_gradients["dh_next"]
        ## Each weight gradient is accumulated over the timesteps
        d_L_by_W_hh=d_tanh@d_L_by_h_curr@tf.transpose(self.h_prev)+next_gradients["d_L_by_W_hh"]
        d_L_by_W_xh=d_tanh@d_L_by_h_curr@tf.transpose(x)+next_gradients["d_L_by_W_xh"]
        d_L_by_b_h=d_tanh@d_L_by_h_curr+next_gradients["d_L_by_b_h"]
        d_L_by_W_hy=(self.y_hat-y)@tf.transpose(self.h_curr)+next_gradients["d_L_by_W_hy"]
        d_L_by_b_y=self.y_hat-y+next_gradients["d_L_by_b_y"]
        ## Gradient passed to the cell at timestep t-1: transpose(W_hh) @ diag(1-h_t^2) @ dL/dh_t
        dh_next=tf.transpose(weights["W_hh"])@d_tanh@d_L_by_h_curr
        return {
            "d_L_by_W_hh":d_L_by_W_hh,
            "d_L_by_W_xh":d_L_by_W_xh,
            "d_L_by_b_h":d_L_by_b_h,
            "d_L_by_W_hy":d_L_by_W_hy,
            "d_L_by_b_y":d_L_by_b_y,
            "dh_next":dh_next
        }
class RNN(object):
    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):
        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units
        ## Initializing Weights
        self.weights={
            "W_hh":tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32),
            "W_xh":tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32),
            "W_hy":tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32),
            "b_h":tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32),
            "b_y":tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        }
        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))
        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        y_hat=[]
        if h is None:
            h=self.h_init
        for i in range(0,self.sequence_length):
            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))
            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                weights=self.weights)
            y_hat.append(y_hat_t)
        return tf.concat(y_hat,axis=1),h

    def loss(self,y,y_hat):
        return tf.reduce_sum(-tf.reduce_mean(y*tf.math.log(y_hat),axis=0)).numpy()

    def backward(self,x,y):
        ## Initializing gradients
        partial_gradients = {
            "d_L_by_W_hh":tf.zeros_like(self.weights["W_hh"]),
            "d_L_by_W_xh":tf.zeros_like(self.weights["W_xh"]),
            "d_L_by_b_h":tf.zeros_like(self.weights["b_h"]),
            "d_L_by_W_hy":tf.zeros_like(self.weights["W_hy"]),
            "d_L_by_b_y":tf.zeros_like(self.weights["b_y"]),
            "dh_next":tf.zeros_like(self.h_init)
        }
        ## Going through the cells from the last timestep to the first
        for i in reversed(range(0,self.sequence_length)):
            y_curr=tf.reshape(y[:,i],shape=(self.output_dim,1))
            x_curr=tf.reshape(x[:,i],shape=(self.input_dim,1))
            partial_gradients=self.cells[i].backward(x=x_curr,y=y_curr,next_gradients=partial_gradients,weights=self.weights)
        return partial_gradients

    def update_weights(self,partial_gradients,learning_rate=0.01):
        self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*partial_gradients["d_L_by_W_hh"]
        self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*partial_gradients["d_L_by_W_xh"]
        self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*partial_gradients["d_L_by_W_hy"]
        self.weights["b_h"]=self.weights["b_h"]-learning_rate*partial_gradients["d_L_by_b_h"]
        self.weights["b_y"]=self.weights["b_y"]-learning_rate*partial_gradients["d_L_by_b_y"]
We can test the code to preview the results:
x=tf.constant(
[
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0],
[0,0,0],
[0,0,1]
],
dtype=tf.float32)
y=tf.constant(
[
[0,0,1],
[0,0,0],
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0]
],
dtype=tf.float32)
sequence_length=3
rnn_units = 5
vocabulary_size = 6
rnn=RNN(
input_dim=vocabulary_size,
output_dim=vocabulary_size,
sequence_length=sequence_length,
rnn_units=rnn_units
)
## Compute the first prediction with the initial weights
y_hat,h=rnn.forward(x=x)
## Compute the loss
rnn.loss(y=y,y_hat=y_hat)
## Do a backward pass to get the weight gradients
gradients=rnn.backward(x,y)
## Update the weights following the gradients
rnn.update_weights(gradients,learning_rate=0.01)
## Get the new weights
rnn.weights
Training RNN:
The procedure to train a recurrent neural network follows:
For epoch in epochs:
    losses=[]
    For train_input_sequence,train_target_sequence in train_data:
        y_hat,h=rnn_forward_pass(train_input_sequence,h_initial)
        sequence_loss=compute_loss(y_hat,train_target_sequence)
        gradients=rnn_backward_pass(train_input_sequence,train_target_sequence)
        rnn_weights=update_rnn_weights(gradients,learning_rate)
        losses.append(sequence_loss)
    epoch_loss=average(losses)
In our case:
- We will be using tensorflow dataset as it offers many interesting methods, like shuffling and batching
- We will be using mini-batches: in each epoch we take only one batch (a subset of the input sequences), update the weights after each sequence of that batch, then move on to the next batch in the next epoch
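To make this batching scheme concrete, here is a plain-Python sketch (it does not use the tensorflow dataset API; the helper `batches` and the toy data are hypothetical) in which each epoch consumes exactly one batch and the batches are cycled once they run out:

```python
from itertools import cycle

def batches(sequences, batch_size):
    """Split a list of sequences into consecutive batches."""
    return [sequences[i:i + batch_size] for i in range(0, len(sequences), batch_size)]

train_sequences = [f"seq_{i}" for i in range(5)]  # stand-ins for (input, target) pairs
batch_iterator = cycle(batches(train_sequences, batch_size=2))

schedule = []
for epoch in range(4):
    batch = next(batch_iterator)  # one batch per epoch
    schedule.append(batch)
    print(f"Epoch {epoch}: {batch}")
```

With 5 sequences and a batch size of 2 there are 3 batches, so the fourth epoch wraps around to the first batch again.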
## Method of the RNN class; requires "import pandas as pd" at the top of the file
def train(self,train_dataset,epochs=20,batch_size=50,learning_rate=0.01):
    ## repeat() lets the iterator start over when there are more epochs than batches
    batch_train_dataset=train_dataset.batch(batch_size).repeat()
    losses=[]
    batch_iterator=iter(batch_train_dataset)
    for epoch in range(epochs):
        print(f"Epoch: {epoch}")
        train_X,train_Y=next(batch_iterator)
        loss=0
        for i in range(len(train_X)):
            h=self.h_init
            train_x=train_X[i]
            train_y=train_Y[i]
            train_y_hat,h=self.forward(train_x,h)
            seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
            print(f"Sequence: {i} Loss: {seq_loss}")
            loss+=seq_loss/len(train_X) #we divide to get an average in the end
            gradients=self.backward(train_x,train_y)
            self.update_weights(gradients,learning_rate=learning_rate)
        ## self.loss already returns a numpy value, so loss can be stored as is
        losses.append(loss)
    ## Return the loss in each epoch
    return pd.DataFrame(zip(range(epochs),losses),columns=["epoch","loss"])
One can also address exploding gradients in RNN by using gradient norm clipping (setting a maximum norm for the gradients):
def update_weights(self,gradients,learning_rate=0.01,clip_gradients_norm=5):
    ## Each gradient is rescaled if its norm exceeds clip_gradients_norm
    self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hh"],clip_norm=clip_gradients_norm)
    self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_xh"],clip_norm=clip_gradients_norm)
    self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hy"],clip_norm=clip_gradients_norm)
    self.weights["b_h"]=self.weights["b_h"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_h"],clip_norm=clip_gradients_norm)
    self.weights["b_y"]=self.weights["b_y"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_y"],clip_norm=clip_gradients_norm)

def train(self,train_dataset,epochs=20,batch_size=50,learning_rate=0.01,clip_gradients_norm=1):
    ## repeat() lets the iterator start over when there are more epochs than batches
    batch_train_dataset=train_dataset.batch(batch_size).repeat()
    losses=[]
    batch_iterator=iter(batch_train_dataset)
    for epoch in range(epochs):
        print(f"Epoch: {epoch}")
        train_X,train_Y=next(batch_iterator)
        loss=0
        for i in range(len(train_X)):
            h=self.h_init
            train_x=train_X[i]
            train_y=train_Y[i]
            train_y_hat,h=self.forward(train_x,h)
            seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
            print(f"Sequence: {i} Loss: {seq_loss}")
            loss+=seq_loss/len(train_X)
            gradients=self.backward(train_x,train_y)
            self.update_weights(gradients=gradients,learning_rate=learning_rate,clip_gradients_norm=clip_gradients_norm)
        ## self.loss already returns a numpy value, so loss can be stored as is
        losses.append(loss)
    return pd.DataFrame(zip(range(epochs),losses),columns=["epoch","loss"])
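For intuition, clipping by norm leaves a gradient untouched when its L2 norm is at most the threshold, and otherwise rescales it so that its norm equals the threshold. A numpy sketch of that rule (the function name mirrors tf.clip_by_norm, but this is an illustrative re-implementation, not the library code):

```python
import numpy as np

def clip_by_norm(gradient, clip_norm):
    """Rescale gradient so that its L2 norm does not exceed clip_norm."""
    norm = np.linalg.norm(gradient)
    if norm <= clip_norm:
        return gradient
    return gradient * (clip_norm / norm)

g = np.array([[3.0], [4.0]])              # L2 norm = 5
unchanged = clip_by_norm(g, clip_norm=10)  # norm below the threshold: returned as is
clipped = clip_by_norm(g, clip_norm=1)     # rescaled to norm 1
print(unchanged.ravel(), clipped.ravel())
```

The direction of the gradient is preserved; only its length is capped, which limits the size of a single update when gradients explode.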
We can also add a way to evaluate the model on a validation dataset:
- The validation dataset is passed unbatched; internally it will be used as a single batch to evaluate all the sequences
- We need to implement an evaluate method to report the loss on the validation data
We can also use the tqdm library to show a progress bar and avoid having too many logs. It is also cooler.
Putting it all together:
import pandas as pd
from tqdm import tqdm
import tensorflow as tf

class SimpleRNNCell(object):
    def __init__(self):
        self.h_curr = None ##h at timestep t
        self.h_prev = None ##h at timestep t-1
        self.y_hat = None ##y_hat: prediction at timestep t

    def forward(self, x, h_prev, weights):
        self.h_prev = h_prev
        z_curr = weights["W_hh"]@self.h_prev + weights["W_xh"]@x + weights["b_h"]
        self.h_curr = tf.math.tanh(z_curr)
        o_curr = weights["W_hy"]@self.h_curr + weights["b_y"]
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr

    def backward(self,y,x,next_gradients,weights):
        ## Derivative of tanh at this timestep as a diagonal matrix: diag(1-h_t^2)
        d_tanh=tf.linalg.diag(1-tf.reshape(self.h_curr**2,shape=[-1]))
        ## Gradient of the loss by h_t: output at timestep t plus what flows back from t+1
        d_L_by_h_curr=tf.transpose(weights["W_hy"])@(self.y_hat-y)+next_gradients["dh_next"]
        ## Each weight gradient is accumulated over the timesteps
        d_L_by_W_hh=d_tanh@d_L_by_h_curr@tf.transpose(self.h_prev)+next_gradients["d_L_by_W_hh"]
        d_L_by_W_xh=d_tanh@d_L_by_h_curr@tf.transpose(x)+next_gradients["d_L_by_W_xh"]
        d_L_by_b_h=d_tanh@d_L_by_h_curr+next_gradients["d_L_by_b_h"]
        d_L_by_W_hy=(self.y_hat-y)@tf.transpose(self.h_curr)+next_gradients["d_L_by_W_hy"]
        d_L_by_b_y=self.y_hat-y+next_gradients["d_L_by_b_y"]
        ## Gradient passed to the cell at timestep t-1: transpose(W_hh) @ diag(1-h_t^2) @ dL/dh_t
        dh_next=tf.transpose(weights["W_hh"])@d_tanh@d_L_by_h_curr
        return {
            "d_L_by_W_hh":d_L_by_W_hh,
            "d_L_by_W_xh":d_L_by_W_xh,
            "d_L_by_b_h":d_L_by_b_h,
            "d_L_by_W_hy":d_L_by_W_hy,
            "d_L_by_b_y":d_L_by_b_y,
            "dh_next":dh_next
        }
class RNN(object):
    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):
        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units
        ## Initializing Weights
        self.weights={
            "W_hh":tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32),
            "W_xh":tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32),
            "W_hy":tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32),
            "b_h":tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32),
            "b_y":tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        }
        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))
        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        y_hat=[]
        ## Start from the initial hidden state unless one is provided
        if h is None:
            h=self.h_init
        for i in range(0,self.sequence_length):
            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))
            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                weights=self.weights)
            y_hat.append(y_hat_t)
        ## Return the predictions and the last hidden state
        return tf.concat(y_hat,axis=1),h

    def loss(self,y,y_hat):
        return tf.reduce_sum(-tf.reduce_mean(y*tf.math.log(y_hat),axis=0)).numpy()

    def backward(self,x,y):
        ## Initializing gradients
        partial_gradients = {
            "d_L_by_W_hh":tf.zeros_like(self.weights["W_hh"]),
            "d_L_by_W_xh":tf.zeros_like(self.weights["W_xh"]),
            "d_L_by_b_h":tf.zeros_like(self.weights["b_h"]),
            "d_L_by_W_hy":tf.zeros_like(self.weights["W_hy"]),
            "d_L_by_b_y":tf.zeros_like(self.weights["b_y"]),
            "dh_next":tf.zeros_like(self.h_init)
        }
        ## Going through the cells from the last timestep to the first
        for i in reversed(range(0,self.sequence_length)):
            y_curr=tf.reshape(y[:,i],shape=(self.output_dim,1))
            x_curr=tf.reshape(x[:,i],shape=(self.input_dim,1))
            partial_gradients=self.cells[i].backward(x=x_curr,y=y_curr,next_gradients=partial_gradients,weights=self.weights)
        return partial_gradients

    def update_weights(self,gradients,learning_rate=0.01,clip_gradients_norm=5):
        ## Updating the weights following the (clipped) gradients
        self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hh"],clip_norm=clip_gradients_norm)
        self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_xh"],clip_norm=clip_gradients_norm)
        self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hy"],clip_norm=clip_gradients_norm)
        self.weights["b_h"]=self.weights["b_h"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_h"],clip_norm=clip_gradients_norm)
        self.weights["b_y"]=self.weights["b_y"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_y"],clip_norm=clip_gradients_norm)

    def train(self,train_dataset,validation_dataset=None,epochs=20,batch_size=50,learning_rate=0.01,clip_gradients_norm=1):
        ## repeat() lets the iterator start over when there are more epochs than batches
        batch_train_dataset=train_dataset.batch(batch_size).repeat()
        ## The validation dataset is optional and is evaluated as a single batch
        batch_validation_dataset=None
        if validation_dataset is not None:
            batch_validation_dataset=validation_dataset.batch(len(validation_dataset))
        train_losses=[]
        val_losses=[]
        batch_iterator=iter(batch_train_dataset)
        for epoch in range(epochs):
            print(f"----------------------------- Epoch: {epoch} ----------------------------- ")
            train_X,train_Y=next(batch_iterator)
            train_loss=0
            progress=tqdm(range(len(train_X)), desc=f'Training| Epoch:{epoch}', unit_scale=True, unit='seq')
            for i in progress:
                ## Each sequence starts from the initial hidden state
                h=self.h_init
                train_x=train_X[i]
                train_y=train_Y[i]
                train_y_hat,h=self.forward(train_x,h=h)
                seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
                train_loss+=seq_loss/len(train_X)
                progress.set_postfix({"Running loss":train_loss})
                gradients=self.backward(train_x,train_y)
                self.update_weights(gradients=gradients,learning_rate=learning_rate,clip_gradients_norm=clip_gradients_norm)
            train_losses.append(train_loss)
            ## The validation loss stays None when no validation dataset is given
            val_loss=None
            if batch_validation_dataset is not None:
                val_loss=self.evaluate(batch_validation_dataset)
            val_losses.append(val_loss)
        return pd.DataFrame(zip(range(epochs),train_losses,val_losses),columns=["epoch","train_loss","validation_loss"])

    def evaluate(self,single_batch_dataset):
        X,Y=next(iter(single_batch_dataset))
        loss=0
        progress=tqdm(range(len(X)), desc='Validating', unit_scale=True, unit='seq')
        for i in progress:
            ## As in training, each sequence starts from the initial hidden state
            y_hat,_=self.forward(X[i])
            seq_loss=self.loss(y=Y[i],y_hat=y_hat)
            loss+=seq_loss/len(X)
            progress.set_postfix({"Validation Loss":loss})
        return loss
Conclusion:
In the end:
- We have in this part implemented a first working class to build and train a recurrent neural network
- A few things were not discussed in depth here, like the problem of vanishing and exploding gradients in RNNs
- In the next part, we will try to use this class for text generation
I hope this article helps anyone in need of understanding RNNs under the hood. Please let me know if you have any questions!
References:
- https://github.com/CaptainE/RNN-LSTM-in-numpy/blob/master/RNN_LSTM_from_scratch.ipynb
- https://mmuratarat.github.io/2019-02-07/bptt-of-rnn
Appendix:
Starting with W_hy:
This comes down to evaluating the gradient of L_t with respect to o_t and the gradient of o_t with respect to W_hy:
Where:
The remaining part is to compute the gradient of o_t with respect to W_hy:
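A sketch of how this derivation can be completed, assuming L_t is the cross-entropy of softmax(o_t) against a one-hot y_t (so the entries of y_t sum to 1) and o_t = W_hy h_t + b_y as in the forward pass:

```latex
\begin{aligned}
\frac{\partial L_t}{\partial o_{t,k}}
  &= -\sum_{i=1}^{R} y_{t,i}\,\frac{\partial \log \hat{y}_{t,i}}{\partial o_{t,k}}
   = -\sum_{i=1}^{R} y_{t,i}\,(\delta_{ik} - \hat{y}_{t,k})
   = \hat{y}_{t,k} - y_{t,k} \\
\frac{\partial o_{t,k}}{\partial (W_{hy})_{kj}} &= h_{t,j} \\
\frac{\partial L_t}{\partial W_{hy}} &= (\hat{y}_t - y_t)\, h_t^{\top},
\qquad
\frac{\partial L}{\partial W_{hy}} = \sum_{t=1}^{T} (\hat{y}_t - y_t)\, h_t^{\top}
\end{aligned}
```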