nntrain (1/n): Datasets and Dataloaders

dataloading
training
collation
sampler
Author

Lucas van Walstijn

Published

August 14, 2023

In this series, I want to discuss the creation of a small PyTorch-based library for training neural networks: nntrain. It’s based on the excellent part 2 of Practical Deep Learning for Coders by Jeremy Howard, in which the development of the miniai library is discussed (roughly lessons 13 to 18).

We’ll try to build everything as much as possible from scratch to understand how things work. Once the main functionality of components is implemented and verified, we can switch over to PyTorch’s version. This is similar to how things are done in the course. However, this is not just a “copy / paste” of the course: on many occasions I take a different route, and most of the code is my own. That is not to say that all of this is meant to be extremely innovative, instead I had the following goals:

nbdev is another great project from the fastai community, which allows Python libraries to be written in Jupyter notebooks. This may sound a bit weird and controversial, but it has the advantage that we can create the source code for our library in the very same environment in which we want to experiment and interact with our methods, objects and structure while we are building the library. For more details on why this is a good idea and other nice features of nbdev, see here.

So without further ado, let’s start with where we left off in the previous post:

End of last post

from datasets import load_dataset,load_dataset_builder
import torchvision.transforms.functional as TF
import torch
import torch.nn as nn
import torch.nn.functional as F

name = "fashion_mnist"
ds_builder = load_dataset_builder(name)
ds_hf = load_dataset(name, split='train')

x_train = torch.stack([TF.to_tensor(i).view(-1) for i in ds_hf['image']])
y_train = torch.stack([torch.tensor(i) for i in ds_hf['label']])

def fit(epochs):
    for epoch in range(epochs):
        for i in range(0,len(x_train), bs):
            xb = x_train[i:i+bs]
            yb = y_train[i:i+bs]

            preds = model(xb)
            acc = accuracy(preds, yb)
            loss = loss_func(preds, yb)
            loss.backward()

            opt.step()
            opt.zero_grad()
        print(f'{epoch=} | {loss=:.3f} | {acc=:.3f}')

def accuracy(preds, targs):
    return (preds.argmax(dim=1) == targs).float().mean()        

def get_model_opt():
    layers = [nn.Linear(n_in, n_h), nn.ReLU(), nn.Linear(n_h, n_out)]
    model = nn.Sequential(*layers)
    
    opt = torch.optim.SGD(model.parameters(), lr)
    
    return model, opt

n_in  = 28*28
n_h   = 50
n_out = 10
lr    = 0.01
bs    = 1024
loss_func = F.cross_entropy

model, opt = get_model_opt()
fit(5)

Datasets

This post will be about improving the mini-batch construction we currently have in the training loop, on lines 16-18 of the code above:

...
for i in range(0,len(x_train), bs):
    xb = x_train[i:i+bs]
    yb = y_train[i:i+bs]
...

As a first refactor, we will create a Dataset object, which allows us to simplify:

...
for i in range(0,len(x_train), bs):
    xb, yb = dataset[i:i+bs]
...

This is pretty straightforward: a Dataset is something that holds our data and, upon “indexing into” it, returns a sample of the data:

class Dataset():
    
    def __init__(self, x_train, y_train):
        self.x_train = x_train
        self.y_train = y_train
        
    def __getitem__(self, i):
        return self.x_train[i], self.y_train[i]
    
    def __len__(self):
        return len(self.x_train)
ds = Dataset(x_train, y_train)
print([i.shape for i in ds[0]])
[torch.Size([784]), torch.Size([])]
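Because `__getitem__` simply forwards the index to the underlying tensors, a slice works just as well as a single integer. The sketch below illustrates this with small toy tensors (`x`, `y` and `toy_ds` are made-up stand-ins for `x_train`, `y_train` and `ds`, not the Fashion-MNIST data):

```python
import torch

class Dataset():
    def __init__(self, x, y):
        self.x, self.y = x, y
    def __getitem__(self, i):
        return self.x[i], self.y[i]
    def __len__(self):
        return len(self.x)

# Toy stand-ins for x_train / y_train: 10 samples with 4 features each
x = torch.arange(40, dtype=torch.float32).view(10, 4)
y = torch.arange(10)
toy_ds = Dataset(x, y)

xb, yb = toy_ds[0:3]          # a slice returns a mini-batch of 3 samples
print(xb.shape, yb.shape)     # torch.Size([3, 4]) torch.Size([3])

xb, yb = toy_ds[8:12]         # slicing past the end returns whatever is left
print(xb.shape, yb.shape)     # torch.Size([2, 4]) torch.Size([2])
```

The second call shows why the last mini-batch of an epoch can be smaller than the batch size without any special handling.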

Next, we want to further improve the training loop and get to this behavior:

...
for xb, yb in dataloader:
...

So our dataloader needs to wrap the dataset, and provide some kind of an iterator returning batches of data, based on the specified batch size. Let’s create one:

class DataLoader():
    
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
        
    def __iter__(self):
        for i in range(0, len(self.dataset), self.batch_size):
            yield self.dataset[i:i+self.batch_size]
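To check the batching logic in isolation, here is a minimal sketch that runs the same kind of loader over plain Python lists. `ToyDataset` and `toy_dl` are hypothetical names used only for this illustration:

```python
class ToyDataset:
    """Hypothetical stand-in for Dataset, backed by plain Python lists."""
    def __init__(self, x, y): self.x, self.y = x, y
    def __getitem__(self, i): return self.x[i], self.y[i]
    def __len__(self): return len(self.x)

class DataLoader:
    def __init__(self, dataset, batch_size):
        self.dataset, self.batch_size = dataset, batch_size
    def __iter__(self):
        # start at 0 and step through the dataset one batch at a time
        for i in range(0, len(self.dataset), self.batch_size):
            yield self.dataset[i:i+self.batch_size]

toy_dl = DataLoader(ToyDataset(list(range(10)), list(range(100, 110))), batch_size=4)
batches = list(toy_dl)
for xb, yb in batches:
    print(xb, yb)
# [0, 1, 2, 3] [100, 101, 102, 103]
# [4, 5, 6, 7] [104, 105, 106, 107]
# [8, 9] [108, 109]
```

Since `__iter__` is a generator function, every `for` loop over the loader starts a fresh pass, which is what lets us reuse the same object in each epoch.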

Now the training loop is simplified to:

def fit(epochs):
    for epoch in range(epochs):
        for xb, yb in dl:
            preds = model(xb)
            acc = accuracy(preds, yb)
            loss = loss_func(preds, yb)
            loss.backward()

            opt.step()
            opt.zero_grad()
        print(f'{epoch=} | {loss=:.3f} | {acc=:.3f}')
dl = DataLoader(ds, bs)

model, opt = get_model_opt()
fit(5)
epoch=0 | loss=2.062 | acc=0.441
epoch=1 | loss=1.785 | acc=0.597
epoch=2 | loss=1.531 | acc=0.637
epoch=3 | loss=1.334 | acc=0.645
epoch=4 | loss=1.190 | acc=0.660

Next up: shuffling the data

The next change will improve the training of the model. So far, we cycle through the data in the exact same order each epoch, which means that every training sample is always batched together with the exact same other samples. This is not good for training our model. Instead, we want to shuffle the data each epoch, so that every epoch the batches are composed differently. This additional variation helps the model to generalize, as we will see.

The simplest implementation would be to create a list of indices, which we put in between the dataset and the sampling of the mini-batches. This list will function as a map. In case we don’t need to shuffle, this list will simply be [0, 1, ..., len(dataset)-1].

import random

class DataLoader():
    
    def __init__(self, dataset, batch_size, shuffle):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        
    def __iter__(self):
        self.indices = list(range(len(self.dataset)))
        if self.shuffle: 
            random.shuffle(self.indices)
            
        for i in range(0,len(self.dataset),self.batch_size):
            yield self.dataset[self.indices[i:i+self.batch_size]]
model, opt = get_model_opt()
dl = DataLoader(ds, bs, shuffle=True)
fit(5)
epoch=0 | loss=2.067 | acc=0.429
epoch=1 | loss=1.800 | acc=0.515
epoch=2 | loss=1.539 | acc=0.592
epoch=3 | loss=1.358 | acc=0.618
epoch=4 | loss=1.187 | acc=0.692
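The index map only changes the order in which samples are visited; it never drops or duplicates a sample. A small sketch with toy sizes (the fixed seed is there only to make the run repeatable, and `n` and `batch_size` are arbitrary illustrative values):

```python
import random

random.seed(0)                       # fixed seed only to make the sketch repeatable
n, batch_size = 10, 4
indices = list(range(n))
random.shuffle(indices)              # in-place shuffle of the index map

# cut the shuffled index map into consecutive chunks of batch_size
batches = [indices[i:i+batch_size] for i in range(0, n, batch_size)]
print(batches)

# every index appears exactly once across the batches of one epoch
flat = [i for b in batches for i in b]
assert sorted(flat) == list(range(n))
```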

This works just fine, but let’s see if we can encapsulate this logic in a separate class. We start with a simple Sampler class that we can iterate through and that yields the indices either in order or shuffled:

class Sampler():
    def __init__(self, ds, shuffle=False):
        self.range = list(range(0, len(ds)))
        self.shuffle = shuffle
        
    def __iter__(self):
        if self.shuffle: random.shuffle(self.range)
        for i in self.range:
            yield i
s = Sampler(ds, False)           # shuffle = False
for i, sample in enumerate(s): 
    print(sample, end=', ')
    if i == 5: break
0, 1, 2, 3, 4, 5, 
s = Sampler(ds, True)            # shuffle = TRUE
for i, sample in enumerate(s): 
    print(sample, end=', ')
    if i == 5: break
58844, 19394, 36509, 38262, 51037, 46835, 
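One detail worth noting is that the shuffle happens inside `__iter__`, so the indices are reshuffled every time we start a new pass, which is once per epoch in the training loop. A minimal sketch, using a plain list `toy` as a made-up stand-in for the dataset:

```python
import random

class Sampler():
    def __init__(self, ds, shuffle=False):
        self.range = list(range(0, len(ds)))
        self.shuffle = shuffle

    def __iter__(self):
        if self.shuffle: random.shuffle(self.range)   # reshuffled on every new pass
        for i in self.range:
            yield i

toy = list(range(100))
s = Sampler(toy, shuffle=True)

epoch_1 = list(s)                    # first pass over the sampler
epoch_2 = list(s)                    # second pass: shuffled again
print(epoch_1[:5], epoch_2[:5])      # the order will (almost surely) differ

# both passes visit exactly the same set of indices
assert sorted(epoch_1) == sorted(epoch_2) == toy
```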

Next, let’s create a BatchSampler that does the same, but returns the indexes in batches. For that we can use the islice() function from the itertools module:

from itertools import islice

def printlist(this): print(list(this))

lst = list(range(0, 10))         # create a list of 10 numbers

printlist(islice(lst, 0, 3))     # with islice we can get a slice out of the list
printlist(islice(lst, 5, 10))
[0, 1, 2]
[5, 6, 7, 8, 9]
printlist(islice(lst, 4))        # we can also get the "next" 4 elements
printlist(islice(lst, 4))        # doing that twice gives the same first 4 elements
[0, 1, 2, 3]
[0, 1, 2, 3]
lst = iter(lst)                  # however if we put an iterator on the list:

printlist(islice(lst, 4))        # first 4 elements
printlist(islice(lst, 4))        # second 4 elements
printlist(islice(lst, 4))        # remaining 2 elements
printlist(islice(lst, 4))        # iterator has finished..
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]
[]

And thus we create our BatchSampler:

class BatchSampler():
    def __init__(self, sampler, batch_size):
        self.sampler = sampler
        self.batch_size = batch_size
        
    def __iter__(self):
        it = iter(self.sampler)
        while True:
            res = list(islice(it, self.batch_size))
            if len(res) == 0:    # return when the iterator has finished          
                return           
            yield res

Let’s see the BatchSampler in action:

s = Sampler(list(range(0,10)), shuffle=False)
batchs = BatchSampler(s, 4)
for i in batchs:
    printlist(i)
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]
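Two properties follow from this implementation: any iterable can play the role of the sampler, and the number of batches is the dataset length divided by the batch size, rounded up. A short sketch, where a plain `range` stands in for a Sampler and the sizes are arbitrary:

```python
import math
from itertools import islice

class BatchSampler():
    def __init__(self, sampler, batch_size):
        self.sampler = sampler
        self.batch_size = batch_size

    def __iter__(self):
        it = iter(self.sampler)
        while True:
            res = list(islice(it, self.batch_size))
            if len(res) == 0:        # stop once the underlying iterator is exhausted
                return
            yield res

n, batch_size = 10, 4
batches = list(BatchSampler(range(n), batch_size))   # a plain range stands in for a Sampler

print(len(batches), math.ceil(n / batch_size))       # 3 3
print(sum(len(b) for b in batches))                  # 10
```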

And let’s incorporate it into the DataLoader:

class DataLoader():
    
    def __init__(self, dataset, batch_sampler):
        self.dataset = dataset
        self.batch_sampler = batch_sampler
        
    def __iter__(self):
        for batch in self.batch_sampler:
            yield self.dataset[batch]
s = Sampler(ds, shuffle=True)
dl = DataLoader(ds, BatchSampler(s, bs))

model, opt = get_model_opt()
fit(5)
epoch=0 | loss=1.981 | acc=0.462
epoch=1 | loss=1.698 | acc=0.567
epoch=2 | loss=1.468 | acc=0.620
epoch=3 | loss=1.346 | acc=0.613
epoch=4 | loss=1.202 | acc=0.656

Collation

With the Sampler, the Dataloader and the Dataset we have made some good progress. However, there is one other thing we need to fix. In the very beginning of this post we did:

x_train = torch.stack([TF.to_tensor(i).view(-1) for i in ds_hf['image']])
y_train = torch.stack([torch.tensor(i) for i in ds_hf['label']])

Ideally, we would like these transformations to be part of the Dataloaders / Dataset paradigm. So instead of first transforming the Huggingface Dataset into x_train and y_train, we want to use the dataset directly. We can do so by adding a collate function. This function wraps around the individual samples drawn from the dataset: it receives a list of individual x,y tuples ([(x1,y1), (x2,y2), ..]) as its argument. In that function, we can determine how to treat these items and turn them into a form that suits our needs, i.e.:

  • batch the x and y, so that we transform from [(x1,y1), (x2,y2), ..] to [(x1,x2, ..), (y1,y2, ..)]
  • convert the individual items x_i and y_i to tensors
  • stack the x tensors and y tensors respectively into one big tensor

So let’s update our DataLoader with a collate_func that wraps around individual samples:

class DataLoader():
    
    def __init__(self, dataset, batch_sampler, collate_func):
        self.dataset = dataset
        self.batch_sampler = batch_sampler
        self.collate_func = collate_func
        
    def __iter__(self):
        for batch in self.batch_sampler:
            yield self.collate_func(self.dataset[sample] for sample in batch)

And now let’s create a custom collate function to deal with our data. Specifically, remember that a sample of our huggingface dataset is a dictionary (and not a tuple) with keys image and label, holding a PIL.Image.Image object and a number (representing one of the 10 classes) respectively.

So our collate_func should:

  1. transform the dictionary into a tuple
  2. convert everything to tensors
  3. zip the results so that x and y are batched
  4. and combine the list of tensors for x and y respectively into one big tensor
def collate_func(data):
    data = [(TF.to_tensor(sample['image']).view(-1), torch.tensor(sample['label'])) for sample in data]
    x, y = zip(*data)
    return torch.stack(x), torch.stack(y)
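The `zip(*data)` step is the part that turns a list of (x, y) pairs into one tuple of all x and one tuple of all y. A stripped-down sketch without tensors or images, where the `samples` list is made-up data that only mimics the dictionary layout of the dataset:

```python
# Made-up samples mimicking the {'image': ..., 'label': ...} layout
samples = [{'image': [0.0, 0.1], 'label': 3},
           {'image': [0.2, 0.3], 'label': 7},
           {'image': [0.4, 0.5], 'label': 1}]

# step 1: dictionary -> (x, y) tuple for every sample
pairs = [(s['image'], s['label']) for s in samples]

# step 2: "transpose" the list of pairs into all x and all y
x, y = zip(*pairs)
print(x)   # ([0.0, 0.1], [0.2, 0.3], [0.4, 0.5])
print(y)   # (3, 7, 1)
```

In the real `collate_func`, each of these two tuples is then handed to `torch.stack` to form a single batch tensor.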

And let’s see it in action, now using the huggingface dataset ds_hf:

s = Sampler(ds_hf, shuffle=True)
dl = DataLoader(ds_hf, BatchSampler(s, bs), collate_func)

model, opt = get_model_opt()
fit(5)
epoch=0 | loss=2.125 | acc=0.345
epoch=1 | loss=1.899 | acc=0.497
epoch=2 | loss=1.635 | acc=0.609
epoch=3 | loss=1.389 | acc=0.640
epoch=4 | loss=1.260 | acc=0.641

Not bad, we have replicated the main logic of PyTorch’s DataLoader. The version from PyTorch has a slightly different API as we don’t have to specify the BatchSampler, instead we can just pass shuffle=True:

from torch.utils.data import DataLoader

dl = DataLoader(ds_hf, batch_size=bs, shuffle=True, collate_fn=collate_func)

model, opt = get_model_opt()
fit(5)
epoch=0 | loss=2.107 | acc=0.434
epoch=1 | loss=1.840 | acc=0.620
epoch=2 | loss=1.605 | acc=0.641
epoch=3 | loss=1.354 | acc=0.641
epoch=4 | loss=1.258 | acc=0.618
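PyTorch’s DataLoader works with any object that supports indexing and `len()`, and it hands the collate function a list of samples, just like our own version. The sketch below shows this on a toy list of dictionaries; `toy_data` and `toy_collate` are hypothetical names, and the list merely stands in for the Huggingface dataset:

```python
import torch
from torch.utils.data import DataLoader

# A plain list of dicts stands in for the Huggingface dataset
toy_data = [{'image': [float(i), float(i) + 0.5], 'label': i % 3} for i in range(10)]

def toy_collate(batch):
    # batch is a list of individual samples (here: dicts)
    x = torch.stack([torch.tensor(s['image']) for s in batch])
    y = torch.tensor([s['label'] for s in batch])
    return x, y

toy_dl = DataLoader(toy_data, batch_size=4, shuffle=True, collate_fn=toy_collate)

shapes = [(tuple(xb.shape), tuple(yb.shape)) for xb, yb in toy_dl]
print(shapes)   # [((4, 2), (4,)), ((4, 2), (4,)), ((2, 2), (2,))]
```

The sampler and batch sampler are created internally from the `batch_size` and `shuffle` arguments, which is why we no longer pass them ourselves.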

Validation set

Let’s add a validation set to make sure we validate on data we are not training on. For that we are going to pull the data from the datasets library without the split argument, which will give us a dataset dictionary containing both a training and a test dataset:

hf_dd = load_dataset(name)
hf_dd
Reusing dataset fashion_mnist (/root/.cache/huggingface/datasets/fashion_mnist/fashion_mnist/1.0.0/8d6c32399aa01613d96e2cbc9b13638f359ef62bb33612b077b4c247f6ef99c1)
DatasetDict({
    train: Dataset({
        features: ['image', 'label'],
        num_rows: 60000
    })
    test: Dataset({
        features: ['image', 'label'],
        num_rows: 10000
    })
})

And let’s create two dataloaders, one for the training set and one for the validation set. For the validation loader we can double the batch size, because no gradients are computed there and so no activations have to be kept in memory for a backward pass:

train_loader = DataLoader(hf_dd['train'], batch_size=bs, shuffle=True, collate_fn=collate_func)
valid_loader = DataLoader(hf_dd['test'], batch_size=2*bs, shuffle=False, collate_fn=collate_func)

We change the training loop in a couple of ways:

  • compute loss and metrics more correctly, by taking care of the batch-size and taking the average over all data
  • add a separate forward pass for the validation set
def fit(epochs):
    for epoch in range(epochs):
        model.train()                                       # put the model in "train" mode
        n_t = train_loss_s = 0                              # initialize variables for computing averages
        for xb, yb in train_loader:
            preds = model(xb)
            train_loss = loss_func(preds, yb)
            train_loss.backward()
            
            n_t += len(xb)
            train_loss_s += train_loss.item() * len(xb)
            
            opt.step()
            opt.zero_grad()
        
        model.eval()                                        # put the model in "eval" mode
        n_v = valid_loss_s = acc_s = 0                      # initialize variables for computing averages
        for xb, yb in valid_loader:
            with torch.no_grad():                           # no need to compute gradients on validation set
                preds = model(xb)
                valid_loss = loss_func(preds, yb)
                
                n_v += len(xb)
                valid_loss_s += valid_loss.item() * len(xb)
                acc_s += accuracy(preds, yb) * len(xb)
        
        train_loss = train_loss_s / n_t                     # compute averages of loss and metrics
        valid_loss = valid_loss_s / n_v
        acc = acc_s / n_v
        print(f'{epoch=} | {train_loss=:.3f} | {valid_loss=:.3f} | {acc=:.3f}')
model, opt = get_model_opt()

fit(5)
epoch=0 | train_loss=2.198 | valid_loss=2.095 | acc=0.276
epoch=1 | train_loss=1.980 | valid_loss=1.852 | acc=0.539
epoch=2 | train_loss=1.718 | valid_loss=1.591 | acc=0.617
epoch=3 | train_loss=1.481 | valid_loss=1.387 | acc=0.624
epoch=4 | train_loss=1.305 | valid_loss=1.241 | acc=0.637
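Why multiply each batch loss by `len(xb)`? The last batch is usually smaller than the others (with 60000 training samples and a batch size of 1024, it holds only 608 samples), so a plain mean over the batch losses would give its samples too much weight. A small arithmetic sketch with made-up batch sizes and losses:

```python
# Made-up numbers: two full batches and one half-sized final batch
batch_sizes  = [1024, 1024, 512]
batch_losses = [1.0, 1.0, 4.0]       # mean loss within each batch

# naive: every batch counts equally, regardless of its size
naive = sum(batch_losses) / len(batch_losses)

# weighted: every sample counts equally, as in the training loop above
weighted = sum(l * n for l, n in zip(batch_losses, batch_sizes)) / sum(batch_sizes)

print(naive)      # 2.0
print(weighted)   # 1.6
```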

And that’s it for this post (almost)! We have seen a lot of details on Datasets, Dataloaders and the transformation of data. We have used these concepts to improve our training loop: shuffling the training data on each epoch, and the computation of the metrics on the validation set. But before we close off, let’s make our very first exports into the library, so that next time we can continue where we finished off.

First exports

When exporting code to a module with nbdev the first thing we need to do is declare the default_exp directive. This makes sure that when we run the export, the module will be exported to dataloaders.py

 #| default_exp dataloaders

Next, we can export any code into the module by adding #|export on top of the cell we want to export. For example:

 #| export

def print_hello():
    print('hello')

To export, we simply execute:

import nbdev; nbdev.nbdev_export()

This will create a file called dataloaders.py in the library folder (in my case nntrain) with the contents:

# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/01_dataloaders.ipynb.

# %% auto 0
__all__ = ['print_hello']

# %% ../nbs/01_dataloaders.ipynb 59
def print_hello():
    print('hello')

So what do we want to export here? Let’s see if we can create some generic code for loading data from the Huggingface datasets library into a PyTorch Dataloader:

 #|export
from functools import partial
import torchvision.transforms.functional as TF
from torch.utils.data import DataLoader
import torch
import PIL
 #|export

def hf_ds_collate_fn(data, flatten=True):
    '''
    Collation function for building a PyTorch DataLoader from a huggingface dataset.
    Tries to convert every item of a dataset entry to a tensor.
    PIL images are converted to tensors, either flattened or not.
    '''

    def to_tensor(i, flatten):
        if isinstance(i, PIL.Image.Image):
            if flatten:
                return torch.flatten(TF.to_tensor(i))
            return TF.to_tensor(i)
        else:
            return torch.tensor(i)
    
    to_tensor = partial(to_tensor, flatten=flatten)      # partially apply to_tensor() with flatten arg
    data = [map(to_tensor, el.values()) for el in data]  # map each item from a dataset entry through to_tensor()
    data = zip(*data)                                    # zip data of any length not just (x,y) but also (x,y,z)
    return (torch.stack(i) for i in data)
 #|export
class DataLoaders:
    def __init__(self, train, valid):
        '''Class that exposes two PyTorch dataloaders as train and valid attributes'''
        self.train = train
        self.valid = valid
    
    @classmethod
    def _get_dls(cls, train_ds, valid_ds, bs, collate_fn, **kwargs):
        '''Helper function returning 2 PyTorch Dataloaders as a tuple for 2 Datasets. **kwargs are passed to the DataLoader'''
        return (DataLoader(train_ds, batch_size=bs, shuffle=True, collate_fn=collate_fn, **kwargs),
                DataLoader(valid_ds, batch_size=bs*2, collate_fn=collate_fn, **kwargs))
        
    @classmethod
    def from_hf_dd(cls, dd, batch_size, collate_fn=hf_ds_collate_fn, **kwargs):
        '''Factory method to create a Dataloaders object for a Huggingface Dataset dict,
        uses the `hf_ds_collate_fn` collation function by default, **kwargs are passed to the DataLoaders'''
        return cls(*cls._get_dls(*dd.values(), batch_size, collate_fn, **kwargs))
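Note that the factory unpacks `dd.values()` positionally, so it relies on the dictionary holding the training split first and the validation split second. The simplified sketch below illustrates the same pattern on a plain Python dict; `from_dd`, `toy_dd` and `toy_collate` are hypothetical names for this illustration and not part of the library:

```python
import torch
from torch.utils.data import DataLoader

class DataLoaders:
    def __init__(self, train, valid):
        self.train, self.valid = train, valid

    @classmethod
    def from_dd(cls, dd, batch_size, collate_fn, **kwargs):
        # relies on the dict holding the training split first, the validation split second
        train_ds, valid_ds = dd.values()
        return cls(DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn, **kwargs),
                   DataLoader(valid_ds, batch_size=batch_size*2, collate_fn=collate_fn, **kwargs))

def toy_collate(batch):
    return (torch.tensor([s['x'] for s in batch]),
            torch.tensor([s['label'] for s in batch]))

# A plain dict of lists stands in for the Huggingface DatasetDict
toy_dd = {'train': [{'x': [float(i)], 'label': i % 2} for i in range(20)],
          'test':  [{'x': [float(i)], 'label': i % 2} for i in range(8)]}

dls = DataLoaders.from_dd(toy_dd, batch_size=4, collate_fn=toy_collate)

xb, yb = next(iter(dls.valid))
print(xb.shape, yb.shape)               # torch.Size([8, 1]) torch.Size([8])
print(len(dls.train), len(dls.valid))   # 5 1
```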

With show_doc() we can include the documentation of class methods:

 #|hide
from nbdev.showdoc import *
show_doc(DataLoaders.from_hf_dd)

DataLoaders.from_hf_dd

 DataLoaders.from_hf_dd (dd, batch_size, collate_fn=<function
                         hf_ds_collate_fn>, **kwargs)

Factory method to create a Dataloaders object for a Huggingface Dataset dict, uses the hf_ds_collate_fn collation function by default, **kwargs are passed to the DataLoaders

Example usage:

def fit(epochs):
    for epoch in range(epochs):
        model.train()                                       
        n_t = train_loss_s = 0                              
        for xb, yb in dls.train:
            preds = model(xb)
            train_loss = loss_func(preds, yb)
            train_loss.backward()
            
            n_t += len(xb)
            train_loss_s += train_loss.item() * len(xb)
            
            opt.step()
            opt.zero_grad()
        
        model.eval()                                        
        n_v = valid_loss_s = acc_s = 0                      
        for xb, yb in dls.valid: 
            with torch.no_grad():                           
                preds = model(xb)
                valid_loss = loss_func(preds, yb)
                
                n_v += len(xb)
                valid_loss_s += valid_loss.item() * len(xb)
                acc_s += accuracy(preds, yb) * len(xb)
        
        train_loss = train_loss_s / n_t                     
        valid_loss = valid_loss_s / n_v
        acc = acc_s / n_v
        print(f'{epoch=} | {train_loss=:.3f} | {valid_loss=:.3f} | {acc=:.3f}')
hf_dd = load_dataset('fashion_mnist')
bs    = 1024
dls = DataLoaders.from_hf_dd(hf_dd, bs)
Reusing dataset fashion_mnist (/root/.cache/huggingface/datasets/fashion_mnist/fashion_mnist/1.0.0/8d6c32399aa01613d96e2cbc9b13638f359ef62bb33612b077b4c247f6ef99c1)
model, opt = get_model_opt()

fit(1)
epoch=0 | loss=2.094 | acc=0.431
 #|hide
import nbdev; nbdev.nbdev_export()

And that’s it. We have created our first module of the nntrain library🕺. Links: