A Deeper Explanation Of Recurrent Neural Networks

Part I

Tarek Benzina
11 min read · Nov 18, 2023

Motivation:

I have been trying to implement a recurrent neural network in Python but I encountered many complications:

  1. Knowledge is scattered across many articles, papers and code bases
  2. The mathematical notations are different and confusing
  3. The equations do not provide a straightforward way to implement RNNs from scratch, especially the backpropagation part

Contents:

Through this article:

  1. We will try to follow the running of a Vanilla RNN
  2. Explain backpropagation in an RNN by calculating the gradients
  3. Implement an RNN from scratch

Problem:

Given a text: The cat plays
We would like to predict the next character given all the previous characters in the sentence:

  • Given input=T predict output=h
  • Given input=Th predict output=e

We could in the end represent this as follows:

Example of a Simple Input Output Diagram

Recurrent Neural Networks:

Toy Example:

Recurrent neural networks in their simplest form are models that take a sequence of a given length as input and output a sequence of the same length. These are called Vanilla Recurrent Neural Networks.

Let’s consider the case of a very simple text:

text="<BOS>aaba<EOS>"

In this case, we have a simple sequence (sentence). To train our RNN model to predict the next character, we can use our text to form an input and a target (output)

Input="<BOS>aaba"
Target="aaba<EOS>"

Our input and target sequence length is 5 in this case.

Each of the characters can be represented as a one-hot encoded vector:

  • Our text has the unique characters: a,b,<BOS>,<EOS>
  • The size of this vector is therefore 4

Which means that both the inputs and targets can be represented as follows:
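
As a minimal sketch of this encoding, the snippet below builds the input and target vectors for the toy text in plain Python. The vocabulary order (a, b, <BOS>, <EOS>) and the helper name one_hot are assumptions made for illustration; any fixed order works as long as it is used consistently.

```python
# Assumed vocabulary order; the index of a token is its position in this list.
vocabulary = ["a", "b", "<BOS>", "<EOS>"]
token_to_index = {token: i for i, token in enumerate(vocabulary)}

def one_hot(token):
    """Return a one-hot list of length len(vocabulary) for a single token."""
    vector = [0] * len(vocabulary)
    vector[token_to_index[token]] = 1
    return vector

tokens = ["<BOS>", "a", "a", "b", "a", "<EOS>"]
input_tokens = tokens[:-1]   # <BOS> a a b a
target_tokens = tokens[1:]   # a a b a <EOS>

X = [one_hot(t) for t in input_tokens]   # 5 vectors of size 4
Y = [one_hot(t) for t in target_tokens]  # 5 vectors of size 4

# Each input vector is paired with the vector of the character that follows it.
for x_t, y_t in zip(X, Y):
    print(x_t, "->", y_t)
```

In the rest of the article these vectors are stacked as columns, so the input and the target are matrices with one row per vocabulary entry and one column per timestep.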

The output of our trained model would be a list of probability vectors:

  • Each of these vectors has the same size as our vocabulary, which is 4
  • The probability vector shows the likelihood of each character as a potential output.
  • For instance, if the output vector is y_hat=[0.9,0.05,0.025,0.025], then our model is telling us that the most likely next character is a (taking a as the first entry of the vocabulary)

Mathematical Formulation:

Let X and Y be the input and target sequences:

Where:

  • X is the input matrix and Y is the output or target matrix
  • x_t is the input character encoding at timestep t and y_t is the output character encoding at timestep t
  • T is the sequence length (in our toy example it was 5)
  • R is the vocabulary size (in our case it was 4)
  • h_t: is the hidden state at timestep t. This entity is intended to transport information from one timestep to another
  • z_t: represents how we combine the previous hidden state and the input character at timestep t
  • tanh: is the activation function. In theory we could leave it out, but tanh lets the network account for non-linear patterns
  • o_t: is the output vector
  • softmax: will transform the output vector o_t to a set of probabilities outputted as y_hat_t
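
For reference, the quantities listed above can be written out as follows. This is a sketch that mirrors the forward method implemented later in this article: the weight names W_hh, W_xh, W_hy, b_h and b_y are taken from that code, and the output activation is the softmax that the code applies.

```latex
\begin{aligned}
X &= [x_1, \dots, x_T] \in \mathbb{R}^{R \times T}, \qquad
Y = [y_1, \dots, y_T] \in \mathbb{R}^{R \times T} \\
z_t &= W_{hh}\, h_{t-1} + W_{xh}\, x_t + b_h \\
h_t &= \tanh(z_t) \\
o_t &= W_{hy}\, h_t + b_y \\
\hat{y}_t &= \operatorname{softmax}(o_t)
\end{aligned}
```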

Simple RNN cell Flowchart

Dimensions:

Forward Pass:

Example of Forward Propagation in a RNN with sequence length of 3 for inputs and outputs

In a forward pass:

  • We iteratively treat each character (timestep), in the same order of the sequence
  • h_0 is the initial hidden state of the network and can be initialized with zeros
  • All the weights are the same and shared across the different cells of the network
  • Each cell passes its hidden state to the next cell
  • At each timestep, the input x_t and the previous hidden state h_{t-1} are used to compute h_t, which is then used to predict y_hat_t

Implementation:

Note: We will be using tensorflow, but only for tensor and matrix operations; the same can be done with numpy or any other similar library.

Let’s start by building a simple RNN cell class:

import tensorflow as tf

class SimpleRNNCell(object):

    def __init__(self):
        self.h_curr = None  ## h at timestep t
        self.h_prev = None  ## h at timestep t-1
        self.y_hat = None   ## y_hat: prediction at timestep t

    def forward(self, x, h_prev, W_xh, W_hh, W_hy, b_h, b_y):
        self.h_prev = h_prev
        z_curr = W_hh@self.h_prev + W_xh@x + b_h
        self.h_curr = tf.math.tanh(z_curr)
        o_curr = W_hy@self.h_curr + b_y
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr

  • The set of weights will be passed on as arguments as they are not properties of the RNN cell but properties of the network that connects these cells.
  • The class will only keep track of h_curr, y_hat and h_prev

To test this class:

rnn_units = 5
vocabulary_size = 6

W_hh=tf.random.normal(shape=(rnn_units,rnn_units),dtype=tf.float32)
W_xh=tf.random.normal(shape=(rnn_units,vocabulary_size),dtype=tf.float32)
W_hy=tf.random.normal(shape=(vocabulary_size,rnn_units),dtype=tf.float32)
b_h=tf.random.normal(shape=(rnn_units,1),dtype=tf.float32)
b_y=tf.random.normal(shape=(vocabulary_size,1),dtype=tf.float32)

x=tf.constant([0,1,0,0,0,0],shape=(vocabulary_size,1),dtype=tf.float32)
h_prev=tf.zeros(shape=(rnn_units,1),dtype=tf.float32)
rnn_cell=SimpleRNNCell()
y_hat, h_curr=rnn_cell.forward(
x=x,
h_prev=h_prev,
W_xh=W_xh,
W_hh=W_hh,
W_hy=W_hy,
b_h=b_h,
b_y=b_y)

##[output]> y_hat is a tensor of shape (6,1)
##[output]> h_curr is a tensor of shape (5,1)

Let’s now try to implement a Recurrent Neural Network for whole input and output sequences:

class RNN(object):

    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):

        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units
        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))
        ## Initializing Weights
        self.W_hh=tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32)
        self.W_xh=tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32)
        self.W_hy=tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32)
        self.b_h=tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32)
        self.b_y=tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        y_hat=[]
        if h is None:
            h=self.h_init

        for i in range(0,self.sequence_length):

            ## Column i of x is the one hot vector of timestep i
            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))

            ## The weights belong to the network and are shared by all the cells
            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                W_xh=self.W_xh,
                W_hh=self.W_hh,
                W_hy=self.W_hy,
                b_h=self.b_h,
                b_y=self.b_y)
            y_hat.append(y_hat_t)
        return tf.concat(y_hat,axis=1),h

As you can see, this class is a collection of sequence_length instances of the previous class.

To test this class:

sequence_length=3
rnn_units = 5
vocabulary_size = 6

x=tf.constant(
[
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0],
[0,0,0],
[0,0,1]
],
dtype=tf.float32)
## x is a sequence of character one hot encoded through a vocabulary size of 6
rnn=RNN(
input_dim=vocabulary_size,
output_dim=vocabulary_size,
sequence_length=sequence_length,
rnn_units=rnn_units
)
y_hat,h = rnn.forward(x=x)

##[output]> y_hat is a tensor of shape (6,3) which is the same shape of x
## it shows in each column the set of probabilities of producing one of the characters of the vocabulary
## h: hidden state of the last RNN cell

Backward Pass:

To train our RNN, we will need to update the weights so that the predictions are as close as possible to the targets. To do this, we need to:

  • Have a loss function that assesses the quality of our predictions
  • Compute the gradient of the loss with respect to the different weights
  • Use the gradients to update the weights in the correct direction

Loss Function:

At each timestep, we will be using the Cross Entropy Loss, defined as follows:

And the total loss of the network is:

Where:

  • R is the vocabulary size
  • T is the sequence length
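
As a small worked example, the sketch below assumes the usual definition of cross entropy for one-hot targets: the loss at timestep t is L_t = -sum over i of y_t[i]*log(y_hat_t[i]), and the total loss is the sum of L_t over the T timesteps. The helper names softmax and cross_entropy and the toy numbers are made up for illustration. Note that the loss method in the code further down averages over the vocabulary axis before summing over timesteps, which gives this same quantity divided by R.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(y, y_hat):
    """L_t = -sum_i y_i * log(y_hat_i) for a single timestep."""
    return -sum(y_i * math.log(p_i) for y_i, p_i in zip(y, y_hat) if y_i > 0)

# Toy sequence of T=2 timesteps over a vocabulary of R=4 characters.
targets = [[1, 0, 0, 0], [0, 1, 0, 0]]
predictions = [
    [0.9, 0.05, 0.025, 0.025],      # confident and correct: small loss
    softmax([0.0, 0.0, 0.0, 0.0]),  # uniform guess: loss equals log(4)
]

step_losses = [cross_entropy(y, p) for y, p in zip(targets, predictions)]
total_loss = sum(step_losses)
print(step_losses, total_loss)
```

A confident correct prediction contributes almost nothing to the total, while a uniform guess over four characters contributes log(4), which is a handy baseline when reading the first losses of an untrained network.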

Computing The Gradients:

To update the weights we have to figure out the following gradients:

Please refer to the Appendix at the end of this article on how we computed the gradients

After calculating the gradients, we have the following equations:

Now let’s follow step by step how we calculate the different gradients. Since the last relationship is a recurrence between t and t+1, we need to start from the last timestep and move backwards. Also, since all gradients are sums over the timesteps, we can write:

  • At timestep t=T:
  • At time step t=T-1
  • At time step t=T-2
  • And so on until we reach the first time step. In the end the gradient of each weight would be the sum of the partial gradients computed at each timestep.
  • Once the gradients are computed, we can update the weights

d_W_hh = zeros_like(W_hh)
d_W_xh = zeros_like(W_xh)
d_b_h = zeros_like(b_h)
d_W_hy = zeros_like(W_hy)
d_b_y = zeros_like(b_y)
# Initial d_t (nothing flows back into the last timestep):
d_t = zeros_like(h[T])
For t from T to 1:
    d_L_h_t = transpose(W_hy).(y_hat[t]-y[t]) + d_t
    d_W_hh = diag(1-h[t]^2).d_L_h_t.transpose(h[t-1]) + d_W_hh
    d_W_xh = diag(1-h[t]^2).d_L_h_t.transpose(x[t]) + d_W_xh
    d_b_h = diag(1-h[t]^2).d_L_h_t + d_b_h
    d_W_hy = (y_hat[t]-y[t]).transpose(h[t]) + d_W_hy
    d_b_y = (y_hat[t]-y[t]) + d_b_y

    ### d_t to be passed to the next step (timestep t-1)
    d_t = transpose(W_hh).diag(1-h[t]^2).d_L_h_t
# Once all timesteps are processed, update the weights
W_hh = W_hh - learning_rate * d_W_hh
W_xh = W_xh - learning_rate * d_W_xh
W_hy = W_hy - learning_rate * d_W_hy
b_h = b_h - learning_rate * d_b_h
b_y = b_y - learning_rate * d_b_y
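
As a sanity check on this recurrence, here is a minimal NumPy sketch (not the TensorFlow implementation used in this article) that runs backpropagation through time on random toy data and compares each analytic gradient with a central finite-difference estimate. The symbol H for the number of hidden units, the helper names forward, backward and numerical_grad, and the toy sizes are assumptions made for this illustration. The gradient handed to the previous timestep is written as transpose(W_hh).diag(1-h[t]^2).d_L_h_t, which is the form the chain rule gives for h[t] = tanh(W_hh.h[t-1] + W_xh.x[t] + b_h).

```python
import numpy as np

rng = np.random.default_rng(0)
H, R, T = 3, 4, 5  # hidden units, vocabulary size, sequence length (toy values)

params = {
    "W_hh": rng.normal(scale=0.5, size=(H, H)),
    "W_xh": rng.normal(scale=0.5, size=(H, R)),
    "W_hy": rng.normal(scale=0.5, size=(R, H)),
    "b_h": np.zeros((H, 1)),
    "b_y": np.zeros((R, 1)),
}
X = np.eye(R)[:, rng.integers(0, R, size=T)]  # one-hot inputs, shape (R, T)
Y = np.eye(R)[:, rng.integers(0, R, size=T)]  # one-hot targets, shape (R, T)

def forward(p):
    """Return the total cross-entropy loss, the hidden states and the predictions."""
    h = [np.zeros((H, 1))]  # h[0] is the initial state, h[t + 1] the state after step t
    y_hat, loss = [], 0.0
    for t in range(T):
        h.append(np.tanh(p["W_hh"] @ h[-1] + p["W_xh"] @ X[:, [t]] + p["b_h"]))
        o = p["W_hy"] @ h[-1] + p["b_y"]
        e = np.exp(o - o.max())
        y_hat.append(e / e.sum())
        loss -= float(np.sum(Y[:, [t]] * np.log(y_hat[-1])))
    return loss, h, y_hat

def backward(p):
    """Backpropagation through time, accumulating gradients from the last step to the first."""
    _, h, y_hat = forward(p)
    grads = {k: np.zeros_like(v) for k, v in p.items()}
    dh_next = np.zeros((H, 1))
    for t in reversed(range(T)):
        d_o = y_hat[t] - Y[:, [t]]
        d_h = p["W_hy"].T @ d_o + dh_next
        d_z = (1.0 - h[t + 1] ** 2) * d_h  # same as diag(1 - h_t^2) @ d_h
        grads["W_hy"] += d_o @ h[t + 1].T
        grads["b_y"] += d_o
        grads["W_hh"] += d_z @ h[t].T
        grads["W_xh"] += d_z @ X[:, [t]].T
        grads["b_h"] += d_z
        dh_next = p["W_hh"].T @ d_z  # gradient flowing to the previous hidden state
    return grads

def numerical_grad(p, name, eps=1e-5):
    """Central finite differences of the loss with respect to one parameter."""
    g = np.zeros_like(p[name])
    for idx in np.ndindex(*p[name].shape):
        old = p[name][idx]
        p[name][idx] = old + eps
        plus = forward(p)[0]
        p[name][idx] = old - eps
        minus = forward(p)[0]
        p[name][idx] = old
        g[idx] = (plus - minus) / (2 * eps)
    return g

analytic = backward(params)
max_errors = {k: float(np.abs(analytic[k] - numerical_grad(params, k)).max()) for k in params}
print(max_errors)
```

If the backward pass is consistent with the loss, every entry of max_errors should be many orders of magnitude smaller than the gradients themselves; a large value for one weight points at the corresponding update line.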

We can now implement the above algorithm:

  • We will keep the previous code but we will replace the weights with a dictionary of weights
  • We will update the two classes SimpleRNNCell and RNN
  • We will define a method to compute the loss which is the cross entropy loss

import tensorflow as tf

class SimpleRNNCell(object):

    def __init__(self):
        self.h_curr = None  ## h at timestep t
        self.h_prev = None  ## h at timestep t-1
        self.y_hat = None   ## y_hat: prediction at timestep t

    def forward(self, x, h_prev, weights):
        self.h_prev = h_prev
        z_curr = weights["W_hh"]@self.h_prev + weights["W_xh"]@x + weights["b_h"]
        self.h_curr = tf.math.tanh(z_curr)
        o_curr = weights["W_hy"]@self.h_curr + weights["b_y"]
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr

    def backward(self,y,x,next_gradients,weights):
        ## Derivative of tanh at the current hidden state: diag(1-h_t^2)
        d_tanh=tf.linalg.diag(1-tf.reshape(self.h_curr**2,shape=(-1,)))
        d_L_by_h_curr=tf.transpose(weights["W_hy"])@(self.y_hat-y)+next_gradients["dh_next"]
        d_L_by_W_hh=d_tanh@d_L_by_h_curr@tf.transpose(self.h_prev)+next_gradients["d_L_by_W_hh"]
        d_L_by_W_xh=d_tanh@d_L_by_h_curr@tf.transpose(x)+next_gradients["d_L_by_W_xh"]
        d_L_by_b_h=d_tanh@d_L_by_h_curr+next_gradients["d_L_by_b_h"]
        d_L_by_W_hy=(self.y_hat-y)@tf.transpose(self.h_curr)+next_gradients["d_L_by_W_hy"]
        d_L_by_b_y=self.y_hat-y+next_gradients["d_L_by_b_y"]
        ## Gradient passed to the previous timestep: transpose(W_hh).diag(1-h_t^2).dL/dh_t
        dh_next=tf.transpose(weights["W_hh"])@d_tanh@d_L_by_h_curr
        return {
            "d_L_by_W_hh":d_L_by_W_hh,
            "d_L_by_W_xh":d_L_by_W_xh,
            "d_L_by_b_h":d_L_by_b_h,
            "d_L_by_W_hy":d_L_by_W_hy,
            "d_L_by_b_y":d_L_by_b_y,
            "dh_next":dh_next
        }

class RNN(object):

    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):

        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units

        ## Initializing Weights
        self.weights={
            "W_hh":tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32),
            "W_xh":tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32),
            "W_hy":tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32),
            "b_h":tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32),
            "b_y":tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        }

        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))

        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        y_hat=[]
        if h is None:
            h=self.h_init

        for i in range(0,self.sequence_length):

            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))

            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                weights=self.weights)
            y_hat.append(y_hat_t)
        return tf.concat(y_hat,axis=1),h

    def loss(self,y,y_hat):
        return tf.reduce_sum(-tf.reduce_mean(y*tf.math.log(y_hat),axis=0)).numpy()

    def backward(self,x,y):
        ## Initializing gradients
        partial_gradients = {
            "d_L_by_W_hh":tf.zeros_like(self.weights["W_hh"]),
            "d_L_by_W_xh":tf.zeros_like(self.weights["W_xh"]),
            "d_L_by_b_h":tf.zeros_like(self.weights["b_h"]),
            "d_L_by_W_hy":tf.zeros_like(self.weights["W_hy"]),
            "d_L_by_b_y":tf.zeros_like(self.weights["b_y"]),
            "dh_next":tf.zeros_like(self.h_init)
        }

        ## Walk the cells from the last timestep to the first, accumulating gradients
        for i in reversed(range(0,self.sequence_length)):
            y_curr=tf.reshape(y[:,i],shape=(self.output_dim,1))
            x_curr=tf.reshape(x[:,i],shape=(self.input_dim,1))
            partial_gradients=self.cells[i].backward(x=x_curr,y=y_curr,next_gradients=partial_gradients,weights=self.weights)
        return partial_gradients

    def update_weights(self,gradients,learning_rate=0.01):
        self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*gradients["d_L_by_W_hh"]
        self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*gradients["d_L_by_W_xh"]
        self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*gradients["d_L_by_W_hy"]
        self.weights["b_h"]=self.weights["b_h"]-learning_rate*gradients["d_L_by_b_h"]
        self.weights["b_y"]=self.weights["b_y"]-learning_rate*gradients["d_L_by_b_y"]

We can test the code to preview the results:

x=tf.constant(
[
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0],
[0,0,0],
[0,0,1]
],
dtype=tf.float32)
y=tf.constant(
[
[0,0,1],
[0,0,0],
[0,0,0],
[1,0,0],
[0,0,0],
[0,1,0]
],
dtype=tf.float32)
sequence_length=3
rnn_units = 5
vocabulary_size = 6
rnn=RNN(
input_dim=vocabulary_size,
output_dim=vocabulary_size,
sequence_length=sequence_length,
rnn_units=rnn_units
)
## Compute the first prediction with the initial weights
y_hat,h=rnn.forward(x=x)
## Compute the loss
rnn.loss(y=y,y_hat=y_hat)
## Do a backward pass to get the weight gradients
gradients=rnn.backward(x,y)
## Update the weights following the gradients
rnn.update_weights(gradients,learning_rate=0.01)
## Get the new weights
rnn.weights

Training RNN:

The procedure to train a recurrent neural network follows:

For epoch in epochs:
    losses=[]
    For train_input_sequence,train_target_sequence in train_data:
        y_hat,h=rnn_forward_pass(train_input_sequence,h_initial)
        sequence_loss=compute_loss(y_hat,train_target_sequence)
        gradients=rnn_backward_pass(train_input_sequence,train_target_sequence)
        rnn_weights=update_rnn_weights(gradients,learning_rate)
        losses.append(sequence_loss)
    epoch_loss=average(losses)

In our case:

  • We will be using tensorflow dataset as it offers many interesting methods, like shuffling and batching
  • We will process one mini batch per epoch. Meaning, in each epoch we take only one batch of the input sequences, update the weights after every sequence of that batch, then move on to the next batch in the next epoch

    def train(self,train_dataset,epochs=20,batch_size=50,learning_rate=0.01):
        ## One batch is consumed per epoch; repeat() restarts the dataset when
        ## the batches run out, so next() does not raise StopIteration
        batch_train_dataset=train_dataset.batch(batch_size).repeat()
        losses=[]
        batch_iterator=iter(batch_train_dataset)
        for epoch in range(epochs):
            print(f"Epoch: {epoch}")
            train_X,train_Y=next(batch_iterator)
            loss=0
            for i in range(len(train_X)):
                h=self.h_init
                train_x=train_X[i]
                train_y=train_Y[i]
                train_y_hat,h=self.forward(train_x,h)
                seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
                print(f"Sequence: {i} Loss: {seq_loss}")
                loss+=seq_loss/len(train_X) #we divide to get an average in the end
                gradients=self.backward(train_x,train_y)
                self.update_weights(gradients,learning_rate=learning_rate)

            ## self.loss already returns a plain number, so no .numpy() here
            losses.append(float(loss))
        ## Return the loss in each epoch (requires: import pandas as pd)
        return pd.DataFrame(zip(range(epochs),losses),columns=["epoch","loss"])

One can also address exploding gradients in RNN by using gradient norm clipping (setting a maximum norm for the gradients):

    def update_weights(self,gradients,learning_rate=0.01,clip_gradients_norm=5):
        ## Each gradient tensor is clipped on its own before the update
        self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hh"],clip_norm=clip_gradients_norm)
        self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_xh"],clip_norm=clip_gradients_norm)
        self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hy"],clip_norm=clip_gradients_norm)
        self.weights["b_h"]=self.weights["b_h"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_h"],clip_norm=clip_gradients_norm)
        self.weights["b_y"]=self.weights["b_y"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_y"],clip_norm=clip_gradients_norm)

    def train(self,train_dataset,epochs=20,batch_size=50,learning_rate=0.01,clip_gradients_norm=1):
        ## One batch is consumed per epoch; repeat() restarts the dataset when
        ## the batches run out, so next() does not raise StopIteration
        batch_train_dataset=train_dataset.batch(batch_size).repeat()
        losses=[]
        batch_iterator=iter(batch_train_dataset)
        for epoch in range(epochs):
            print(f"Epoch: {epoch}")
            train_X,train_Y=next(batch_iterator)
            loss=0
            for i in range(len(train_X)):
                h=self.h_init
                train_x=train_X[i]
                train_y=train_Y[i]
                train_y_hat,h=self.forward(train_x,h)
                seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
                print(f"Sequence: {i} Loss: {seq_loss}")
                loss+=seq_loss/len(train_X)
                gradients=self.backward(train_x,train_y)
                self.update_weights(gradients,learning_rate=learning_rate,clip_gradients_norm=clip_gradients_norm)

            ## self.loss already returns a plain number, so no .numpy() here
            losses.append(float(loss))
        ## Requires: import pandas as pd
        return pd.DataFrame(zip(range(epochs),losses),columns=["epoch","loss"])
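
To make the effect of norm clipping concrete, here is a minimal plain-Python sketch of the rescaling applied to a single gradient. The function clip_by_norm below is a hypothetical stand-in written for illustration, working on a flat list of numbers; it is not TensorFlow's implementation. The idea is that a gradient whose L2 norm exceeds the threshold is scaled down to that norm, while its direction is kept.

```python
import math

def clip_by_norm(values, clip_norm):
    """Rescale a flat list of gradient values so its L2 norm is at most clip_norm."""
    norm = math.sqrt(sum(v * v for v in values))
    if norm <= clip_norm:
        return list(values)  # already small enough, returned unchanged
    return [v * clip_norm / norm for v in values]

small = clip_by_norm([0.3, 0.4], clip_norm=1.0)    # norm 0.5: left as is
large = clip_by_norm([30.0, 40.0], clip_norm=1.0)  # norm 50: rescaled to norm 1
print(small, large)
```

Because update_weights clips each weight's gradient separately, every tensor gets its own norm budget; clipping the norm of all gradients taken together would be a different design choice.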

We can also add a way to evaluate the model on a validation dataset:

  • The validation dataset is passed unbatched; internally it will be used as a single batch to evaluate all the sequences
  • We need to implement an evaluate method to report the loss on the validation data

We can also use the tqdm library to show a progress bar and avoid having too many logs. It is also cooler.

Putting it all together:

import pandas as pd
from tqdm import tqdm
import tensorflow as tf

class SimpleRNNCell(object):

    def __init__(self):
        self.h_curr = None  ## h at timestep t
        self.h_prev = None  ## h at timestep t-1
        self.y_hat = None   ## y_hat: prediction at timestep t

    def forward(self, x, h_prev, weights):
        self.h_prev = h_prev
        z_curr = weights["W_hh"]@self.h_prev + weights["W_xh"]@x + weights["b_h"]
        self.h_curr = tf.math.tanh(z_curr)
        o_curr = weights["W_hy"]@self.h_curr + weights["b_y"]
        self.y_hat = tf.math.softmax(o_curr,axis=0)
        return self.y_hat, self.h_curr

    def backward(self,y,x,next_gradients,weights):
        ## Derivative of tanh at the current hidden state: diag(1-h_t^2)
        d_tanh=tf.linalg.diag(1-tf.reshape(self.h_curr**2,shape=(-1,)))
        d_L_by_h_curr=tf.transpose(weights["W_hy"])@(self.y_hat-y)+next_gradients["dh_next"]
        d_L_by_W_hh=d_tanh@d_L_by_h_curr@tf.transpose(self.h_prev)+next_gradients["d_L_by_W_hh"]
        d_L_by_W_xh=d_tanh@d_L_by_h_curr@tf.transpose(x)+next_gradients["d_L_by_W_xh"]
        d_L_by_b_h=d_tanh@d_L_by_h_curr+next_gradients["d_L_by_b_h"]
        d_L_by_W_hy=(self.y_hat-y)@tf.transpose(self.h_curr)+next_gradients["d_L_by_W_hy"]
        d_L_by_b_y=self.y_hat-y+next_gradients["d_L_by_b_y"]
        ## Gradient passed to the previous timestep: transpose(W_hh).diag(1-h_t^2).dL/dh_t
        dh_next=tf.transpose(weights["W_hh"])@d_tanh@d_L_by_h_curr
        return {
            "d_L_by_W_hh":d_L_by_W_hh,
            "d_L_by_W_xh":d_L_by_W_xh,
            "d_L_by_b_h":d_L_by_b_h,
            "d_L_by_W_hy":d_L_by_W_hy,
            "d_L_by_b_y":d_L_by_b_y,
            "dh_next":dh_next
        }

class RNN(object):

    def __init__(self,input_dim,output_dim,sequence_length,rnn_units=1):

        self.input_dim=input_dim
        self.output_dim=output_dim
        self.sequence_length=sequence_length
        self.rnn_units=rnn_units

        ## Initializing Weights
        self.weights={
            "W_hh":tf.random.normal(shape=(self.rnn_units,self.rnn_units),dtype=tf.float32),
            "W_xh":tf.random.normal(shape=(self.rnn_units,self.input_dim),dtype=tf.float32),
            "W_hy":tf.random.normal(shape=(self.output_dim,self.rnn_units),dtype=tf.float32),
            "b_h":tf.random.normal(shape=(self.rnn_units,1),dtype=tf.float32),
            "b_y":tf.random.normal(shape=(self.output_dim,1),dtype=tf.float32)
        }

        ## Initial hidden state
        self.h_init=tf.zeros(shape=(self.rnn_units,1))

        ## Creating as many rnn cells as the sequence length
        self.cells={i:SimpleRNNCell() for i in range(0,sequence_length)}

    def forward(self,x,h=None):
        ## Returns the predictions and the last hidden state, as train and evaluate expect
        y_hat=[]
        if h is None:
            h=self.h_init

        for i in range(0,self.sequence_length):

            x_t=tf.reshape(x[:,i],shape=(self.input_dim,1))

            y_hat_t,h=self.cells[i].forward(
                x_t,
                h_prev=h,
                weights=self.weights)
            y_hat.append(y_hat_t)
        return tf.concat(y_hat,axis=1),h

    def loss(self,y,y_hat):
        return tf.reduce_sum(-tf.reduce_mean(y*tf.math.log(y_hat),axis=0)).numpy()

    def backward(self,x,y):
        ## Initializing gradients
        partial_gradients = {
            "d_L_by_W_hh":tf.zeros_like(self.weights["W_hh"]),
            "d_L_by_W_xh":tf.zeros_like(self.weights["W_xh"]),
            "d_L_by_b_h":tf.zeros_like(self.weights["b_h"]),
            "d_L_by_W_hy":tf.zeros_like(self.weights["W_hy"]),
            "d_L_by_b_y":tf.zeros_like(self.weights["b_y"]),
            "dh_next":tf.zeros_like(self.h_init)
        }

        ## Walk the cells from the last timestep to the first, accumulating gradients
        for i in reversed(range(0,self.sequence_length)):
            y_curr=tf.reshape(y[:,i],shape=(self.output_dim,1))
            x_curr=tf.reshape(x[:,i],shape=(self.input_dim,1))
            partial_gradients=self.cells[i].backward(x=x_curr,y=y_curr,next_gradients=partial_gradients,weights=self.weights)
        return partial_gradients

    def update_weights(self,gradients,learning_rate=0.01,clip_gradients_norm=5):
        ## Updating the weights following the (clipped) gradients
        self.weights["W_hh"]=self.weights["W_hh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hh"],clip_norm=clip_gradients_norm)
        self.weights["W_xh"]=self.weights["W_xh"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_xh"],clip_norm=clip_gradients_norm)
        self.weights["W_hy"]=self.weights["W_hy"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_W_hy"],clip_norm=clip_gradients_norm)
        self.weights["b_h"]=self.weights["b_h"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_h"],clip_norm=clip_gradients_norm)
        self.weights["b_y"]=self.weights["b_y"]-learning_rate*tf.clip_by_norm(gradients["d_L_by_b_y"],clip_norm=clip_gradients_norm)

    def train(self,train_dataset,validation_dataset=None,epochs=20,batch_size=50,learning_rate=0.01,clip_gradients_norm=1):
        ## One batch is consumed per epoch; repeat() restarts the dataset when
        ## the batches run out, so next() does not raise StopIteration
        batch_train_dataset=train_dataset.batch(batch_size).repeat()
        ## The validation data is optional and evaluated as one single batch
        batch_validation_dataset=None
        if validation_dataset is not None:
            batch_validation_dataset=validation_dataset.batch(len(validation_dataset))
        train_losses=[]
        val_losses=[]
        batch_iterator=iter(batch_train_dataset)

        for epoch in range(epochs):
            print(f"----------------------------- Epoch: {epoch} ----------------------------- ")
            train_X,train_Y=next(batch_iterator)
            train_loss=0
            progress=tqdm(range(len(train_X)), desc=f'Training| Epoch:{epoch}', unit_scale=True, unit='seq')
            for i in progress:
                h=self.h_init
                train_x=train_X[i]
                train_y=train_Y[i]
                train_y_hat,h=self.forward(train_x,h=h)
                seq_loss=self.loss(y=train_y,y_hat=train_y_hat)
                train_loss+=seq_loss/len(train_X)
                progress.set_postfix({"Sequence":i,"Loss":float(train_loss)})
                gradients=self.backward(train_x,train_y)
                self.update_weights(gradients=gradients,learning_rate=learning_rate,clip_gradients_norm=clip_gradients_norm)

            train_losses.append(float(train_loss))
            val_loss=None
            if batch_validation_dataset is not None:
                val_loss=self.evaluate(batch_validation_dataset)
            val_losses.append(val_loss)
        return pd.DataFrame(zip(range(epochs),train_losses,val_losses),columns=["epoch","train_loss","validation_loss"])

    def evaluate(self,single_batch_dataset):
        X,Y=next(iter(single_batch_dataset))
        loss=0
        progress=tqdm(range(len(X)), desc='Validating', unit_scale=True, unit='seq')
        for i in progress:
            ## Each sequence starts from the initial hidden state, as in training
            y_hat,_=self.forward(X[i],h=self.h_init)
            seq_loss=self.loss(y=Y[i],y_hat=y_hat)

            loss+=seq_loss/len(X)
            progress.set_postfix({"Validation Loss":float(loss)})
        return float(loss)

Conclusion:

In the end:

  • We have in this part implemented a first working class to build and train a recurrent neural network
  • A few things were not discussed in depth here, like the problem of vanishing and exploding gradients in RNNs
  • In the next part, we will try to use this class for text generation

I hope this article helps anyone in need of understanding RNNs under the hood. Please let me know if you have any questions!

Appendix:

Starting with W_hy:

This goes back to evaluating the gradient of L_t by o_t and the gradient of o_t by W_hy:

Where:

Now the remaining part is to compute the gradient of o_t by W_hy:
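
As a sketch of this step, assume the per-timestep loss is the cross entropy of a one-hot target y_t against y_hat_t = softmax(o_t), with o_t = W_hy h_t + b_y as in the forward pass. Under those assumptions the standard result is the following, and it matches the expression (y_hat - y) multiplied by the transpose of h_curr used in the backward method:

```latex
\begin{aligned}
\frac{\partial L_t}{\partial o_{t,j}}
  &= -\sum_{i=1}^{R} y_{t,i}\,\bigl(\delta_{ij} - \hat{y}_{t,j}\bigr)
   = \hat{y}_{t,j}\sum_{i=1}^{R} y_{t,i} - y_{t,j}
   = \hat{y}_{t,j} - y_{t,j} \\
\frac{\partial L_t}{\partial W_{hy}}
  &= \frac{\partial L_t}{\partial o_t}\, h_t^{\top}
   = (\hat{y}_t - y_t)\, h_t^{\top} \\
\frac{\partial L_t}{\partial b_y}
  &= \hat{y}_t - y_t
\end{aligned}
```

The first line uses the fact that the entries of a one-hot target sum to 1; the second follows because entry (j, k) of W_hy only affects o_{t,j}, with derivative h_{t,k}.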
