Commit

Merge branch 'master' of https://github.com/changhaonan/LGMCTS-D
Kowndinya2000 committed Sep 11, 2023
2 parents 7bc9449 + 8622508 commit 6e811d2
Showing 2 changed files with 105 additions and 112 deletions.
210 changes: 101 additions & 109 deletions lgmcts/algorithm/pattern_transformer.py
@@ -13,144 +13,136 @@
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import pytorch_lightning as pl
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint

import lgmcts.utils.misc_utils as utils
from lgmcts.algorithm.transformer import *

# -1 is the beginning, -2 is the end, -3 for unknown values
from lgmcts.components.patterns import PATTERN_DICT


def createPointDataset(seq_len):
merged_result = []
def createPatternData(rng, seq_len, sample_len=3, pattern_type="line"):
pattern_type = "line"
datas = []
img_size = (100, 100)
rng = np.random.Generator(np.random.PCG64(12345))
for i in range(seq_len):
# Define the dimensions of your 2D array
rows, cols = 100, 200

# Create a 2D array filled with zeros
true_locations = np.zeros((rows, cols))

# Define the number of true 1s you want to place
num_true_ones = 3

# Randomly select locations for the true 1s

# Place the true 1s in the array
true_locations[25][125] = 1
true_locations[25][75] = 1
true_locations[75][75] = 1

# Set the desired standard deviation
desired_std_deviation = 5.0

results = []
temp_array = np.zeros((rows, cols))
np.random.seed(None)
random_number = int(round(np.random.normal(0, desired_std_deviation)))
np.random.seed(None)
random_number_2 = int(round(np.random.normal(0, desired_std_deviation)))
random_dir = np.random.randint(0, 4)

newarray = None
if random_dir == 0:
newarray = [25-random_number, 125-random_number_2]
elif random_dir == 1:
newarray = [25-random_number, 125+random_number_2]
elif random_dir == 2:
newarray = [25+random_number, 125-random_number_2]
elif random_dir == 3:
newarray = [25+random_number, 125+random_number_2]

results.extend(newarray)

current_time = int(time.time())
np.random.seed(None)
random_number = int(round(np.random.normal(0, desired_std_deviation)))
np.random.seed(None)
random_number_2 = int(round(np.random.normal(0, desired_std_deviation)))
random_dir = np.random.randint(0, 4)
if random_dir == 0:
results.extend([25-random_number, 75-random_number_2])
elif random_dir == 1:
results.extend([25-random_number, 75+random_number_2])
elif random_dir == 2:
results.extend([25+random_number, 75-random_number_2])
elif random_dir == 3:
results.extend([25+random_number, 75+random_number_2])

current_time = int(time.time())
np.random.seed(None)
random_number = int(round(np.random.normal(0, desired_std_deviation)))
np.random.seed(None)
random_number_2 = int(round(np.random.normal(0, desired_std_deviation)))
random_dir = np.random.randint(0, 4)
if random_dir == 0:
results.extend([75-random_number, 75-random_number_2])
elif random_dir == 1:
results.extend([75-random_number, 75+random_number_2])
elif random_dir == 2:
results.extend([75+random_number, 75-random_number_2])
elif random_dir == 3:
results.extend([75+random_number, 75+random_number_2])
results.extend([0, 0, 0, 0, 0, 0])
merged_result.append(results)
return merged_result


class ReverseDataset(data.Dataset):
def __init__(self, seq_len):
prior = PATTERN_DICT[pattern_type].gen_prior(img_size=img_size, rng=rng)[0]
samples = utils.sample_distribution(prob=prior, rng=rng, n_samples=sample_len)
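        # samples are (row, col) pixel locations drawn from the pattern prior over the img_size grid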
samples = samples.astype(np.float32)
samples[:, 0] = samples[:, 0] / float(img_size[0])
samples[:, 1] = samples[:, 1] / float(img_size[1])
# scale to [0, 1]
datas.append(samples.flatten())
return np.vstack(datas)


def pose_tokensize(pose: np.ndarray, resolution: float = 0.01):
"""Tokenize 2d pose"""
num_tokens = int(1 / resolution)
pose = np.clip(pose, 0, 1)
pose = (pose * num_tokens).astype(np.int32)
return pose[:, 0] * num_tokens + pose[:, 1]


class PatternDataset(data.Dataset):
def __init__(self, seq_len, device="cpu"):
super().__init__()
self.num_categories = 12
self.sample_len = 3
self.seq_len = seq_len
self.pattern_type = "line"
self.resolution = 0.01
self.rng = np.random.Generator(np.random.PCG64(12345))

output = torch.tensor(createPointDataset(seq_len))

self.size = len(output)
self.data = torch.tensor(output)
# create data
self.data = createPatternData(self.rng, self.seq_len, self.sample_len, self.pattern_type)
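        # each row stores one layout: sample_len (x, y) pairs flattened to length sample_len * 2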
self.size = self.data.shape[0]
self.device = device

def __len__(self):
return self.size

def __getitem__(self, idx):
inp_data = self.data[idx]
labels = inp_data.clone().detach()

random_number = random.randint(0, 2)
if random_number == 0:
positions_to_replace = [2, 3, 4, 5]
for i in positions_to_replace:
inp_data[i] = -1
elif random_number == 1:
positions_to_replace = [0, 1, 4, 5]
for i in positions_to_replace:
inp_data[i] = -1
elif random_number == 2:
positions_to_replace = [0, 1, 2, 3]
for i in positions_to_replace:
inp_data[i] = -1

return inp_data.to(torch.float32), labels.to(torch.float32)


def train_reverse(**kwargs):
# Create a PyTorch Lightning trainer with the generation callback
root_dir = os.path.join(CHECKPOINT_PATH, "ReverseTask")
inp_data = inp_data.reshape(self.sample_len, 2)
labels = np.copy(inp_data)

# apply a random mask
num_masked = 1
mask_idx = self.rng.choice(self.sample_len, num_masked, replace=False)
inp_data[mask_idx] = 0
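        # the masked point is zeroed in the input while the label keeps the original
        # coordinates, so the model learns to reconstruct the missing point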

return torch.from_numpy(inp_data).to(self.device), torch.from_numpy(labels).to(self.device)


class PatternPredictor(TransformerPredictor):

def _calculate_loss(self, batch, mode="train"):
inp_data, labels = batch # inp_data: (batch_size, sample_len*2), labels: (batch_size, sample_len*2)

preds = self.forward(inp_data, add_positional_encoding=True)

        # use an L2 (MSE) loss
loss = F.mse_loss(preds, labels)

# calculate accuracy
batch_size = inp_data.shape[0]
preds = preds.detach().cpu().numpy().reshape(batch_size, -1)
labels = labels.detach().cpu().numpy().reshape(batch_size, -1)
acc = np.mean(np.linalg.norm(preds - labels, axis=-1) < 0.1)
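        # a prediction counts as correct when the L2 distance between the flattened
        # predicted and ground-truth layouts is below 0.1 (in normalized coordinates)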

# Logging
self.log(f"{mode}_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
self.log(f"{mode}_acc", acc, on_step=False, on_epoch=True, prog_bar=True)
return loss, acc

def training_step(self, batch, batch_idx):
loss, _ = self._calculate_loss(batch, mode="train")
return loss

def validation_step(self, batch, batch_idx):
_ = self._calculate_loss(batch, mode="val")

def test_step(self, batch, batch_idx):
_ = self._calculate_loss(batch, mode="test")


if __name__ == "__main__":
root_dir = "/home/robot-learning/Projects/LGMCTS-D/output/pattern_transformer"
device = "cpu"
os.makedirs(root_dir, exist_ok=True)
trainer = pl.Trainer(default_root_dir=root_dir,
callbacks=[ModelCheckpoint(save_weights_only=True, mode="max", monitor="val_acc")],
accelerator="gpu" if str(device).startswith("cuda") else "cpu",
devices=1,
max_epochs=3,
max_epochs=50,
gradient_clip_val=3)
trainer.logger._default_hp_metric = None # Optional logging argument that we don't need

model = ReversePredictor(max_iters=trainer.max_epochs*len(train_loader), **kwargs)
total_number = 10000
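    # 80/10/10 train/val/test split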
train_dataset, val_dataset, test_dataset = random_split(
PatternDataset(total_number, device=device), [int(total_number * 0.8), int(total_number * 0.1), int(total_number * 0.1)]
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

model = PatternPredictor(input_dim=2,
model_dim=32,
num_heads=1,
output_dim=2,
num_layers=1,
dropout=0.0,
lr=5e-4,
max_iters=100,
warmup=50)
trainer.fit(model, train_loader, val_loader)

# Test best model on validation and test set

val_result = trainer.test(model, val_loader, verbose=False)
test_result = trainer.test(model, test_loader, verbose=False)
result = {"test_acc": test_result[0]["test_acc"], "val_acc": val_result[0]["test_acc"]}

model = model.to(device)
return model, result
7 changes: 4 additions & 3 deletions lgmcts/algorithm/transformer.py
@@ -6,6 +6,7 @@
import torch.utils.data as data
import torch.optim as optim
import pytorch_lightning as pl
import math


def scaled_dot_product(q, k, v, mask=None):
@@ -191,12 +192,12 @@ def get_lr_factor(self, epoch):

class TransformerPredictor(pl.LightningModule):

def __init__(self, input_dim, model_dim, num_classes, num_heads, num_layers, lr, warmup, max_iters, dropout=0.0, input_dropout=0.0):
def __init__(self, input_dim, model_dim, output_dim, num_heads, num_layers, lr, warmup, max_iters, dropout=0.0, input_dropout=0.0):
"""
Inputs:
input_dim - Hidden dimensionality of the input
model_dim - Hidden dimensionality to use inside the Transformer
num_classes - Number of classes to predict per sequence element
output_dim - Output dimension
num_heads - Number of heads to use in the Multi-Head Attention blocks
num_layers - Number of encoder blocks to use.
lr - Learning rate in the optimizer
@@ -229,7 +230,7 @@ def _create_model(self):
nn.LayerNorm(self.hparams.model_dim),
nn.ReLU(inplace=True),
nn.Dropout(self.hparams.dropout),
nn.Linear(self.hparams.model_dim, self.hparams.num_classes)
nn.Linear(self.hparams.model_dim, self.hparams.output_dim)
)

def forward(self, x, mask=None, add_positional_encoding=True):
