First commit

This commit is contained in:
vd
2022-09-02 03:48:26 +03:00
commit 57dc12b305
8 changed files with 456 additions and 0 deletions

6
classifier_config.json Normal file
View File

@@ -0,0 +1,6 @@
{
"n_neighbors": 10,
"classes" : ["black_red", "green_orange", "yellow_grey"],
"path_to_dataset": "./triplet_dataset",
"embedding_model": "./model_embedding.pt"
}

4
download-dataset Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
# Download the triplet dataset archive from Google Drive and unpack it.
# Abort immediately if any step fails (previously a failed download would
# still run unzip and rm on a missing/partial archive).
set -euo pipefail
gdown 1rP7GHDqx6BKTGTh9I6ecEmRgn5-HG1N0
unzip -q "triplet_dataset.zip"
rm ./"triplet_dataset.zip"

15
embedding_config.json Normal file
View File

@@ -0,0 +1,15 @@
{
"epochs": 4,
"embedding_dims": 128,
"batch_size": 32,
"classes" : ["black_red", "green_orange", "yellow_grey"],
"lr": 0.001,
"triplet-loss-margin": 1.0,
"triplet-loss-p": 2,
"train_size": 0.9,
"augmentations": true,
"env": "",
"path_to_dataset": "./triplet_dataset",
"visualize": true,
"pca_components": 16
}

BIN
embedding_loss.pdf Normal file

Binary file not shown.

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
torch
torchvision
scikit-learn

163
train-classifier.py Normal file
View File

@@ -0,0 +1,163 @@
import torch
import random
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torchvision import transforms
from sklearn.neighbors import KNeighborsClassifier as kNN
import pickle
import json
# Load runtime configuration (paths, class list, kNN settings).
with open('classifier_config.json') as config_file:
    config = json.load(config_file)

path_to_dataset = Path(config["path_to_dataset"])

# An empty "classes" list means: auto-discover class names from the
# sub-directories of the dataset root.
if len(config["classes"]) == 0:
    classes = sorted([x.name for x in path_to_dataset.iterdir() if x.is_dir()])
else:
    classes = config["classes"]
class SquarePad:
    """Zero-pad a CHW image tensor symmetrically so its two spatial dims match."""

    def __call__(self, image):
        _, dim_a, dim_b = image.size()
        target = max(dim_a, dim_b)
        pad_a = int((target - dim_a) / 2)
        pad_b = int((target - dim_b) / 2)
        # torchvision pad order is (left, top, right, bottom); constant zero fill.
        return transforms.functional.pad(image, (pad_b, pad_a, pad_b, pad_a), 0, 'constant')
class Normalize01:
    """Rescale a tensor linearly to the range [0, 1].

    Fixes over the original: works on a copy instead of mutating the caller's
    tensor in place, and guards against division by zero for constant images
    (which previously produced NaNs via 0/0).
    """

    def __call__(self, image):
        # Shift so the minimum becomes 0 (out-of-place: callers keep their data).
        image = image - image.min()
        peak = image.max()
        if peak > 0:
            image = image / peak
        # A constant input stays all-zeros rather than becoming NaN.
        return image
class CustomDataset(Dataset):
    """Triplet image dataset: yields ([anchor, positive, negative], anchor_label).

    Each item picks a random positive (same class as the anchor) and a random
    negative (different class). Images are loaded from ``<path>/<class>/*.jpg``.

    Fixes over the original:
    * per-class index tables and the base transform are built once in
      ``__init__`` instead of being recomputed on every ``__getitem__``
      (previously an O(n) scan plus a fresh ``Compose`` per sample);
    * ``classes=None`` (the documented default) now auto-discovers class
      sub-directories instead of raising ``TypeError``.
    """

    def __init__(self, path, classes=None, augmentations=None, target_transform=None, size=(64, 64)):
        self.path = path  # root directory containing one sub-directory per class
        self.transform_aug = augmentations
        self.target_transform = target_transform
        self.size = size
        if classes is None:
            # Backward-compatible generalization: discover classes from disk.
            classes = sorted(x.name for x in Path(path).iterdir() if x.is_dir())
        self.classes = classes
        self.paths_to_images = []
        self.labels = []
        for c in self.classes:
            paths_to_class = list(Path(self.path, c).glob('*.jpg'))
            self.labels += [c] * len(paths_to_class)
            self.paths_to_images += paths_to_class
        self.labels = np.array(self.labels)
        # Index tables computed once: positives share the anchor's class,
        # negatives are everything else.
        self._same_class_idx = {c: np.flatnonzero(self.labels == c) for c in self.classes}
        self._other_class_idx = {c: np.flatnonzero(self.labels != c) for c in self.classes}
        # Base transform is identical for every sample; build it once.
        self._base_transform = transforms.Compose([
            SquarePad(),
            transforms.Resize(self.size),
            Normalize01()
        ])

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        anchor_label = self.labels[idx]
        positive_idx = np.random.choice(self._same_class_idx[anchor_label])
        negative_idx = np.random.choice(self._other_class_idx[anchor_label])
        images = [read_image(str(self.paths_to_images[i])).float()
                  for i in [idx, positive_idx, negative_idx]]
        images = [self._base_transform(image) for image in images]
        if self.transform_aug is not None:
            images = [self.transform_aug(image) for image in images]
        return images, anchor_label
# Batch size 1: each iteration below embeds a single anchor image.
batch_size = 1
dataset = CustomDataset(path_to_dataset, classes=classes, size=(64, 64))
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
class EmbeddingModel(nn.Module):
    """CNN mapping a (3, 64, 64) image to an ``emb_dim``-dimensional embedding.

    Three conv/BN/PReLU/pool stages reduce the input to a 64-channel 6x6
    feature map, then a two-layer MLP projects it to the embedding space.
    """

    def __init__(self, emb_dim=128):
        super(EmbeddingModel, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 16, 3),
            nn.BatchNorm2d(16),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3),
            nn.BatchNorm2d(32),
            # NOTE(review): PReLU(32) learns one slope per channel here while the
            # other stages use a single shared slope — confirm this asymmetry is
            # intentional.
            nn.PReLU(32),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3),
            # NOTE(review): activation precedes BatchNorm in this stage only.
            # Reordering would change the Sequential state_dict indices and break
            # loading of saved weights, so it is left as-is.
            nn.PReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            # 64 channels x 6 x 6 spatial positions after the conv stack
            # (valid for 64x64 input: 64->62->31->29->14->12->6).
            nn.Linear(64*6*6, 256),
            nn.PReLU(),
            nn.Linear(256, emb_dim)
        )

    def forward(self, x):
        x = self.conv(x)
        # Flatten to (batch, 64*6*6) for the fully-connected head.
        x = x.view(-1, 64*6*6)
        x = self.fc(x)
        return x
# --- Device selection -------------------------------------------------------
if torch.cuda.is_available():
    print('Using GPU.')
    device = 'cuda'
else:
    print("CUDA not detected, using CPU.")
    device = 'cpu'

# Load the pre-trained embedding network and switch it to inference mode.
model_embedding = EmbeddingModel()
model_embedding.load_state_dict(torch.load(config["embedding_model"]))
model_embedding.to(device)
model_embedding.eval()

# Embed every anchor image; the positive/negative triplet members are unused.
X = []
labels = []
with torch.no_grad():  # fix: no autograd graph needed for feature extraction
    for (batch, _, _), label in loader:
        X += [*model_embedding(batch.to(device)).cpu().numpy()]
        labels += [*label]
        # (The original also re-normalized `batch` in place here purely to feed
        # a commented-out PIL conversion; that dead code has been removed.)
X = np.array(X)
labels = np.array(labels)

# Fit a k-nearest-neighbour classifier on the embeddings and persist it.
model = kNN(config["n_neighbors"])
model.fit(X, labels)
with open('./model_classifier.obj', 'wb') as file:  # fix: str path, not bytes literal
    pickle.dump(model, file)

# Accuracy on the fit data itself — an optimistic estimate, not a test score.
score = model.score(X, labels)
print(f'Score: {score:.4f}')

265
train-embedding.py Normal file
View File

@@ -0,0 +1,265 @@
import time
import torch
import random
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import matplotlib as mpl
from pathlib import Path
import json
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torchvision import transforms
from PIL import Image, ImageOps
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
# Load training configuration (hyper-parameters, dataset path, env flags).
with open('embedding_config.json') as config_file:
    config = json.load(config_file)

# Pick the tqdm flavour matching the runtime environment.
# NOTE(review): any "env" value other than "colab" or "" leaves tqdm
# unimported — confirm whether other environment names are expected.
if config["env"] == "colab":
    from tqdm.notebook import tqdm
elif config["env"] == "":
    from tqdm import tqdm

path_to_dataset = Path(config["path_to_dataset"])

# An empty "classes" list means: auto-discover class names from the
# sub-directories of the dataset root.
if len(config["classes"]) == 0:
    classes = sorted([x.name for x in path_to_dataset.iterdir() if x.is_dir()])
else:
    classes = config["classes"]
class SquarePad:
    """Zero-pad a CHW image tensor symmetrically so its two spatial dims match."""

    def __call__(self, image):
        _, dim_a, dim_b = image.size()
        target = max(dim_a, dim_b)
        pad_a = int((target - dim_a) / 2)
        pad_b = int((target - dim_b) / 2)
        # torchvision pad order is (left, top, right, bottom); constant zero fill.
        return transforms.functional.pad(image, (pad_b, pad_a, pad_b, pad_a), 0, 'constant')
class Normalize01:
    """Rescale a tensor linearly to the range [0, 1].

    Fixes over the original: works on a copy instead of mutating the caller's
    tensor in place, and guards against division by zero for constant images
    (which previously produced NaNs via 0/0).
    """

    def __call__(self, image):
        # Shift so the minimum becomes 0 (out-of-place: callers keep their data).
        image = image - image.min()
        peak = image.max()
        if peak > 0:
            image = image / peak
        # A constant input stays all-zeros rather than becoming NaN.
        return image
class CustomDataset(Dataset):
    """Triplet image dataset: yields ([anchor, positive, negative], anchor_label).

    Each item picks a random positive (same class as the anchor) and a random
    negative (different class). Images are loaded from ``<path>/<class>/*.jpg``.

    Fixes over the original:
    * per-class index tables and the base transform are built once in
      ``__init__`` instead of being recomputed on every ``__getitem__``
      (previously an O(n) scan plus a fresh ``Compose`` per sample);
    * ``classes=None`` (the documented default) now auto-discovers class
      sub-directories instead of raising ``TypeError``.
    """

    def __init__(self, path, classes=None, augmentations=None, target_transform=None, size=(64, 64)):
        self.path = path  # root directory containing one sub-directory per class
        self.transform_aug = augmentations
        self.target_transform = target_transform
        self.size = size
        if classes is None:
            # Backward-compatible generalization: discover classes from disk.
            classes = sorted(x.name for x in Path(path).iterdir() if x.is_dir())
        self.classes = classes
        self.paths_to_images = []
        self.labels = []
        for c in self.classes:
            paths_to_class = list(Path(self.path, c).glob('*.jpg'))
            self.labels += [c] * len(paths_to_class)
            self.paths_to_images += paths_to_class
        self.labels = np.array(self.labels)
        # Index tables computed once: positives share the anchor's class,
        # negatives are everything else.
        self._same_class_idx = {c: np.flatnonzero(self.labels == c) for c in self.classes}
        self._other_class_idx = {c: np.flatnonzero(self.labels != c) for c in self.classes}
        # Base transform is identical for every sample; build it once.
        self._base_transform = transforms.Compose([
            SquarePad(),
            transforms.Resize(self.size),
            Normalize01()
        ])

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        anchor_label = self.labels[idx]
        positive_idx = np.random.choice(self._same_class_idx[anchor_label])
        negative_idx = np.random.choice(self._other_class_idx[anchor_label])
        images = [read_image(str(self.paths_to_images[i])).float()
                  for i in [idx, positive_idx, negative_idx]]
        images = [self._base_transform(image) for image in images]
        if self.transform_aug is not None:
            images = [self.transform_aug(image) for image in images]
        return images, anchor_label
class EmbeddingModel(nn.Module):
    """CNN mapping a (3, 64, 64) image to an ``emb_dim``-dimensional embedding.

    Three conv/BN/PReLU/pool stages reduce the input to a 64-channel 6x6
    feature map, then a two-layer MLP projects it to the embedding space.
    """

    def __init__(self, emb_dim=128):
        super(EmbeddingModel, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 16, 3),
            nn.BatchNorm2d(16),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3),
            nn.BatchNorm2d(32),
            # NOTE(review): PReLU(32) learns one slope per channel here while the
            # other stages use a single shared slope — confirm this asymmetry is
            # intentional.
            nn.PReLU(32),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3),
            # NOTE(review): activation precedes BatchNorm in this stage only.
            # Reordering would change the Sequential state_dict indices and break
            # weight compatibility with the classifier script, so it is left as-is.
            nn.PReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            # 64 channels x 6 x 6 spatial positions after the conv stack
            # (valid for 64x64 input: 64->62->31->29->14->12->6).
            nn.Linear(64*6*6, 256),
            nn.PReLU(),
            nn.Linear(256, emb_dim)
        )

    def forward(self, x):
        x = self.conv(x)
        # Flatten to (batch, 64*6*6) for the fully-connected head.
        x = x.view(-1, 64*6*6)
        x = self.fc(x)
        return x
# Optional data augmentation applied on top of the base transforms.
augmentations = None
if config["augmentations"]:
    augmentations = transforms.Compose(
        [
            transforms.RandomHorizontalFlip(),
            transforms.RandomAdjustSharpness(sharpness_factor=2),
            transforms.RandomAutocontrast(),
            transforms.ColorJitter(brightness=0.3)
        ]
    )

batch_size = config["batch_size"]
dataset = CustomDataset(path_to_dataset, augmentations=augmentations,
                        classes=classes, size=(64, 64))

# Random train/test split driven by the "train_size" fraction.
train_size = int(config["train_size"] * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# Device selection: prefer CUDA when available.
if torch.cuda.is_available():
    print('Using GPU.')
    device = 'cuda'
else:
    print("CUDA not detected, using CPU.")
    device = 'cpu'

embedding_dims = config["embedding_dims"]
epochs = config["epochs"]
model = EmbeddingModel(embedding_dims).to(device)
optimizer = optim.Adam(model.parameters(), lr=config["lr"])
# Triplet margin loss: pulls anchor/positive together, pushes negatives away.
triplet_loss = nn.TripletMarginLoss(margin=config["triplet-loss-margin"], p=config["triplet-loss-p"])
model.train()
train_loss = []
test_loss = []
epoch_all_loss = []  # NOTE(review): never read below — candidate for removal

for epoch in range(epochs):
    train_one_epoch_loss = []
    test_one_epoch_loss = []

    # --- Training pass ------------------------------------------------------
    model.train()  # re-enable train mode (eval() is set for the test pass below)
    for (anchor_image, positive_image, negative_image), anchor_label in train_loader:
        anchor_image = anchor_image.to(device)
        positive_image = positive_image.to(device)
        negative_image = negative_image.to(device)

        optimizer.zero_grad()
        anchor_pred = model(anchor_image)
        positive_pred = model(positive_image)
        negative_pred = model(negative_image)
        loss = triplet_loss(anchor_pred, positive_pred, negative_pred)
        loss.backward()
        optimizer.step()

        loss = loss.cpu().detach().numpy()
        train_one_epoch_loss += [loss]
        train_loss += [loss]

    # --- Evaluation pass ----------------------------------------------------
    # Fix: run in eval mode and without gradients so BatchNorm running stats
    # are not updated from the test data and no autograd graph is built.
    model.eval()
    with torch.no_grad():
        for (anchor_image, positive_image, negative_image), anchor_label in test_loader:
            anchor_image = anchor_image.to(device)
            positive_image = positive_image.to(device)
            negative_image = negative_image.to(device)

            anchor_pred = model(anchor_image)
            positive_pred = model(positive_image)
            negative_pred = model(negative_image)
            loss = triplet_loss(anchor_pred, positive_pred, negative_pred)

            loss = loss.cpu().numpy()
            test_one_epoch_loss += [loss]
            test_loss += [loss]

    print(f"Epoch: {epoch+1}/{epochs} - Training loss: {np.mean(train_one_epoch_loss):.4f} - Test loss: {np.mean(test_one_epoch_loss):.4f}")

# Leave the trained model in eval mode for any downstream feature extraction.
model.eval()
train_loss = np.array(train_loss)
test_loss = np.array(test_loss)

# Moving-average smoothing. The train window is widened so both smoothed
# curves come out the same length and can share an x-axis.
q = 10
train_window = len(train_loss) - len(test_loss) + q
# Fix: plt.plot takes `label=`, not `legend=` — the original raised
# AttributeError ("Line2D object has no property 'legend'").
plt.plot(np.convolve(train_loss, np.ones(train_window) / train_window, mode='valid'),
         label='Train loss')
plt.plot(np.convolve(test_loss, np.ones(q) / q, mode='valid'), label='Test loss')
plt.legend()
plt.title("Epoch loss")
plt.savefig("embedding_loss.pdf")
if config["visualize"]:
    # Embed the training anchors and project them to 2-D for visual inspection.
    X_train = []
    labels_train = []
    images_train = []
    model.eval()  # fix: embed in inference mode (frozen BatchNorm statistics)
    with torch.no_grad():  # fix: no autograd graph needed here
        for (batch, _, _), label in train_loader:
            X_train += [*model(batch.to(device)).cpu().numpy()]
            labels_train += [*label]
            for x in batch:
                # Re-normalize to [0, 1] so the tensor converts cleanly to PIL.
                x -= x.min()
                x /= x.max()
                images_train += [transforms.functional.to_pil_image(x)]
    X_train = np.array(X_train)
    labels_train = np.array(labels_train)

    # PCA to a small dimension first, then t-SNE to 2-D — the usual trick to
    # stabilize and speed up t-SNE on high-dimensional embeddings.
    pca = PCA(n_components=config["pca_components"])
    X_train_pca = pca.fit_transform(X_train)
    tsne = TSNE(n_components=2, learning_rate='auto', init='pca')
    X_train_tsne = tsne.fit_transform(X_train_pca)

    # One colour per class from a categorical colormap.
    cmap = plt.get_cmap('tab20')
    colors = cmap(np.linspace(0, 1, len(classes)))
    classes_to_colors = dict(zip(classes, colors))

    plt.figure(figsize=(15, 10))
    for cls in classes:
        plt.scatter(*X_train_tsne[labels_train == cls].T,
                    color=classes_to_colors[cls], label=cls, s=1)
    legend = plt.legend(fontsize=10)
    for i in range(len(classes)):
        # Enlarge legend markers; the scatter points themselves stay tiny (s=1).
        legend.legendHandles[i]._sizes = [30]
    plt.savefig('visualized_simple.pdf')

BIN
visualized_simple.pdf Normal file

Binary file not shown.