import torch
from tensordict import TensorDict
import os
from os import path
import pickle
from typing import Dict, Optional
from maenvs4vrp.core.env_generator_builder import InstanceBuilder
GENERATED_INSTANCES_PATH = 'mdvrptw/data/generated'
[docs]
class InstanceGenerator(InstanceBuilder):
"""
MDVRPTW instance generation class.
"""
[docs]
@classmethod
def get_list_of_benchmark_instances(cls):
"""
Get list of generated files.
Args:
n/a.
Returns:
None.
"""
base_dir = path.dirname(path.dirname(path.abspath(__file__)))
generated = os.listdir(path.join(base_dir, GENERATED_INSTANCES_PATH))
benchmark_instances = {}
for folder in generated:
val_path = path.join( GENERATED_INSTANCES_PATH, folder, 'validation')
test_path = path.join(GENERATED_INSTANCES_PATH, folder, 'test')
benchmark_instances[folder] = {'validation': [val_path + '/' + s.split('.')[0] for s in os.listdir(path.join(base_dir, val_path))],
'test':[test_path + '/' + s.split('.')[0] for s in os.listdir(path.join(base_dir, test_path))]}
return benchmark_instances
[docs]
def __init__(self,
instance_type:str='validation',
set_of_instances:set=None,
device: Optional[str] = "cpu",
batch_size: Optional[torch.Size] = None,
seed:int=None) -> None:
"""
Constructor. Instance generator.
Args:
instance_type(str): Instance type. Can be "validation" or "test". Defaults to "validation".
set_of_instances(set): Set of instances file names. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
batch_size(torch.Size, optional): Batch size. If not specified, defaults to 1.
seed(int): Random number generator seed. Defaults to None.
Returns:
None.
"""
# seed the generation process
if seed is None:
self._set_seed(self.DEFAULT_SEED)
else:
self._set_seed(seed)
self.device = device
if batch_size is None:
batch_size = [1]
else:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
self.max_num_agents = 5
self.max_num_nodes = 100
self.num_depots = 5
assert instance_type in ["validation", "test"], f"instance unknown type"
self.set_of_instances = set_of_instances
if set_of_instances:
self.instance_type = instance_type
self.load_set_of_instances()
[docs]
def read_instance_data(self, instance_name:str)-> Dict:
"""
Read instance data from file.
Args:
instance_name(str): instance file name.
Returns:
Dict: Instance data.
"""
base_dir = path.dirname(path.dirname(path.abspath(__file__)))
generated_file = '{path_to_generated_instances}/{instance}.pkl' \
.format(path_to_generated_instances=base_dir,
instance=instance_name)
with open(generated_file, 'rb') as fp:
instance = pickle.load(fp)
self.batch_size = instance['data'].batch_size
instance['data'] = instance['data'].to(self.device)
return instance
[docs]
def get_instance(self, instance_name:str, num_agents:int=None) -> Dict:
"""
Get an instance with custom number of agents.
Args:
instance_name(str): Instance file name.
num_agents(int): Number of agents. Defaults to None.
Returns:
Dict: Instance data.
"""
instance = self.instances_data.get(instance_name)
if num_agents is not None:
assert num_agents>0, f"number of agents must be grater them 0!"
instance['num_agents'] = num_agents
return instance
[docs]
def load_set_of_instances(self, set_of_instances:set=None):
"""
Load every instance on set_of_instances set.
Args:
set_of_instances(set): Set of instances file names. Defaults to None.
Returns:
None.
"""
if set_of_instances:
self.set_of_instances = set_of_instances
self.instances_data = dict()
for instance_name in self.set_of_instances:
instance = self.read_instance_data(instance_name)
self.instances_data[instance_name] = instance
[docs]
def get_time_windows(self,
instance:TensorDict=None,
batch_size:torch.Size=None,
seed:int=None)-> torch.tensor:
"""
Get time windows to reach the nodes.
Args:
instance(TensorDict): Data instance. Defaults to None.
batch_size(torch.Size): Batch size. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
Returns:
torch.Tensor: Nodes time windows.
"""
if seed is not None:
self._set_seed(seed)
time_windows = torch.zeros((*batch_size, self.max_num_nodes, 2), device=self.device)
depot_coord = instance['coords'].gather(1, instance['depot_idx'][:,:,None].expand(-1, -1, 2)).mean(dim=1).unsqueeze(1)
dist_depot = torch.pairwise_distance(depot_coord, instance['coords'], keepdim = True)
depot_start, depot_end = 0, 3
inf = depot_start + dist_depot
sup = depot_end - dist_depot - self.service_times
time_centers = inf.squeeze(-1) + torch.rand(*batch_size, self.max_num_nodes, device=self.device) * (sup-inf).squeeze(-1)
time_half_width = torch.empty((*batch_size, self.max_num_nodes), device=self.device).uniform_(self.service_times / 2 , depot_end / 3)
time_windows[:, :, 0] = torch.clip(time_centers - time_half_width, depot_start, depot_end)
time_windows[:, :, 1] = torch.clip(time_centers + time_half_width, depot_start, depot_end)
time_windows[:, self.depot_idx, 0] = depot_start
time_windows[:, self.depot_idx, 1] = depot_end
return time_windows
[docs]
def random_generate_instance(self,
num_agents:int=5,
num_nodes:int=100,
num_depots:int=5,
capacity:int=50,
service_times:int=0.2,
batch_size: Optional[torch.Size] = None,
seed:int=None)-> TensorDict:
"""
Generate random instance.
Args:
num_agents(int): Total number of agents. Defaults to 20.
num_nodes(int): Total number of nodes. Defaults to 100.
capacity(int): Total capacity for each agent. Defaults to 50.
service_times(int): Total time of service. Defaults to 0.2.
batch_size(torch.Size, optional): Batch size. Defaults to None.
seed(int, optional): Random number generator seed. Defaults to None.
Returns:
TensorDict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if num_agents is not None:
assert num_agents>0, f"number of agents must be grater them 0!"
self.max_num_agents = num_agents
if num_nodes is not None:
assert num_nodes>0, f"number of services must be grater them 0!"
self.max_num_nodes = num_nodes
if num_depots is not None:
assert num_depots>0, f"number depots must be grater them 0!"
self.num_depots = num_depots
if service_times is not None:
self.service_times = service_times
if capacity is not None:
assert capacity>0, f"agent capacity must be grater them 0!"
self.capacity = capacity
if batch_size is not None:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
instance = TensorDict({}, batch_size=self.batch_size, device=self.device)
self.depot_idx = torch.arange( self.num_depots, device=self.device).repeat((*self.batch_size, 1))
instance['depot_idx'] = self.depot_idx
coords = torch.rand(*self.batch_size, self.max_num_nodes, 2, dtype = torch.float, device=self.device)
instance['coords'] = coords
demands = torch.randint(low = 1, high=11, size = (*self.batch_size, num_nodes), dtype = torch.float, device=self.device)
demands.scatter_(1, self.depot_idx, 0)
instance['demands'] = demands
service_times = self.service_times * torch.ones((*self.batch_size, num_nodes), dtype = torch.float, device=self.device)
service_times.scatter_(1, self.depot_idx, 0)
instance['service_time'] = service_times
time_windows = self.get_time_windows(instance, self.batch_size, seed)
instance['tw_low'] = time_windows[:, :, 0].clone()
instance['tw_high'] = time_windows[:, :, 1].clone()
instance['is_depot'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.bool, device=self.device)
instance['is_depot'][:, self.depot_idx] = True
instance['start_time'] = time_windows[:, :, 0].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
instance['end_time'] = time_windows[:, :, 1].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
instance['capacity'] = self.capacity * torch.ones((*self.batch_size, 1), dtype = torch.float, device=self.device)
instance_info = {'name':'random_instance',
'num_nodes': self.max_num_nodes,
'num_agents':self.max_num_agents,
'num_depots':self.num_depots,
'data':instance}
return instance_info
[docs]
def augment_generate_instance(self, num_agents:int=5,
num_nodes:int=100,
num_depots:int=5,
capacity:int=50,
service_times:int=0.2,
batch_size: Optional[torch.Size] = None,
n_augment:int = 2,
seed:int=None)-> TensorDict:
"""
Generate augmentated instance.
Args:
num_agents(int): Total number of agents. Defaults to 20.
num_nodes(int): Total number of nodes. Defaults to 100.
capacity(int): Total capacity for each agent. Defaults to 50.
service_times(int): Service time in the nodes. Defaults to 0.2.
batch_size(torch.Size, optional): Batch size. Defaults to None.
n_augment(int): Data augmentation. Defaults to 2.
seed(int, optional): Random number generator seed. Defaults to None.
Returns:
TensorDict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if num_agents is not None:
assert num_agents>0, f"number of agents must be grater them 0!"
self.max_num_agents = num_agents
if num_nodes is not None:
assert num_nodes>0, f"number of services must be grater them 0!"
self.max_num_nodes = num_nodes
if num_depots is not None:
assert num_depots>0, f"number depots must be grater them 0!"
self.num_depots = num_depots
if service_times is not None:
self.service_times = service_times
if capacity is not None:
assert capacity>0, f"agent capacity must be grater them 0!"
self.capacity = capacity
if batch_size is not None:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
assert self.batch_size.numel()%n_augment == 0, f"batch_size must be divisible by n_augment"
s_batch_size = self.batch_size.numel() // n_augment
self.s_batch_size = torch.Size([s_batch_size])
instance_info_s = self.random_generate_instance(num_agents=num_agents,
num_nodes=num_nodes,
num_depots=num_depots,
capacity=capacity,
service_times=service_times,
batch_size = self.s_batch_size,
seed=seed)
self.batch_size = torch.Size(batch_size)
instance = TensorDict({}, batch_size=self.batch_size, device=self.device)
for key in instance_info_s['data'].keys():
if len(instance_info_s['data'][key].shape) == 3:
instance[key] = instance_info_s['data'][key].repeat(n_augment, 1, 1)
elif len(instance_info_s['data'][key].shape) == 2:
instance[key] = instance_info_s['data'][key].repeat(n_augment, 1)
elif len(instance_info_s['data'][key].shape) == 1:
instance[key] = instance_info_s['data'][key].repeat(n_augment)
instance_info = {'name':'random_instance',
'num_nodes': self.max_num_nodes,
'num_agents':self.max_num_agents,
'num_depots':self.num_depots,
'data':instance}
return instance_info
[docs]
def sample_name_from_set(self, seed:int=None)-> str:
"""
Sample one instance from instance set.
Args:
seed(int): Random number generator seed. Defaults to None.
Returns:
str: Instance name.
"""
if seed is not None:
self._set_seed(seed)
assert len(self.set_of_instances)>0, f"set_of_instances has to have at least one instance!"
return list(self.set_of_instances)[torch.randint(0, len(self.set_of_instances), (1,)).item()]
[docs]
def sample_instance(self,
num_agents=None,
num_nodes=None,
num_depots=None,
capacity=50,
service_times=0.2,
instance_name:str=None,
sample_type:str='random',
batch_size: Optional[torch.Size] = None,
n_augment: Optional[int] = None,
seed:int=None)-> Dict:
"""
Sample one instance from instance space.
Args:
num_agents(int): Total number of agents. Defaults to None.
num_nodes(int): Total number of nodes. Defaults to None.
capacity(int): Total capacity for each agent. Defaults to 50.
service_times(int): Service time in the nodes. Defaults to 0.2.
instance_name(str): Instance name. Defaults to None.
sample_type(str): Sample type. It can be "random", "augment" or "saved". Defaults to "random".
batch_size(torch.Size, optional): Batch size. Defaults to None.
n_augment(int, optional): Data augmentation. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
Returns:
Dict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if self.set_of_instances is None:
random_sample = True
else:
random_sample = False
if instance_name==None and random_sample==False:
instance_name = self.sample_name_from_set(seed=seed)
elif instance_name==None and random_sample==True:
instance_name = 'random_instance'
else:
instance_name = instance_name
if num_agents is None:
num_agents = 5
if num_nodes is None:
num_nodes = 100
if num_depots is None:
num_depots = 5
if capacity is None:
capacity = 50
if service_times is None:
service_times = 0.2
if batch_size is not None:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
if sample_type=='random':
instance_info = self.random_generate_instance(num_agents=num_agents,
num_nodes=num_nodes,
num_depots=num_depots,
capacity=capacity,
service_times=service_times,
batch_size = batch_size,
seed=seed)
elif sample_type=='augment':
instance_info = self.augment_generate_instance(num_agents=num_agents,
num_nodes=num_nodes,
num_depots=num_depots,
capacity=capacity,
service_times=service_times,
batch_size = batch_size,
n_augment = n_augment,
seed=seed)
elif sample_type=='saved':
instance_info = self.get_instance(instance_name, num_agents=num_agents)
return instance_info
if __name__ == '__main__':
number_instances = 64
print('starting valid/test sets generation')
# valid/test sets generation
for num_nodes, n_agent, num_depots in [(105, 5, 5), (55, 5, 5)]:
generator = InstanceGenerator(batch_size=32, seed=0)
for k in range(number_instances):
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, num_depots=num_depots)
name = f'generated_val_servs_{num_nodes-1}_agents_{n_agent}_depots_{num_depots}_{k}'
instance['name'] = name
if not os.path.exists(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/validation'):
os.makedirs(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/validation')
with open(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/validation/'+name+'.pkl', 'wb') as fp:
pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, num_depots=num_depots)
name = f'generated_test_servs_{num_nodes-1}_agents_{n_agent}_depots_{num_depots}_{k}'
instance['name'] = name
if not os.path.exists(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/test'):
os.makedirs(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/test')
with open(f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}_depots_{num_depots}/test/'+name+'.pkl', 'wb') as fp:
pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
print('done')