Boostcamp AI Tech

[Boostcamp Day-12] PyTorch - AutoGrad & Optimizer, Dataset & Dataloader

ju_young 2021. 8. 18. 17:38
728x90

torch.nn.Module

  • 딥러닝을 구성하는 Layer의 base class
  • Input, Output, Forward(순전파), Backward(역전파) 정의
  • 학습의 대상이 되는 parameter(tensor)정의

nn.Parameter

  • Tendor 객체의 상속 객체
  • nn.Module 내에 attribute가 될 때는 required_grad=True로 지정되어 학습 대상이 되는 Tensor
  • 대부분의 layer에는 weights 값들이 지정되어 있기 때문에 우리가 직접 지정할 일은 없음
class MyLiner(nn.Module):
    def __init__(self, in_features, out_features, bias=True):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weights = nn.Parameter(torch.randn(in_features, out_features))
        self.bias = nn.Parameter(torch.randn(out_features))
    def forward(self, x : Tensor):
        return x @ self.weights + self.bias

Backward

  • Layer에 있는 Parameter들의 미분을 수행
  • Forward의 결과값(model의 output=예측치)과 실제값의 차이(loss)에 대해 미분을 수행
  • 해당 값으로 parameter 업데이트
for epoch in range(epochs):

    optimizer.zero_grad() #gradient 초기화
    outputs = model(inputs) #예측값
    loss = criterion(outputs, labels) #loss
    loss.backward() #미분
    optimizer.step() #update parameter

Backward from the scratch

  • 실제 backward는 Module 단계에서 직접 지정가능
  • Module에서 backward와 optimizer 오버라이딩
class LR(nn.Module):
    def __init__(self, dim, lr=torch.scalar_tensor(0.01)):
        super().__init__()
        self.w = torch.zeros(dim, 1, dtype=torch.float).to(device)
        self.b = torch.scalar_tensor(0).to(device)
        self.grads = {"dw" : torch.zeros(dim, 1, dtype=torch.float).to(device),
                     "db" : torch.scalar_tensor(0).to(device)}
        self.lr = lr.to(device)

    def forward(self, x):
        z = torch.mm(self.w.T, x)
        a = self.sigmoid(z)
        return a

    def sigmoid(self, z):
        return 1 / (1 + torch.exp(-z))

    def backward(self, x, yhat, y):
        self.grads["dw"] = (1/x.shape[1])*torch.mm(x, (yhat-y).T)
        self.grads["db"] = (1/x.shape[1])*torch.sum(yhat-y)

    def optimize(self):
        self.w -= self.lr * self.grads["dw"]
        self.w -= self.lr * self.grads["db"]

Datasets

  • 데이터 입력 형태를 정의하는 클래스
  • 데이터를 입력하는 방식의 표준화
  • Image, Text, Audio 등에 따른 입력정의

import torch
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample

Dataset 클래스 생성시 유의점

  • 데이터 형태에 따라 각 함수를 다르게 정의
  • 모든 것을 데이터 생성 시점에 처리할 필요는 없음 : image의 Tensor 변화는 학습에 필요한 시점에 변환
  • 데이터 셋에 대한 표준화된 처리방법 제공 필요
  • 최근에는 HuggingFace 등 표준화된 라이브러리 사용

DataLoader 클래스

  • Data의 Batch를 생성해주는 클래스
  • 학습직전 데이터의 변환을 책임
  • Tensor로 변환 + Batch 처리가 메인 업무
  • 병렬적인 데이터 전처리 코드의 고민 필요
text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)
next(iter(MyDataLoader))
# {'Text': ['Glum', 'Sad'], 'Class': ['Negative', 'Negative']}

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)
for dataset in MyDataLoader:
    print(dataset)
# {'Text': ['Glum', 'Unhapy'], 'Class': ['Negative', 'Negative']}
# {'Text': ['Sad', 'Amazing'], 'Class': ['Negative', 'Positive']}
# {'Text': ['Happy'], 'Class': ['Positive']}
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,
batch_sampler=None, num_workers=0, collate_fn=None,
pin_memory=False, drop_last=False, timeout=0,
worker_init_fn=None, *, prefetch_factor=2,
persistent_workers=False)

"""
torch의 dataset은 다음과 같이 2가지 스타일이 있다.

- Map-style dataset
index가 존재하여 data[index]로 데이터를 참조할 수 있고 __getitem__과 __len__이 선언되어야한다.

- Iterable-style dataset
random으로 읽기에 어렵거나 data에 따라 batch size가 달라지는 데이터(dynamic batch size)에 적합(stream data, real-time log 등)하며 __iter__ 선언이 필요하다.
"""

# batch_size : 배치(batch)의 크기 지정
# shuffle : 데이터를 DataLoader에서 섞어서 사용할지를 설정(실험 재현을 위해 torch.manual_seed를 같이 사용)
# sampler : 데이터의 index를 원하는 방식대로 조정한다.(shuffle=False일때, map-style dataset일때 가능)
# batch_sampler : sampler와 거의 동일
# num_wrkers : 데이터 로딩에 사용하는 subprocess 개수(멀티프로세싱)
# collate_fn : map-style 데이터셋에서 sample list를 batch 단위로 바꾸기 위해 필요한 기능
# pin_memory : True로 선언하면 dataloader는 Tensor를 CUDA 고정 메모리에 올린다.
# drop_last : batch_size에 따라 마지막 batch 길이가 다를 경우 drop 시킨다.
# time_out : 양수로 주어지는 경우, DataLoader가 data를 불러오는데 제한시간
# worker_init_fn : 어떤 worker를 불러올 것 인가를 리스트로 전달

Sampler

1. SequentialSampler

"range(len(self.data_source))" index 값을 가지는 data sampling

class SequentialSampler(Sampler[int]):
    r"""Samples elements sequentially, always in the same order.

    Args:
        data_source (Dataset): dataset to sample from
    """
    data_source: Sized

    def __init__(self, data_source: Sized) -> None:
        self.data_source = data_source

    def __iter__(self) -> Iterator[int]:
        return iter(range(len(self.data_source)))

    def __len__(self) -> int:
        return len(self.data_source)

[실습코드]

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SequentialSampler

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample

text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=False, sampler=SequentialSampler([0, 0, 1, 0]))

for dataset in MyDataLoader:
  print(dataset)

2. RandomSampler

(data_source만 주어졌을 때) "torch.randperm(n).tolist()" index 값을 가지는 data sampling

class RandomSampler(Sampler[int]):
    r"""Samples elements randomly. If without replacement, then sample from a shuffled dataset.
    If with replacement, then user can specify :attr:`num_samples` to draw.

    Args:
        data_source (Dataset): dataset to sample from
        replacement (bool): samples are drawn on-demand with replacement if ``True``, default=``False``
        num_samples (int): number of samples to draw, default=`len(dataset)`. This argument
            is supposed to be specified only when `replacement` is ``True``.
        generator (Generator): Generator used in sampling.
    """
    data_source: Sized
    replacement: bool

    def __init__(self, data_source: Sized, replacement: bool = False,
                 num_samples: Optional[int] = None, generator=None) -> None:
        self.data_source = data_source
        self.replacement = replacement
        self._num_samples = num_samples
        self.generator = generator

        if not isinstance(self.replacement, bool):
            raise TypeError("replacement should be a boolean value, but got "
                            "replacement={}".format(self.replacement))

        if self._num_samples is not None and not replacement:
            raise ValueError("With replacement=False, num_samples should not be specified, "
                             "since a random permute will be performed.")

        if not isinstance(self.num_samples, int) or self.num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(self.num_samples))

    @property
    def num_samples(self) -> int:
        # dataset size might change at runtime
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples

    def __iter__(self) -> Iterator[int]:
        n = len(self.data_source)
        if self.generator is None:
            generator = torch.Generator()
            generator.manual_seed(int(torch.empty((), dtype=torch.int64).random_().item()))
        else:
            generator = self.generator
        if self.replacement:
            for _ in range(self.num_samples // 32):
                yield from torch.randint(high=n, size=(32,), dtype=torch.int64, generator=generator).tolist()
            yield from torch.randint(high=n, size=(self.num_samples % 32,), dtype=torch.int64, generator=generator).tolist()
        else:
            yield from torch.randperm(n, generator=generator).tolist()

    def __len__(self) -> int:
        return self.num_samples

[실습코드]

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, RandomSampler

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample

text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=False, sampler=RandomSampler([0, 1, 2]))

for dataset in MyDataLoader:
  print(dataset)

3. SubsetRandomSampler

"self.indices[i] for i in torch.randperm(len(self.indices))" index 값을 가지는 data sampling

class SubsetRandomSampler(Sampler[int]):
    r"""Samples elements randomly from a given list of indices, without replacement.

    Args:
        indices (sequence): a sequence of indices
        generator (Generator): Generator used in sampling.
    """
    indices: Sequence[int]

    def __init__(self, indices: Sequence[int], generator=None) -> None:
        self.indices = indices
        self.generator = generator

    def __iter__(self) -> Iterator[int]:
        return (self.indices[i] for i in torch.randperm(len(self.indices), generator=self.generator))

    def __len__(self) -> int:
        return len(self.indices)

[실습코드]

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SubsetRandomSampler

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample



text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=False, sampler=SubsetRandomSampler([1, 1, 1]))

for dataset in MyDataLoader:
  print(dataset)

4. WeightedRandomSampler

"torch.multinomial(self.weights, self.num_samples)" index 값을 가지는 data sampling (Example 참고)

class WeightedRandomSampler(Sampler[int]):
    r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights).

    Args:
        weights (sequence)   : a sequence of weights, not necessary summing up to one
        num_samples (int): number of samples to draw
        replacement (bool): if ``True``, samples are drawn with replacement.
            If not, they are drawn without replacement, which means that when a
            sample index is drawn for a row, it cannot be drawn again for that row.
        generator (Generator): Generator used in sampling.

    Example:
        >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
        [4, 4, 1, 4, 5]
        >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
        [0, 1, 4, 3, 2]
    """
    weights: Tensor
    num_samples: int
    replacement: bool

    def __init__(self, weights: Sequence[float], num_samples: int,
                 replacement: bool = True, generator=None) -> None:
        if not isinstance(num_samples, int) or isinstance(num_samples, bool) or \
                num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(num_samples))
        if not isinstance(replacement, bool):
            raise ValueError("replacement should be a boolean value, but got "
                             "replacement={}".format(replacement))
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        self.replacement = replacement
        self.generator = generator

    def __iter__(self) -> Iterator[int]:
        rand_tensor = torch.multinomial(self.weights, self.num_samples, self.replacement, generator=self.generator)
        return iter(rand_tensor.tolist())

    def __len__(self) -> int:
        return self.num_samples

[실습코드]

import torch
from torch.utils.data import Dataset
from torch.utils.data import WeightedRandomSampler

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample


text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=False, sampler=WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5))

for dataset in MyDataLoader:
  print(dataset)

5. BatchSampler

Batch 단위로 sampling이 가능하다. 이 smapler의 경우에는 index가 list로 return되기 때문에 dataset class에서 getitem을 수정해야한다.

class BatchSampler(Sampler[List[int]]):
    r"""Wraps another sampler to yield a mini-batch of indices.

    Args:
        sampler (Sampler or Iterable): Base sampler. Can be any iterable object
        batch_size (int): Size of mini-batch.
        drop_last (bool): If ``True``, the sampler will drop the last batch if
            its size would be less than ``batch_size``

    Example:
        >>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=False))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
        >>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=True))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    """

    def __init__(self, sampler: Sampler[int], batch_size: int, drop_last: bool) -> None:
        # Since collections.abc.Iterable does not check for `__getitem__`, which
        # is one way for an object to be an iterable, we don't do an `isinstance`
        # check here.
        if not isinstance(batch_size, int) or isinstance(batch_size, bool) or \
                batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        if not isinstance(drop_last, bool):
            raise ValueError("drop_last should be a boolean value, but got "
                             "drop_last={}".format(drop_last))
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last

    def __iter__(self) -> Iterator[List[int]]:
        batch = []
        for idx in self.sampler:
            batch.append(idx)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if len(batch) > 0 and not self.drop_last:
            yield batch

    def __len__(self) -> int:
        # Can only be called if self.sampler has __len__ implemented
        # We cannot enforce this condition, so we turn off typechecking for the
        # implementation below.
        # Somewhat related: see NOTE [ Lack of Default `__len__` in Python Abstract Base Classes ]
        if self.drop_last:
            return len(self.sampler) // self.batch_size  # type: ignore[arg-type]
        else:
            return (len(self.sampler) + self.batch_size - 1) // self.batch_size  # type: ignore[arg-type]

[실습코드]

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SequentialSampler, RandomSampler, SubsetRandomSampler, BatchSampler, DistributedSampler 

class CustomDataset(Dataset):
    def __init__(self, text, labels):
        self.labels = labels
        self.data = text
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        if isinstance(idx, list):
          label = [self.labels[i] for i in idx]
          text = [self.data[i] for i in idx]
        else:
          label = self.labels[idx]
          text = self.data[idx]
        sample = {"Text" : text, "Class" : label}
        return sample

text = ['Happy', 'Amazing', 'Sad', 'Unhapy', 'Glum']
labels = ['Positive', 'Positive', 'Negative', 'Negative', 'Negative']
MyDataset = CustomDataset(text, labels)

MyDataLoader = DataLoader(MyDataset, batch_size=2, shuffle=True)

MyDataLoader = DataLoader(MyDataset, batch_size=None, shuffle=False, sampler=BatchSampler(SequentialSampler(MyDataLoader), batch_size=2, drop_last=True))

for dataset in MyDataLoader:
  print(dataset)

6. DistributedSampler

class DistributedSampler(Sampler[T_co]):
    r"""Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such a case, each
    process can pass a :class:`~torch.utils.data.DistributedSampler` instance as a
    :class:`~torch.utils.data.DataLoader` sampler, and load a subset of the
    original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size.

    Args:
        dataset: Dataset used for sampling.
        num_replicas (int, optional): Number of processes participating in
            distributed training. By default, :attr:`world_size` is retrieved from the
            current distributed group.
        rank (int, optional): Rank of the current process within :attr:`num_replicas`.
            By default, :attr:`rank` is retrieved from the current distributed
            group.
        shuffle (bool, optional): If ``True`` (default), sampler will shuffle the
            indices.
        seed (int, optional): random seed used to shuffle the sampler if
            :attr:`shuffle=True`. This number should be identical across all
            processes in the distributed group. Default: ``0``.
        drop_last (bool, optional): if ``True``, then the sampler will drop the
            tail of the data to make it evenly divisible across the number of
            replicas. If ``False``, the sampler will add extra indices to make
            the data evenly divisible across the replicas. Default: ``False``.

    .. warning::
        In distributed mode, calling the :meth:`set_epoch` method at
        the beginning of each epoch **before** creating the :class:`DataLoader` iterator
        is necessary to make shuffling work properly across multiple epochs. Otherwise,
        the same ordering will be always used.

    Example::

        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        if rank >= num_replicas or rank < 0:
            raise ValueError(
                "Invalid rank {}, rank should be in the interval"
                " [0, {}]".format(rank, num_replicas - 1))
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        # If the dataset length is evenly divisible by # of replicas, then there
        # is no need to drop any data, since the dataset will be split equally.
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                # `type:ignore` is required because Dataset cannot provide a default __len__
                # see NOTE in pytorch/torch/utils/data/sampler.py
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else:
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

        if not self.drop_last:
            # add extra samples to make it evenly divisible
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # remove tail of data to make it evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.

        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch
728x90