Module earthvision.datasets.drone_deploy

Drone Deploy Dataset - Semantic Segmentation.

Expand source code
"""Drone Deploy Dataset - Semantic Segmentation."""
from PIL import Image
import sys
import os
import numpy as np
import random
import cv2

from typing import Any, Callable, Optional, Tuple
from .vision import VisionDataset
from earthvision.constants.DroneDeploy.config import (
    train_ids,
    val_ids,
    test_ids,
    LABELMAP,
    INV_LABELMAP,
)
from earthvision.datasets.utils import _urlretrieve


class DroneDeploy(VisionDataset):
    """Drone Deploy Semantic Dataset.

    Args:
        root (string): Root directory of dataset. A leading ``~`` is expanded.
        dataset_type (string, optional): Choose dataset type; one of the keys of
            ``resources``.
        data_mode (int): 0 for train data, 1 for validation data, and 2 for testing data
        transform (callable, optional): A function/transform that takes in a PIL image and
            returns a transformed version. E.g., transforms.RandomCrop
        target_transform (callable, optional): A function/transform that takes in the
            target (a one-hot encoded ``numpy`` array) and transforms it.
        download (bool, optional): If true, downloads the dataset from the internet and
            puts it in root directory. If dataset is already downloaded, it is not
            downloaded again.

    """

    resources = {
        "dataset-sample": "https://dl.dropboxusercontent.com/s/h8a8kev0rktf4kq/dataset-sample.tar.gz?dl=0",
        "dataset-medium": "https://dl.dropboxusercontent.com/s/r0dj9mhyv4bgbme/dataset-medium.tar.gz?dl=0",
    }

    def __init__(
        self,
        root: str,
        dataset_type="dataset-sample",
        data_mode: int = 0,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        download: bool = False,
    ) -> None:

        super(DroneDeploy, self).__init__(
            root, transform=transform, target_transform=target_transform
        )

        # Expand "~" once, up front, so the archive path, the existence check and
        # the chip lists all refer to the same location.
        self.root = os.path.expanduser(root)
        self.dataset_type = dataset_type
        self.filename = f"{dataset_type}.tar.gz"
        self.filepath = os.path.join(self.root, self.filename)
        self.data_mode = data_mode
        self.label_path = f"{dataset_type}/label-chips"
        self.image_path = f"{dataset_type}/image-chips"

        if download:
            if self._check_exists():
                print("file already exists.")
            else:
                self.download()

        self.load_dataset()

    def download(self) -> None:
        """Download a dataset, extract it and create the tiles."""
        import tarfile  # only needed on the download path

        print(f'Downloading "{self.dataset_type}"')
        self.root = os.path.expanduser(self.root)
        fpath = os.path.join(self.root, self.filename)
        _urlretrieve(self.resources[self.dataset_type], fpath)

        dataset_dir = os.path.join(self.root, self.dataset_type)
        if not os.path.exists(dataset_dir):
            print(f'Extracting "{fpath}"')
            # Extract straight into root with the standard library instead of
            # shelling out to tar/mv, which broke on paths containing spaces and
            # unpacked into the current working directory first.
            with tarfile.open(fpath) as archive:
                if hasattr(tarfile, "data_filter"):
                    # Refuse absolute paths / links escaping root where supported.
                    archive.extractall(self.root, filter="data")
                else:
                    archive.extractall(self.root)
        else:
            print(f'Folder "{self.dataset_type}" already exists.')

        # Create the chip folders under root; tolerate folders left by an earlier run.
        for chips in ("image-chips", "label-chips"):
            os.makedirs(os.path.join(dataset_dir, chips), exist_ok=True)

        run(dataset_dir)

    def _check_exists(self) -> bool:
        """Return True when the dataset archive is already present under root.

        Terminates the interpreter (``SystemExit``) when ``dataset_type`` is not a
        key of ``resources``.
        """
        if self.dataset_type not in self.resources:
            print(f"Unknown dataset {self.dataset_type}")
            print(f"Available dataset : {self.resources.keys()}")
            # Non-zero status: an unknown dataset is an error, not a clean exit.
            sys.exit(1)

        return os.path.exists(self.filepath)

    def load_dataset(self):
        """Fill ``self.image_files`` with the image-chip paths of the selected split.

        Raises:
            ValueError: If ``data_mode`` is not 0, 1 or 2.
        """
        split_files = {0: "train.txt", 1: "valid.txt", 2: "test.txt"}
        try:
            list_chip = split_files[self.data_mode]
        except (KeyError, TypeError):
            raise ValueError(
                f"data_mode must be 0 (train), 1 (validation) or 2 (test), got {self.data_mode!r}"
            ) from None

        dataset_dir = os.path.join(self.root, self.dataset_type)
        self.image_files = [
            f"{dataset_dir}/image-chips/{fname}"
            for fname in load_lines(os.path.join(dataset_dir, list_chip))
            if fname  # ignore blank lines in the list file
        ]

    def __getitem__(self, idx) -> Tuple[Any, Any]:
        """
        Args:
            idx (int): Index
        Returns:
            tuple: (img, target) where img is the image chip and target is the
            one-hot encoded label chip (classes on the last axis).
        """
        image_file = self.image_files[idx]
        # The label chip has the same file name as its image chip.
        label_file = os.path.join(self.root, self.label_path, os.path.basename(image_file))

        img = load_img(image_file)
        target = mask_to_classes(load_img(label_file))

        if self.transform is not None:
            img = self.transform(Image.fromarray(img))

        if self.target_transform is not None:
            # The target is a float32 (H, W, 6) array, which PIL cannot represent,
            # so the transform receives the array itself.
            target = self.target_transform(target)
        return img, target

    def __len__(self) -> int:
        return len(self.image_files)

    def on_epoch_end(self):
        """Shuffle the order of the image chips in place."""
        random.shuffle(self.image_files)


def load_lines(fname):
    """Return every line of the text file *fname* with surrounding whitespace removed."""
    with open(fname, "r") as handle:
        return list(map(str.strip, handle))


def load_img(fname):
    """Read the image file *fname* and return its pixels as a NumPy array."""
    # The context manager closes the underlying file once the pixels are copied,
    # instead of leaving that to garbage collection.
    with Image.open(fname) as image:
        return np.array(image)


def mask_to_classes(mask):
    """One-hot encode the first channel of *mask* into 6 classes."""
    class_ids = mask[:, :, 0]
    return to_categorical(class_ids, num_classes=6)


def to_categorical(y, num_classes=None, dtype="float32"):
    """One-hot encode an array of integer class labels.

    E.g. for use with categorical_crossentropy.
    Args:
        y: integer class labels to encode (values from 0 to num_classes - 1).
        num_classes: total number of classes. If `None` (or 0), it is inferred
          as the (largest number in `y`) + 1.
        dtype: data type of the returned matrix. Default: `'float32'`.
    Returns:
        A binary matrix with the shape of `y` (minus a trailing axis of
        length 1, if any) plus a last axis of length `num_classes`.
    Raises:
        ValueError: If input contains string value
    """
    labels = np.array(y, dtype="int")
    leading_shape = labels.shape
    # A trailing axis of length 1 is dropped so (n, 1) input encodes like (n,).
    if leading_shape and len(leading_shape) > 1 and leading_shape[-1] == 1:
        leading_shape = tuple(leading_shape[:-1])
    flat = labels.ravel()
    if not num_classes:
        num_classes = np.max(flat) + 1
    count = flat.shape[0]
    one_hot = np.zeros((count, num_classes), dtype=dtype)
    # Set the column selected by each label in its own row.
    one_hot[np.arange(count), flat] = 1
    return np.reshape(one_hot, leading_shape + (num_classes,))


def get_split(scene):
    """Return the chip-list file name of the split containing *scene*.

    Checks the train, validation and test id collections in that order and
    returns ``None`` when the scene is in none of them.
    """
    splits = (
        (train_ids, "train.txt"),
        (val_ids, "valid.txt"),
        (test_ids, "test.txt"),
    )
    for scene_ids, list_file in splits:
        if scene in scene_ids:
            return list_file
    return None


def color2class(orthochip, img):
    """Convert a colour-coded label chip into a class-index chip.

    Args:
        orthochip: image chip belonging to the label chip; returned untouched.
        img: label chip whose pixel colours are keys of ``INV_LABELMAP``.
    Returns:
        ``(orthochip, classes)`` where ``classes`` is a 3-channel uint8 array
        holding ``INV_LABELMAP[colour] - 1`` in every channel of each pixel, or
        ``(None, None)`` when the chip contains the colour ``LABELMAP[0]``.
    """
    height, width = img.shape[0], img.shape[1]
    ret = np.zeros((height, width, 3), dtype="uint8")
    colors = np.unique(img.reshape(-1, img.shape[2]), axis=0)

    # Skip any chips that would contain magenta (IGNORE) pixels
    if LABELMAP[0] in {tuple(color) for color in colors}:
        return None, None

    for color in colors:
        matches = (
            (img[:, :, 0] == color[0]) & (img[:, :, 1] == color[1]) & (img[:, :, 2] == color[2])
        )
        rows, cols = np.nonzero(matches)
        ret[rows, cols, :] = INV_LABELMAP[tuple(color)] - 1

    return orthochip, ret


def image2tile(
    prefix,
    scene,
    dataset,
    orthofile,
    elevafile,
    labelfile,
    windowx,
    windowy,
    stridex,
    stridey,
):
    """Cut one scene's orthophoto and label image into aligned chips.

    A ``windowx`` x ``windowy`` window is moved over both images in steps of
    ``stridex`` / ``stridey``. Every window is passed to ``color2class``; windows
    it rejects are skipped. Accepted chips are written as ``<scene>-<NNNNNN>.png``
    under ``<prefix>/image-chips`` and ``<prefix>/label-chips``, and their file
    names are appended to the list file ``<prefix>/<dataset>``.

    Args:
        prefix: dataset directory containing the chip folders.
        scene: scene identifier used as the chip file-name prefix.
        dataset: name of the list file the chip names are appended to.
        orthofile: path of the orthophoto.
        elevafile: path of the elevation file (accepted but not used here).
        labelfile: path of the label image.
        windowx, windowy: chip width and height in pixels.
        stridex, stridey: horizontal and vertical step in pixels.
    Raises:
        FileNotFoundError: If the orthophoto or label image cannot be read.
        ValueError: If the two images differ in height or width.
        OSError: If a chip cannot be written.
    """
    ortho = cv2.imread(orthofile)
    label = cv2.imread(labelfile)

    # cv2.imread reports an unreadable file by returning None instead of raising.
    if ortho is None:
        raise FileNotFoundError(f"could not read image {orthofile}")
    if label is None:
        raise FileNotFoundError(f"could not read image {labelfile}")
    if ortho.shape[:2] != label.shape[:2]:
        raise ValueError(
            f"size mismatch: {orthofile} is {ortho.shape[:2]}, {labelfile} is {label.shape[:2]}"
        )

    ysize, xsize = ortho.shape[0], ortho.shape[1]
    print(f"converting {dataset} image {orthofile} {xsize}x{ysize} to chips ...")

    # NOTE(review): the exclusive range bounds mean a window ending exactly on the
    # right/bottom edge is never emitted; kept so the generated chip set is unchanged.
    chip_names = []
    for xi in range(0, xsize - windowx, stridex):
        for yi in range(0, ysize - windowy, stridey):
            orthochip = ortho[yi : yi + windowy, xi : xi + windowx, :]
            labelchip = label[yi : yi + windowy, xi : xi + windowx, :]

            orthochip, classchip = color2class(orthochip, labelchip)

            if classchip is None:
                continue

            chip_name = scene + "-" + str(len(chip_names)).zfill(6) + ".png"
            orthochip_filename = os.path.join(prefix, "image-chips", chip_name)
            labelchip_filename = os.path.join(prefix, "label-chips", chip_name)

            # imwrite returns False on failure; do not list a chip that is not on disk.
            if not cv2.imwrite(orthochip_filename, orthochip):
                raise OSError(f"could not write {orthochip_filename}")
            if not cv2.imwrite(labelchip_filename, classchip):
                raise OSError(f"could not write {labelchip_filename}")
            chip_names.append(chip_name)

    # Append all names in one go; the list file is only touched when chips exist.
    if chip_names:
        with open(f"{prefix}/{dataset}", mode="a") as fd:
            fd.writelines(name + "\n" for name in chip_names)


def run(prefix, size=300, stride=300):
    """Tile every scene listed in ``<prefix>/index.csv`` into image and label chips.

    The scene id is read from the second space-separated field of each row.
    Scenes whose orthophoto or label image is missing are skipped.

    Args:
        prefix: dataset directory containing ``index.csv``, ``images`` and ``labels``.
        size: chip width and height in pixels.
        stride: step between neighbouring chips in pixels.
    """
    with open(os.path.join(prefix, "index.csv")) as index_file:
        lines = index_file.readlines()
    print(
        "converting images to chips - this may take a few minutes but only needs to be done once."
    )

    for line in lines:
        fields = line.strip().split(" ")
        if len(fields) < 2:
            # Blank or short row: there is no scene id to read.
            continue
        scene = fields[1]
        dataset = get_split(scene)
        if dataset is None:
            # Scene is in none of the splits; tiling it would only fill a list
            # file named "None".
            continue

        orthofile = os.path.join(prefix, "images", scene + "-ortho.tif")
        elevafile = os.path.join(prefix, "elevations", scene + "-elev.tif")
        labelfile = os.path.join(prefix, "labels", scene + "-label.png")

        if os.path.exists(orthofile) and os.path.exists(labelfile):
            image2tile(
                prefix,
                scene,
                dataset,
                orthofile,
                elevafile,
                labelfile,
                windowx=size,
                windowy=size,
                stridex=stride,
                stridey=stride,
            )

Functions

def color2class(orthochip, img)
Expand source code
def color2class(orthochip, img):
    """Convert a colour-coded label chip into a class-index chip.

    Args:
        orthochip: image chip belonging to the label chip; returned untouched.
        img: label chip whose pixel colours are keys of ``INV_LABELMAP``.
    Returns:
        ``(orthochip, classes)`` where ``classes`` is a 3-channel uint8 array
        holding ``INV_LABELMAP[colour] - 1`` in every channel of each pixel, or
        ``(None, None)`` when the chip contains the colour ``LABELMAP[0]``.
    """
    height, width = img.shape[0], img.shape[1]
    ret = np.zeros((height, width, 3), dtype="uint8")
    colors = np.unique(img.reshape(-1, img.shape[2]), axis=0)

    # Skip any chips that would contain magenta (IGNORE) pixels
    if LABELMAP[0] in {tuple(color) for color in colors}:
        return None, None

    for color in colors:
        matches = (
            (img[:, :, 0] == color[0]) & (img[:, :, 1] == color[1]) & (img[:, :, 2] == color[2])
        )
        rows, cols = np.nonzero(matches)
        ret[rows, cols, :] = INV_LABELMAP[tuple(color)] - 1

    return orthochip, ret
def get_split(scene)
Expand source code
def get_split(scene):
    """Return the chip-list file name of the split containing *scene*.

    Checks the train, validation and test id collections in that order and
    returns ``None`` when the scene is in none of them.
    """
    splits = (
        (train_ids, "train.txt"),
        (val_ids, "valid.txt"),
        (test_ids, "test.txt"),
    )
    for scene_ids, list_file in splits:
        if scene in scene_ids:
            return list_file
    return None
def image2tile(prefix, scene, dataset, orthofile, elevafile, labelfile, windowx, windowy, stridex, stridey)
Expand source code
def image2tile(
    prefix,
    scene,
    dataset,
    orthofile,
    elevafile,
    labelfile,
    windowx,
    windowy,
    stridex,
    stridey,
):
    """Cut one scene's orthophoto and label image into aligned chips.

    A ``windowx`` x ``windowy`` window is moved over both images in steps of
    ``stridex`` / ``stridey``. Every window is passed to ``color2class``; windows
    it rejects are skipped. Accepted chips are written as ``<scene>-<NNNNNN>.png``
    under ``<prefix>/image-chips`` and ``<prefix>/label-chips``, and their file
    names are appended to the list file ``<prefix>/<dataset>``.

    Args:
        prefix: dataset directory containing the chip folders.
        scene: scene identifier used as the chip file-name prefix.
        dataset: name of the list file the chip names are appended to.
        orthofile: path of the orthophoto.
        elevafile: path of the elevation file (accepted but not used here).
        labelfile: path of the label image.
        windowx, windowy: chip width and height in pixels.
        stridex, stridey: horizontal and vertical step in pixels.
    Raises:
        FileNotFoundError: If the orthophoto or label image cannot be read.
        ValueError: If the two images differ in height or width.
        OSError: If a chip cannot be written.
    """
    ortho = cv2.imread(orthofile)
    label = cv2.imread(labelfile)

    # cv2.imread reports an unreadable file by returning None instead of raising.
    if ortho is None:
        raise FileNotFoundError(f"could not read image {orthofile}")
    if label is None:
        raise FileNotFoundError(f"could not read image {labelfile}")
    if ortho.shape[:2] != label.shape[:2]:
        raise ValueError(
            f"size mismatch: {orthofile} is {ortho.shape[:2]}, {labelfile} is {label.shape[:2]}"
        )

    ysize, xsize = ortho.shape[0], ortho.shape[1]
    print(f"converting {dataset} image {orthofile} {xsize}x{ysize} to chips ...")

    # NOTE(review): the exclusive range bounds mean a window ending exactly on the
    # right/bottom edge is never emitted; kept so the generated chip set is unchanged.
    chip_names = []
    for xi in range(0, xsize - windowx, stridex):
        for yi in range(0, ysize - windowy, stridey):
            orthochip = ortho[yi : yi + windowy, xi : xi + windowx, :]
            labelchip = label[yi : yi + windowy, xi : xi + windowx, :]

            orthochip, classchip = color2class(orthochip, labelchip)

            if classchip is None:
                continue

            chip_name = scene + "-" + str(len(chip_names)).zfill(6) + ".png"
            orthochip_filename = os.path.join(prefix, "image-chips", chip_name)
            labelchip_filename = os.path.join(prefix, "label-chips", chip_name)

            # imwrite returns False on failure; do not list a chip that is not on disk.
            if not cv2.imwrite(orthochip_filename, orthochip):
                raise OSError(f"could not write {orthochip_filename}")
            if not cv2.imwrite(labelchip_filename, classchip):
                raise OSError(f"could not write {labelchip_filename}")
            chip_names.append(chip_name)

    # Append all names in one go; the list file is only touched when chips exist.
    if chip_names:
        with open(f"{prefix}/{dataset}", mode="a") as fd:
            fd.writelines(name + "\n" for name in chip_names)
def load_img(fname)
Expand source code
def load_img(fname):
    """Read the image file *fname* and return its pixels as a NumPy array."""
    # The context manager closes the underlying file once the pixels are copied,
    # instead of leaving that to garbage collection.
    with Image.open(fname) as image:
        return np.array(image)
def load_lines(fname)
Expand source code
def load_lines(fname):
    """Return every line of the text file *fname* with surrounding whitespace removed."""
    with open(fname, "r") as handle:
        return list(map(str.strip, handle))
def mask_to_classes(mask)
Expand source code
def mask_to_classes(mask):
    """One-hot encode the first channel of *mask* into 6 classes."""
    class_ids = mask[:, :, 0]
    return to_categorical(class_ids, num_classes=6)
def run(prefix, size=300, stride=300)
Expand source code
def run(prefix, size=300, stride=300):
    """Tile every scene listed in ``<prefix>/index.csv`` into image and label chips.

    The scene id is read from the second space-separated field of each row.
    Scenes whose orthophoto or label image is missing are skipped.

    Args:
        prefix: dataset directory containing ``index.csv``, ``images`` and ``labels``.
        size: chip width and height in pixels.
        stride: step between neighbouring chips in pixels.
    """
    with open(os.path.join(prefix, "index.csv")) as index_file:
        lines = index_file.readlines()
    print(
        "converting images to chips - this may take a few minutes but only needs to be done once."
    )

    for line in lines:
        fields = line.strip().split(" ")
        if len(fields) < 2:
            # Blank or short row: there is no scene id to read.
            continue
        scene = fields[1]
        dataset = get_split(scene)
        if dataset is None:
            # Scene is in none of the splits; tiling it would only fill a list
            # file named "None".
            continue

        orthofile = os.path.join(prefix, "images", scene + "-ortho.tif")
        elevafile = os.path.join(prefix, "elevations", scene + "-elev.tif")
        labelfile = os.path.join(prefix, "labels", scene + "-label.png")

        if os.path.exists(orthofile) and os.path.exists(labelfile):
            image2tile(
                prefix,
                scene,
                dataset,
                orthofile,
                elevafile,
                labelfile,
                windowx=size,
                windowy=size,
                stridex=stride,
                stridey=stride,
            )
def to_categorical(y, num_classes=None, dtype='float32')

Converts a class vector (integers) to binary class matrix. E.g. for use with categorical_crossentropy.

Args

y
class vector to be converted into a matrix (integers from 0 to num_classes - 1).
num_classes
total number of classes. If None, this would be inferred as the (largest number in y) + 1.
dtype
The data type of the returned matrix. Default: 'float32'.

Returns

A binary matrix representation of the input. The classes axis is placed last.

Raises

ValueError
If input contains string value
Expand source code
def to_categorical(y, num_classes=None, dtype="float32"):
    """One-hot encode an array of integer class labels.

    E.g. for use with categorical_crossentropy.
    Args:
        y: integer class labels to encode (values from 0 to num_classes - 1).
        num_classes: total number of classes. If `None` (or 0), it is inferred
          as the (largest number in `y`) + 1.
        dtype: data type of the returned matrix. Default: `'float32'`.
    Returns:
        A binary matrix with the shape of `y` (minus a trailing axis of
        length 1, if any) plus a last axis of length `num_classes`.
    Raises:
        ValueError: If input contains string value
    """
    labels = np.array(y, dtype="int")
    leading_shape = labels.shape
    # A trailing axis of length 1 is dropped so (n, 1) input encodes like (n,).
    if leading_shape and len(leading_shape) > 1 and leading_shape[-1] == 1:
        leading_shape = tuple(leading_shape[:-1])
    flat = labels.ravel()
    if not num_classes:
        num_classes = np.max(flat) + 1
    count = flat.shape[0]
    one_hot = np.zeros((count, num_classes), dtype=dtype)
    # Set the column selected by each label in its own row.
    one_hot[np.arange(count), flat] = 1
    return np.reshape(one_hot, leading_shape + (num_classes,))

Classes

class DroneDeploy (root: str, dataset_type='dataset-sample', data_mode: int = 0, transform: Optional[Callable] = None, target_transform: Optional[Callable] = None, download: bool = False)

Drone Deploy Semantic Dataset.

Args

root : string
Root directory of dataset.
dataset_type : string, optional
Choose dataset type.
data_mode : int
0 for train data, 1 for validation data, and 2 for testing data
transform : callable, optional
A function/transform that takes in a PIL image and returns a transformed version. E.g., transforms.RandomCrop
target_transform : callable, optional
A function/transform that takes in the target and transforms it.
download : bool, optional
If true, downloads the dataset from the internet and puts it in root directory. If dataset is already downloaded, it is not downloaded again.
Expand source code
class DroneDeploy(VisionDataset):
    """Drone Deploy Semantic Dataset.

    Args:
        root (string): Root directory of dataset. A leading ``~`` is expanded.
        dataset_type (string, optional): Choose dataset type; one of the keys of
            ``resources``.
        data_mode (int): 0 for train data, 1 for validation data, and 2 for testing data
        transform (callable, optional): A function/transform that takes in a PIL image and
            returns a transformed version. E.g., transforms.RandomCrop
        target_transform (callable, optional): A function/transform that takes in the
            target (a one-hot encoded ``numpy`` array) and transforms it.
        download (bool, optional): If true, downloads the dataset from the internet and
            puts it in root directory. If dataset is already downloaded, it is not
            downloaded again.

    """

    resources = {
        "dataset-sample": "https://dl.dropboxusercontent.com/s/h8a8kev0rktf4kq/dataset-sample.tar.gz?dl=0",
        "dataset-medium": "https://dl.dropboxusercontent.com/s/r0dj9mhyv4bgbme/dataset-medium.tar.gz?dl=0",
    }

    def __init__(
        self,
        root: str,
        dataset_type="dataset-sample",
        data_mode: int = 0,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        download: bool = False,
    ) -> None:

        super(DroneDeploy, self).__init__(
            root, transform=transform, target_transform=target_transform
        )

        # Expand "~" once, up front, so the archive path, the existence check and
        # the chip lists all refer to the same location.
        self.root = os.path.expanduser(root)
        self.dataset_type = dataset_type
        self.filename = f"{dataset_type}.tar.gz"
        self.filepath = os.path.join(self.root, self.filename)
        self.data_mode = data_mode
        self.label_path = f"{dataset_type}/label-chips"
        self.image_path = f"{dataset_type}/image-chips"

        if download:
            if self._check_exists():
                print("file already exists.")
            else:
                self.download()

        self.load_dataset()

    def download(self) -> None:
        """Download a dataset, extract it and create the tiles."""
        import tarfile  # only needed on the download path

        print(f'Downloading "{self.dataset_type}"')
        self.root = os.path.expanduser(self.root)
        fpath = os.path.join(self.root, self.filename)
        _urlretrieve(self.resources[self.dataset_type], fpath)

        dataset_dir = os.path.join(self.root, self.dataset_type)
        if not os.path.exists(dataset_dir):
            print(f'Extracting "{fpath}"')
            # Extract straight into root with the standard library instead of
            # shelling out to tar/mv, which broke on paths containing spaces and
            # unpacked into the current working directory first.
            with tarfile.open(fpath) as archive:
                if hasattr(tarfile, "data_filter"):
                    # Refuse absolute paths / links escaping root where supported.
                    archive.extractall(self.root, filter="data")
                else:
                    archive.extractall(self.root)
        else:
            print(f'Folder "{self.dataset_type}" already exists.')

        # Create the chip folders under root; tolerate folders left by an earlier run.
        for chips in ("image-chips", "label-chips"):
            os.makedirs(os.path.join(dataset_dir, chips), exist_ok=True)

        run(dataset_dir)

    def _check_exists(self) -> bool:
        """Return True when the dataset archive is already present under root.

        Terminates the interpreter (``SystemExit``) when ``dataset_type`` is not a
        key of ``resources``.
        """
        if self.dataset_type not in self.resources:
            print(f"Unknown dataset {self.dataset_type}")
            print(f"Available dataset : {self.resources.keys()}")
            # Non-zero status: an unknown dataset is an error, not a clean exit.
            sys.exit(1)

        return os.path.exists(self.filepath)

    def load_dataset(self):
        """Fill ``self.image_files`` with the image-chip paths of the selected split.

        Raises:
            ValueError: If ``data_mode`` is not 0, 1 or 2.
        """
        split_files = {0: "train.txt", 1: "valid.txt", 2: "test.txt"}
        try:
            list_chip = split_files[self.data_mode]
        except (KeyError, TypeError):
            raise ValueError(
                f"data_mode must be 0 (train), 1 (validation) or 2 (test), got {self.data_mode!r}"
            ) from None

        dataset_dir = os.path.join(self.root, self.dataset_type)
        self.image_files = [
            f"{dataset_dir}/image-chips/{fname}"
            for fname in load_lines(os.path.join(dataset_dir, list_chip))
            if fname  # ignore blank lines in the list file
        ]

    def __getitem__(self, idx) -> Tuple[Any, Any]:
        """
        Args:
            idx (int): Index
        Returns:
            tuple: (img, target) where img is the image chip and target is the
            one-hot encoded label chip (classes on the last axis).
        """
        image_file = self.image_files[idx]
        # The label chip has the same file name as its image chip.
        label_file = os.path.join(self.root, self.label_path, os.path.basename(image_file))

        img = load_img(image_file)
        target = mask_to_classes(load_img(label_file))

        if self.transform is not None:
            img = self.transform(Image.fromarray(img))

        if self.target_transform is not None:
            # The target is a float32 (H, W, 6) array, which PIL cannot represent,
            # so the transform receives the array itself.
            target = self.target_transform(target)
        return img, target

    def __len__(self) -> int:
        return len(self.image_files)

    def on_epoch_end(self):
        """Shuffle the order of the image chips in place."""
        random.shuffle(self.image_files)

Ancestors

Class variables

var functions : Dict[str, Callable]
var resources

Methods

def download(self) ‑> None

Download a dataset, extract it and create the tiles.

Expand source code
def download(self) -> None:
    """Download a dataset, extract it and create the tiles."""
    import tarfile  # only needed on the download path

    print(f'Downloading "{self.dataset_type}"')
    self.root = os.path.expanduser(self.root)
    fpath = os.path.join(self.root, self.filename)
    _urlretrieve(self.resources[self.dataset_type], fpath)

    dataset_dir = os.path.join(self.root, self.dataset_type)
    if not os.path.exists(dataset_dir):
        print(f'Extracting "{fpath}"')
        # Extract straight into root with the standard library instead of
        # shelling out to tar/mv, which broke on paths containing spaces and
        # unpacked into the current working directory first.
        with tarfile.open(fpath) as archive:
            if hasattr(tarfile, "data_filter"):
                # Refuse absolute paths / links escaping root where supported.
                archive.extractall(self.root, filter="data")
            else:
                archive.extractall(self.root)
    else:
        print(f'Folder "{self.dataset_type}" already exists.')

    # Create the chip folders under root; tolerate folders left by an earlier run.
    for chips in ("image-chips", "label-chips"):
        os.makedirs(os.path.join(dataset_dir, chips), exist_ok=True)

    run(dataset_dir)
def load_dataset(self)
Expand source code
def load_dataset(self):
    """Fill ``self.image_files`` with the image-chip paths of the selected split.

    Raises:
        ValueError: If ``data_mode`` is not 0, 1 or 2.
    """
    split_files = {0: "train.txt", 1: "valid.txt", 2: "test.txt"}
    try:
        list_chip = split_files[self.data_mode]
    except (KeyError, TypeError):
        raise ValueError(
            f"data_mode must be 0 (train), 1 (validation) or 2 (test), got {self.data_mode!r}"
        ) from None

    dataset_dir = os.path.join(self.root, self.dataset_type)
    self.image_files = [
        f"{dataset_dir}/image-chips/{fname}"
        for fname in load_lines(os.path.join(dataset_dir, list_chip))
        if fname  # ignore blank lines in the list file
    ]
def on_epoch_end(self)
Expand source code
def on_epoch_end(self):
    """Shuffle the order of the image chips in place."""
    chips = self.image_files
    random.shuffle(chips)