"""
Adapted from: https://github.com/ai4co/routefinder/blob/main/routefinder/envs/mtvrp/generator.py
"""
import torch
from torch import Tensor
from tensordict import TensorDict
from typing import Optional, Union, Callable, Dict, Tuple
from maenvs4vrp.core.env_generator_builder import InstanceBuilder
from torch.distributions import Uniform
import os
import pickle
# Folder holding the pre-generated benchmark instances. It is a relative path:
# get_list_of_benchmark_instances joins it onto the directory two levels above
# this file and also uses it as the prefix of every returned instance name.
GENERATED_INSTANCES_PATH = 'gmtdvrp/data/generated'
def get_vehicle_capacity(num_loc: int) -> int:
    """
    Get vehicle capacity.

    The capacity is 30 plus a size-dependent bonus: nothing up to 20 nodes,
    ``num_loc // 5`` for 21 to 1000 nodes, and above 1000 nodes 200 plus one
    extra unit for every full 33.3 nodes beyond 1000.

    Args:
        num_loc(int): Number of nodes.

    Returns:
        capacity(int): Vehicle capacity.
    """
    if num_loc > 1000:
        # Floor division by the float 33.3 produces a float; convert it back
        # so every branch returns an int, as the annotation promises.
        extra_cap = 1000 // 5 + int((num_loc - 1000) // 33.3)
    elif num_loc > 20:
        extra_cap = num_loc // 5
    else:
        extra_cap = 0
    return 30 + extra_cap
# Every problem variant name. Names containing 'mb' are the mixed-backhaul ones.
VARIANT_PRESETS = [
    'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
    'ovrpl', 'ovrpltw', 'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
    'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
    'vrpltw', 'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw', 'vrptw',
]

# Same order as VARIANT_PRESETS, split by whether the name marks mixed backhauls.
VARIANT_PRESETS_UNMIXED = [name for name in VARIANT_PRESETS if 'mb' not in name]
VARIANT_PRESETS_MIXED = [name for name in VARIANT_PRESETS if 'mb' in name]


def _feature_flags(preset_name):
    """Read the O / TW / L / B switches (1.0 = on, 0.0 = off) off a preset name."""
    # Key order matters: subsample_variant turns the dict values into a tensor
    # whose columns are interpreted as O, TW, L, B in that order.
    return {
        "O": float(preset_name.startswith('o')),
        "TW": float(preset_name.endswith('tw')),
        "L": float('l' in preset_name),
        "B": float('b' in preset_name),
    }


# Variant probabilities. The first three presets sample features; the others
# pin each feature fully on or off according to the preset name.
VARIANT_PROBS_PRESETS = {
    "all": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
    "single_feat": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
    "single_feat_otw": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5, "OTW": 0.5},
}
VARIANT_PROBS_PRESETS.update(
    (fixed_name, _feature_flags(fixed_name))
    for fixed_name in (
        "cvrp", "ovrp", "vrpb", "vrpl", "vrptw", "ovrptw", "ovrpb", "ovrpl",
        "vrpbl", "vrpbtw", "vrpltw", "ovrpbl", "ovrpbtw", "ovrpltw",
        "vrpbltw", "ovrpbltw",
    )
)
[docs]
class InstanceGenerator(InstanceBuilder):
"""
GMTDVRP instance generation class.
"""
[docs]
@classmethod
def get_list_of_benchmark_instances(cls, mixed: bool = True):
    """
    Get the generated benchmark instances found on disk.

    Args:
        mixed(bool): If True, every variant preset is scanned. If False, only the unmixed presets are scanned. Defaults to True.

    Returns:
        benchmark_instances(dict): One entry per folder of generated instances. Each entry is a dict with the keys
            'validation' and 'test', holding lists of instance paths (relative to the base directory, file extension removed).
    """
    # Base directory is two levels above this file.
    package_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    presets = VARIANT_PRESETS if mixed else VARIANT_PRESETS_UNMIXED

    catalogue = {}
    for folder in os.listdir(os.path.join(package_root, GENERATED_INSTANCES_PATH)):
        catalogue[folder] = {'validation': [], 'test': []}
        for problem_type in presets:
            for split in ('validation', 'test'):
                split_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, split)
                for file_name in os.listdir(os.path.join(package_root, split_path)):
                    # Keep only the part of the file name before the first dot.
                    catalogue[folder][split].append(split_path + '/' + file_name.split('.')[0])
    return catalogue
[docs]
def __init__(
    self,
    instance_type: str = 'validation',
    set_of_instances: set = None,
    device: Optional[str] = "cpu",
    batch_size: Optional[torch.Size] = None,
    seed: int = None
) -> None:
    """
    Constructor. Instance generator.

    Args:
        instance_type(str): Instance type. Can be "validation", "test", "" or None. Defaults to "validation".
        set_of_instances(set): Set of instances file names. When given (and non-empty) the instances are loaded immediately. Defaults to None.
        device(str, optional): Device the instance tensors are placed on. Defaults to "cpu".
        batch_size(torch.Size, optional): Batch size; an int is wrapped into a one-element size. If not specified, defaults to 1.
        seed(int): Random number generator seed. If None, DEFAULT_SEED is used. Defaults to None.

    Returns:
        None.
    """
    self._set_seed(self.DEFAULT_SEED if seed is None else seed)
    self.device = device

    if batch_size is None:
        batch_dims = [1]
    elif isinstance(batch_size, int):
        batch_dims = [batch_size]
    else:
        batch_dims = batch_size
    self.batch_size = torch.Size(batch_dims)

    assert instance_type in ('test', 'validation', '', None), "Instance type must be 'test', 'validation', '' or None."

    self.set_of_instances = set_of_instances
    if not set_of_instances:
        # Nothing to load; instance_type is only recorded together with a set.
        return
    self.instance_type = instance_type
    self.load_set_of_instances()
[docs]
def load_set_of_instances(
    self,
    set_of_instances: set = None
):
    """
    Load every instance named in the set of instances.

    Args:
        set_of_instances(set): Set of instances file names. When given (and non-empty) it replaces the
            set stored on the generator; otherwise the stored set is loaded. Defaults to None.

    Returns:
        None.
    """
    if set_of_instances:
        self.set_of_instances = set_of_instances

    # Start from an empty cache, then fill it name by name.
    self.instances_data = {}
    cache = self.instances_data
    for name in self.set_of_instances:
        cache[name] = self.read_instance_data(name)
[docs]
def read_instance_data(self, instance_name: str):
    """
    Read instance data from file.

    The file is looked up as ``<base dir>/<instance_name>.pkl``, where the
    base directory is two levels above this file. As a side effect the
    generator's batch size is set to the batch size of the loaded data.

    Args:
        instance_name(str): Instance file name (path relative to the base directory, without extension).

    Returns:
        Dict: Instance data, with its 'data' entry moved to the generator's device.
    """
    package_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    pickle_path = f'{package_root}/{instance_name}.pkl'

    # NOTE: unpickling can execute arbitrary code; only load trusted instance files.
    with open(pickle_path, 'rb') as handle:
        payload = pickle.load(handle)

    tensors = payload['data']
    self.batch_size = tensors.batch_size
    payload['data'] = tensors.to(self.device)
    return payload
[docs]
def get_instance(self, instance_name: str, num_agents: int = None) -> Dict:
    """
    Get an instance with custom number of agents.

    Args:
        instance_name(str): Instance file name.
        num_agents(int): Number of agents. When given, it overrides the 'num_agents' entry of the
            returned instance. Defaults to None.

    Returns:
        Dict: Instance data, or None when no instance with that name has been loaded.
    """
    instance = self.instances_data.get(instance_name)
    if num_agents is None:
        return instance

    assert num_agents > 0, "number of agents must be greater than 0!"
    if instance is None:
        # Unknown name: keep returning None instead of failing on item assignment.
        return None

    # Override on a shallow copy so the cached instance keeps its own agent count
    # for later calls; the tensors under 'data' are shared, not duplicated.
    overridden = dict(instance)
    overridden['num_agents'] = num_agents
    return overridden
[docs]
def subsample_variant(
    self,
    prob_open_routes: float = 0.5,
    prob_time_windows: float = 0.5,
    prob_limit: float = 0.5,
    prob_backhaul: float = 0.5,
    td: "TensorDict" = None,
    variant_preset=None,
) -> "TensorDict":
    """
    Subsample variant. If variant_preset is specified, that preset's probabilities are used.
    Otherwise the four probability arguments are used.

    How the features (open routes, time windows, distance limits, backhauls) are chosen:

    * ``self.use_combinations`` is True: each feature is switched on independently for every
      batch entry with its probability (1.0 = always on, 0.0 = never on).
    * preset is "all", "cvrp", "single_feat" or "single_feat_otw": exactly one option is drawn per
      batch entry from the features (weighted by their probabilities) plus a plain-CVRP option
      with weight 0.5. For "single_feat_otw" the extra "OTW" option switches on both open routes
      and time windows.
    * otherwise: every feature with a non-zero probability is switched on for the whole batch.

    Features that are switched off are reset through the ``_default_*`` helpers. The mask used is
    stored in ``self.keep_mask``; the probabilities and preset are stored in ``self.variant_probs``
    and ``self.variant_preset``.

    Args:
        prob_open_routes(float): Probability of open routes. Defaults to 0.5.
        prob_time_windows(float): Probability of time windows. Defaults to 0.5.
        prob_limit(float): Probability of distance limits. Defaults to 0.5.
        prob_backhaul(float): Probability of backhaul. Defaults to 0.5.
        td(TensorDict): Environment instance tensor. Required despite the None default.
        variant_preset(str): Name of a preset in VARIANT_PROBS_PRESETS. Defaults to None.

    Returns:
        td(TensorDict): Environment instance tensor, with the 'has_*' flags added.
    """
    flag_names = ('has_open_routes', 'has_time_windows', 'has_distance_limits', 'has_backhauls')
    for flag in flag_names:
        td[flag] = torch.zeros((*self.batch_size, 1), dtype=torch.bool)

    if variant_preset is not None:
        variant_probs = VARIANT_PROBS_PRESETS.get(variant_preset)
        assert variant_probs is not None, (
            f"Variant preset {variant_preset} not found! "
            f"Available presets are {VARIANT_PROBS_PRESETS.keys()} "
            f"with probabilities {VARIANT_PROBS_PRESETS.values()}"
        )
    else:
        variant_probs = {
            "O": prob_open_routes,
            "TW": prob_time_windows,
            "L": prob_limit,
            "B": prob_backhaul
        }

    for key, prob in variant_probs.items():
        assert 0 <= prob <= 1, f"Probability {key} must be between 0 and 1"

    self.variant_probs = variant_probs
    self.variant_preset = variant_preset

    # Columns are O, TW, L, B (plus OTW for the "single_feat_otw" preset).
    probs = torch.tensor(list(variant_probs.values()), dtype=torch.float32)

    if self.use_combinations:
        # A feature is kept when the uniform draw falls below its probability, so
        # prob=1.0 always keeps it and prob=0.0 never does. Only the four real
        # features take part; a fifth "OTW" entry is already covered by O and TW.
        keep_mask = torch.rand(*self.batch_size, 4) < probs[:4]
    elif self.variant_preset in ("all", "cvrp", "single_feat", "single_feat_otw"):
        # One option per batch entry; the appended last option is plain CVRP.
        option_probs = torch.cat((probs, probs.new_tensor([0.5])))
        picked = torch.distributions.Categorical(
            option_probs[None].repeat(*self.batch_size, 1)
        ).sample()
        keep_mask = torch.zeros((*self.batch_size, option_probs.numel()), dtype=torch.bool)
        keep_mask[torch.arange(*self.batch_size), picked] = True
        if self.variant_preset == "single_feat_otw":
            # Column 4 is the combined "OTW" option: it implies both O and TW.
            keep_mask[:, :2] |= keep_mask[:, 4:5]
    else:
        # Fixed variant: keep every feature whose probability is non-zero.
        keep_mask = torch.zeros((*self.batch_size, 4), dtype=torch.bool)
        keep_mask[:, probs[:4] != 0] = True

    for column, flag in enumerate(flag_names):
        td[flag][keep_mask[:, column]] = True

    td = self._default_open(td, ~keep_mask[:, 0])
    td = self._default_time_windows(td, ~keep_mask[:, 1])
    td = self._default_distance_limit(td, ~keep_mask[:, 2])
    td = self._default_backhaul(td, ~keep_mask[:, 3])

    self.keep_mask = keep_mask

    return td
[docs]
def random_generate_instance(
    self,
    num_depots: int = None,
    num_agents: int = None,
    num_nodes: int = None,
    min_coords: float = None,
    max_coords: float = None,
    capacity: float = None,
    service_time: float = None,
    min_demands: int = None,
    max_demands: int = None,
    min_backhaul: int = None,
    max_backhaul: int = None,
    max_time: float = None,
    backhaul_ratio: float = None,
    backhaul_class: int = None,
    sample_backhaul_class: bool = None,
    max_distance_limit: float = None,
    speed: float = None,
    initial_load: float = None,
    subsample: bool = True,
    variant_preset=None,
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    seed: int = None,
    device: Optional[str] = "cpu"
) -> Dict:
    """
    Generate random instance.

    The helper generators (generate_demands, generate_time_windows, ...) read their settings
    from the generator itself. Every explicit (non-None) argument is therefore stored on the
    generator before any tensor is built, so the whole instance is generated from one consistent
    set of values. Arguments left as None fall back to the attributes already stored on the
    generator (sample_instance sets all of them).

    Args:
        num_depots(int): Total number of depots. Defaults to None.
        num_agents(int): Number of agents per depot. Defaults to None.
        num_nodes(int): Total number of nodes. Defaults to None.
        min_coords(float): Lower bound of the coordinates. Defaults to None.
        max_coords(float): Upper bound of the coordinates. Defaults to None.
        capacity(int): Vehicles' capacity. Defaults to None.
        service_time(float): Service time. Only validated and stored; the service times written to the
            instance are the ones sampled by generate_time_windows. Defaults to None.
        min_demands(int): Minimum linehaul demand. Defaults to None.
        max_demands(int): Maximum linehaul demand. Defaults to None.
        min_backhaul(int): Minimum backhaul demand. Defaults to None.
        max_backhaul(int): Maximum backhaul demand. Defaults to None.
        max_time(float): Maximum route time. Defaults to None.
        backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to None.
        max_distance_limit(float): Route distance limits. Defaults to None.
        speed(float): Vehicles' speed. Defaults to None.
        initial_load(float): Vehicles' initial load. Defaults to None.
        subsample(bool): If problem variants are to be sampled. Only used when the generator has no
            ``subsample`` attribute yet; otherwise the stored value wins. Defaults to True.
        variant_preset(str): Variant preset to be sampled. Defaults to None.
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined.
            Only used when the generator has no ``use_combinations`` attribute yet. Defaults to False.
        batch_size(torch.Size, optional): Batch size. Defaults to None (keep the generator's batch size).
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Accepted for signature compatibility; tensors are placed on the generator's device. Defaults to "cpu".

    Returns:
        Dict: Instance info with the keys 'name', 'num_depots', 'num_nodes', 'num_agents' and 'data'
            (the TensorDict holding the instance tensors).
    """
    if seed is not None:
        self._set_seed(seed)

    must_be_positive = (
        ("Number of depots", num_depots),
        ("Number of agents", num_agents),
        ("Number of nodes", num_nodes),
        ("Capacity", capacity),
        ("Service times", service_time),
        ("Maximum time", max_time),
        ("Distance limit", max_distance_limit),
        ("Speed", speed),
    )
    for label, value in must_be_positive:
        if value is not None:
            assert value > 0, f"{label} must be greater than 0!"
    if backhaul_class is not None:
        assert backhaul_class in (1, 2), "Backhaul class must be in [1, 2]!"

    if batch_size is None:
        batch_size = self.batch_size
    else:
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
        self.batch_size = torch.Size(batch_size)

    # Explicit arguments win and are written back so the helpers see them.
    explicit_settings = {
        'num_depots': num_depots,
        'num_agents': num_agents,
        'num_nodes': num_nodes,
        'min_coords': min_coords,
        'max_coords': max_coords,
        'capacity': capacity,
        'service_time': service_time,
        'min_demands': min_demands,
        'max_demands': max_demands,
        'min_backhaul': min_backhaul,
        'max_backhaul': max_backhaul,
        'max_time': max_time,
        'backhaul_ratio': backhaul_ratio,
        'backhaul_class': backhaul_class,
        'sample_backhaul_class': sample_backhaul_class,
        'max_distance_limit': max_distance_limit,
        'speed': speed,
        'initial_load': initial_load,
        'variant_preset': variant_preset,
    }
    for attribute, value in explicit_settings.items():
        if value is not None:
            setattr(self, attribute, value)
    self.num_agents_total = self.num_agents * self.num_depots
    if not hasattr(self, 'use_combinations'):
        self.use_combinations = use_combinations

    instance = TensorDict({}, batch_size=batch_size, device=self.device)
    batch = tuple(self.batch_size)

    # Depots occupy the first `num_depots` node positions.
    instance['num_agents'] = torch.full((*batch, 1), self.num_agents)
    instance['num_depots'] = torch.full((*batch, 1), self.num_depots)
    self.depot_idx = torch.arange(self.num_depots, device=self.device).repeat((*batch, 1))
    instance['depot_idx'] = self.depot_idx

    # Coords uniform generation. Nodes are (x, y) points.
    coords = torch.FloatTensor(*batch, self.num_nodes, 2).uniform_(self.min_coords, self.max_coords)
    instance['coords'] = coords

    # Capacity
    instance['capacity'] = torch.full((*batch, 1), self.capacity, dtype=torch.float32)
    instance['original_capacity'] = torch.full((*batch, 1), self.capacity, dtype=torch.float32)

    # Demands. Depots have none; slicing avoids indexing a CPU tensor with an
    # index tensor that may live on another device.
    linehaul_demands, backhaul_demands = self.generate_demands(batch_size=self.batch_size, num_nodes=self.num_nodes)
    linehaul_demands[:, :self.num_depots] = 0.0
    backhaul_demands[:, :self.num_depots] = 0.0
    instance['linehaul_demands'] = linehaul_demands
    instance['backhaul_demands'] = backhaul_demands

    # Backhaul class: sampled per batch entry or the configured constant.
    instance['backhaul_class'] = self.generate_backhaul_class(shape=(*batch, 1), sample=self.sample_backhaul_class)

    # Open routes start enabled; subsampling may switch them off.
    instance['open_routes'] = torch.ones(*batch, 1, dtype=torch.bool)

    # Speed
    instance['speed'] = torch.full((*batch, 1), self.speed, dtype=torch.float32)

    # Time windows and service times
    time_windows, service_time = self.generate_time_windows(coords, self.speed)
    instance['time_windows'] = time_windows
    instance['service_time'] = service_time
    instance['tw_low'] = time_windows[:, :, 0]
    instance['tw_high'] = time_windows[:, :, 1]

    is_depot = torch.zeros((*batch, self.num_nodes), dtype=torch.bool, device=self.device)
    is_depot[:, :self.num_depots] = True
    instance['is_depot'] = is_depot

    # Start time and end time are the time window of the first node (depot 0).
    instance['start_time'] = time_windows[:, 0, 0].clone()
    instance['end_time'] = time_windows[:, 0, 1].clone()

    # Distance limits
    instance['distance_limits'] = self.generate_distance_limit(shape=(*batch, 1), coords=coords)

    # Initial load, one column per agent over all depots.
    instance['initial_load'] = torch.full((*batch, self.num_agents_total), self.initial_load, dtype=torch.float32)

    if getattr(self, 'subsample', subsample):
        instance = self.subsample_variant(td=instance, variant_preset=self.variant_preset)

    instance_info = {'name': 'random_instance',
                     'num_depots': self.num_depots,
                     'num_nodes': self.num_nodes,
                     'num_agents': self.num_agents,
                     'data': instance}
    return instance_info
[docs]
def augment_generate_instance(
    self,
    num_depots: int = None,
    num_agents: int = None,
    num_nodes: int = None,
    min_coords: float = None,
    max_coords: float = None,
    capacity: float = None,
    service_time: float = None,
    min_demands: int = None,
    max_demands: int = None,
    min_backhaul: int = None,
    max_backhaul: int = None,
    max_time: float = None,
    backhaul_ratio: float = None,
    backhaul_class: int = None,
    sample_backhaul_class: bool = None,
    max_distance_limit: float = None,
    speed: float = None,
    initial_load: float = None,
    subsample: bool = True,
    variant_preset=None,
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    n_augment: int = 2,
    seed: int = None,
    device: Optional[str] = "cpu"
) -> Dict:
    """
    Generate augmented instance.

    A random instance with ``batch size / n_augment`` entries is generated and then tiled
    ``n_augment`` times along the batch dimension, so the full batch holds ``n_augment``
    identical copies of each generated entry.

    Args:
        num_depots(int): Total number of depots. Defaults to None.
        num_agents(int): Number of agents per depot. Defaults to None.
        num_nodes(int): Total number of nodes. Defaults to None.
        min_coords(float): Lower bound of the coordinates. Defaults to None.
        max_coords(float): Upper bound of the coordinates. Defaults to None.
        capacity(int): Vehicles' capacity. Defaults to None.
        service_time(float): Service time. Defaults to None.
        min_demands(int): Minimum linehaul demand. Defaults to None.
        max_demands(int): Maximum linehaul demand. Defaults to None.
        min_backhaul(int): Minimum backhaul demand. Defaults to None.
        max_backhaul(int): Maximum backhaul demand. Defaults to None.
        max_time(float): Maximum route time. Defaults to None.
        backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to None.
        max_distance_limit(float): Route distance limits. Defaults to None.
        speed(float): Vehicles' speed. Defaults to None.
        initial_load(float): Vehicles' initial load. Defaults to None.
        subsample(bool): If problem variants are to be sampled. Defaults to True.
        variant_preset(str): Variant preset to be sampled. Defaults to None.
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
        batch_size(torch.Size, optional): Full batch size, including the copies. Must be divisible by n_augment.
            Defaults to None (keep the generator's batch size).
        n_augment(int): Number of augmentations. Defaults to 2.
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Passed on to random_generate_instance. Defaults to "cpu".

    Returns:
        Dict: Instance info with the keys 'name', 'num_depots', 'num_nodes', 'num_agents' and 'data'
            (the TensorDict holding the tiled instance tensors).
    """
    if seed is not None:
        self._set_seed(seed)

    must_be_positive = (
        ("Number of depots", num_depots),
        ("Number of agents", num_agents),
        ("Number of nodes", num_nodes),
        ("Capacity", capacity),
        ("Service times", service_time),
        ("Maximum time", max_time),
        ("Distance limit", max_distance_limit),
        ("Speed", speed),
    )
    for label, value in must_be_positive:
        if value is not None:
            assert value > 0, f"{label} must be greater than 0!"
    if backhaul_class is not None:
        assert backhaul_class in (1, 2), "Backhaul class must be in [1, 2]!"

    # Resolve the full batch size once, so the divisibility check and the final
    # TensorDict both use the same value (None -> current size, int -> [int]).
    if batch_size is None:
        batch_size = self.batch_size
    elif isinstance(batch_size, int):
        batch_size = [batch_size]
    full_batch_size = torch.Size(batch_size)

    assert full_batch_size.numel() % n_augment == 0, "Batch size must be divisible by n_augment!"
    self.s_batch_size = torch.Size([full_batch_size.numel() // n_augment])

    # Generate the smaller batch that will be repeated.
    instance_info_s = self.random_generate_instance(
        num_depots=num_depots,
        num_agents=num_agents,
        num_nodes=num_nodes,
        min_coords=min_coords,
        max_coords=max_coords,
        capacity=capacity,
        service_time=service_time,
        min_demands=min_demands,
        max_demands=max_demands,
        min_backhaul=min_backhaul,
        max_backhaul=max_backhaul,
        max_time=max_time,
        backhaul_ratio=backhaul_ratio,
        backhaul_class=backhaul_class,
        sample_backhaul_class=sample_backhaul_class,
        max_distance_limit=max_distance_limit,
        speed=speed,
        initial_load=initial_load,
        subsample=subsample,
        variant_preset=variant_preset,
        use_combinations=use_combinations,
        batch_size=self.s_batch_size,
        seed=seed,
        device=device
    )

    # random_generate_instance switched the generator to the small batch size.
    self.batch_size = full_batch_size
    instance = TensorDict({}, batch_size=self.batch_size, device=self.device)

    base_data = instance_info_s['data']
    for key in base_data.keys():
        value = base_data[key]
        if value.dim() == 0:
            continue
        # Tile along the batch dimension only, whatever the tensor's rank.
        instance[key] = value.repeat(n_augment, *([1] * (value.dim() - 1)))

    instance_info = {'name': 'augmented_instance',
                     'num_depots': num_depots,
                     'num_nodes': num_nodes,
                     'num_agents': num_agents,
                     'data': instance}
    return instance_info
[docs]
def sample_name_from_set(self, seed: int = None) -> str:
    """
    Sample one instance name, uniformly at random, from the instance set.

    Args:
        seed(int): Random number generator seed. Defaults to None.

    Returns:
        str: Instance name.
    """
    if seed is not None:
        self._set_seed(seed)

    candidates = self.set_of_instances
    n_candidates = len(candidates)
    assert n_candidates > 0, "set_of_instances has to have at least one instance!"

    ordered = list(candidates)
    position = int(torch.randint(0, n_candidates, (1,)))
    return ordered[position]
[docs]
def sample_instance(
    self,
    num_depots: int = 3,
    num_agents: int = 2,
    num_nodes: int = 15,
    min_coords: float = 0.0,
    max_coords: float = 1.0,
    capacity: float = None,
    service_time: float = 0.2,
    min_demands: int = 1,
    max_demands: int = 10,
    min_backhaul: int = 1,
    max_backhaul: int = 10,
    max_time: float = 4.6,
    backhaul_ratio: float = 0.2,
    backhaul_class: int = 1,
    sample_backhaul_class: bool = False,
    max_distance_limit: float = 2.8,
    speed: float = 1.0,
    initial_load: float = None,
    subsample: bool = True,
    variant_preset='all',
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    n_augment: Optional[int] = 2,
    sample_type: str = 'random',
    instance_name: str = None,
    seed: int = None,
    device: Optional[str] = "cpu"
):
    """
    Sample one instance from instance space.

    All settings are stored on the generator (a None argument is replaced by its default) and
    then the instance is produced according to ``sample_type``.

    Args:
        num_depots(int): Total number of depots. Defaults to 3.
        num_agents(int): Number of agents per depot. Defaults to 2.
        num_nodes(int): Total number of nodes. Defaults to 15.
        min_coords(float): Lower bound of the coordinates. Defaults to 0.0.
        max_coords(float): Upper bound of the coordinates. Defaults to 1.0.
        capacity(int): Vehicles' capacity. If None, it is derived from the number of nodes. Defaults to None.
        service_time(float): Service time. Defaults to 0.2.
        min_demands(int): Minimum linehaul demand. Defaults to 1.
        max_demands(int): Maximum linehaul demand. Defaults to 10.
        min_backhaul(int): Minimum backhaul demand. Defaults to 1.
        max_backhaul(int): Maximum backhaul demand. Defaults to 10.
        max_time(float): Maximum route time. Defaults to 4.6.
        backhaul_ratio(float): Ratio of backhaul demands. Defaults to 0.2.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to 1.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to False.
        max_distance_limit(float): Route distance limits. Defaults to 2.8.
        speed(float): Vehicles' speed. Defaults to 1.0.
        initial_load(float): Vehicles' initial load. If None, the capacity is used. Defaults to None.
        subsample(bool): If problem variants are to be sampled. Defaults to True.
        variant_preset(str): Variant preset to be sampled. A mixed-backhaul preset is translated to its
            unmixed name with backhaul class 2. Defaults to "all".
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
        batch_size(torch.Size, optional): Batch size. Defaults to None (keep the generator's batch size).
        n_augment(int): Number of augmentations. Defaults to 2.
        sample_type(str): Sample type. It can be "random", "augment" or "saved". Defaults to "random".
        instance_name(str): Instance file path. Defaults to None.
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Device stored on the generator for the generated tensors. Defaults to "cpu".

    Returns:
        Dict: Instance info as produced by random_generate_instance, augment_generate_instance or get_instance.

    Raises:
        ValueError: If sample_type is not "random", "augment" or "saved".
    """
    if seed is not None:
        self._set_seed(seed)

    if batch_size is not None:
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
        self.batch_size = torch.Size(batch_size)

    if instance_name is None:
        if self.set_of_instances is None:
            instance_name = 'random_instance'
        else:
            instance_name = self.sample_name_from_set(seed=seed)

    # (attribute, given value, fallback used when the value is None)
    settings = (
        ('num_depots', num_depots, 3),
        ('num_agents', num_agents, 2),
        ('num_nodes', num_nodes, 15),
        ('min_coords', min_coords, 0.0),
        ('max_coords', max_coords, 1.0),
        ('service_time', service_time, 0.2),
        ('min_demands', min_demands, 1),
        ('max_demands', max_demands, 10),
        ('min_backhaul', min_backhaul, 1),
        ('max_backhaul', max_backhaul, 10),
        ('max_time', max_time, 4.6),
        ('backhaul_ratio', backhaul_ratio, 0.2),
        ('backhaul_class', backhaul_class, 1),
        ('sample_backhaul_class', sample_backhaul_class, False),
        ('max_distance_limit', max_distance_limit, 2.8),
        ('speed', speed, 1.0),
        ('variant_preset', variant_preset, 'all'),
    )
    for attribute, value, fallback in settings:
        setattr(self, attribute, fallback if value is None else value)

    self.num_agents_total = self.num_agents * self.num_depots
    # Derived defaults: capacity from the instance size, initial load from the capacity.
    self.capacity = get_vehicle_capacity(self.num_nodes) if capacity is None else capacity
    self.initial_load = self.capacity if initial_load is None else initial_load

    self.subsample = subsample
    self.use_combinations = use_combinations
    self.seed = seed
    self.device = device
    self.n_augment = n_augment

    if variant_preset in VARIANT_PRESETS_MIXED:
        # Mixed presets have no probability table of their own: use the unmixed
        # preset name (first 'm' dropped) and mark the backhaul class as mixed.
        self.variant_preset = variant_preset.replace('m', '', 1)
        self.backhaul_class = 2

    if sample_type == 'saved':
        return self.get_instance(instance_name, num_agents=num_agents)

    if sample_type not in ('random', 'augment'):
        raise ValueError(
            f"Unknown sample_type {sample_type!r}; expected 'random', 'augment' or 'saved'."
        )

    generation_kwargs = dict(
        num_depots=self.num_depots,
        num_agents=self.num_agents,
        num_nodes=self.num_nodes,
        min_coords=self.min_coords,
        max_coords=self.max_coords,
        capacity=self.capacity,
        service_time=self.service_time,
        min_demands=self.min_demands,
        max_demands=self.max_demands,
        min_backhaul=self.min_backhaul,
        max_backhaul=self.max_backhaul,
        max_time=self.max_time,
        backhaul_ratio=self.backhaul_ratio,
        backhaul_class=self.backhaul_class,
        sample_backhaul_class=self.sample_backhaul_class,
        max_distance_limit=self.max_distance_limit,
        speed=self.speed,
        initial_load=self.initial_load,
        subsample=self.subsample,
        variant_preset=self.variant_preset,
        use_combinations=self.use_combinations,
        batch_size=self.batch_size,
        seed=self.seed,
        device=self.device,
    )
    if sample_type == 'random':
        return self.random_generate_instance(**generation_kwargs)
    return self.augment_generate_instance(n_augment=self.n_augment, **generation_kwargs)
@staticmethod
def _default_open(td, remove):
    """Switch open routes off for the batch entries selected by the boolean mask `remove`; returns `td`."""
    open_routes = td['open_routes']
    open_routes[remove] = False
    return td
@staticmethod
def _default_time_windows(td, remove):
    """Neutralise time windows for the batch entries selected by the boolean mask `remove`.

    Selected entries get the window [0, inf] on every node and a service time of zero; returns `td`.
    """
    windows = td['time_windows']
    service = td['service_time']
    # The [start, end] pair broadcasts over every node of the selected entries.
    windows[remove] = windows.new_tensor([0.0, float('inf')])
    service[remove] = 0
    return td
@staticmethod
def _default_distance_limit(td, remove):
    """Lift the distance limit (set it to infinity) for the batch entries selected by the boolean mask `remove`; returns `td`."""
    limits = td['distance_limits']
    limits[remove] = float('inf')
    return td
@staticmethod
def _default_backhaul(td, remove):
    """Turn backhauls into linehauls for the batch entries selected by the boolean mask `remove`.

    The backhaul demand of each selected entry is added to its linehaul demand and then zeroed; returns `td`.
    """
    linehaul = td['linehaul_demands']
    backhaul = td['backhaul_demands']
    linehaul[remove] = linehaul[remove] + backhaul[remove]
    backhaul[remove] = 0
    return td
[docs]
def generate_demands(self, batch_size: torch.Size, num_nodes: int) -> Tuple[Tensor, Tensor]:
    """
    Generate demands.

    Every node gets either a linehaul or a backhaul demand, never both: a node is a backhaul
    node with probability ``self.backhaul_ratio``. Demands are whole numbers stored as floats,
    drawn from ``min`` up to ``max - 1`` (the configured maximum itself is never produced).

    Args:
        batch_size(torch.Size): Batch size (unpacked into the tensor shape).
        num_nodes(int): Number of nodes.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: Linehaul and backhaul demands, each of shape [*batch_size, num_nodes].
    """
    def draw_whole_demands(lowest, highest):
        # Uniform floats in [lowest - 1, highest - 1), truncated and shifted back up by one.
        raw = torch.FloatTensor(*batch_size, num_nodes).uniform_(lowest - 1, highest - 1)
        return (raw.int() + 1).float()

    # Draw order (linehaul, backhaul, node type) determines the random stream.
    linehaul = draw_whole_demands(self.min_demands, self.max_demands)
    backhaul = draw_whole_demands(self.min_backhaul, self.max_backhaul)
    served_as_linehaul = torch.rand(*batch_size, num_nodes) > self.backhaul_ratio

    # Zero out whichever demand does not apply to the node.
    return linehaul * served_as_linehaul, backhaul * ~served_as_linehaul
[docs]
def generate_backhaul_class(self, shape: Tuple[int, int], sample: bool = False):
    """
    Generate backhaul class.

    Args:
        shape(Tuple): Tensor shape.
        sample(bool): If True, the class (1 or 2) is drawn at random for every element. If False, every
            element is ``self.backhaul_class``. Defaults to False.

    Returns:
        torch.Tensor: Float tensor of the given shape holding the backhaul class.
    """
    if not sample:
        return torch.full(shape, self.backhaul_class, dtype=torch.float32)
    # Upper bound of randint is exclusive, so the values are 1 or 2.
    return torch.randint(1, 3, shape, dtype=torch.float32)
[docs]
def generate_distance_limit(
    self, shape: Tuple[int, int], coords: torch.Tensor
) -> torch.Tensor:
    """
    Generate distance limits.

    The limit of each batch entry is drawn uniformly between a lower bound, slightly above twice
    the distance from the first node to the node farthest from it, and ``self.max_distance_limit``
    (raised just above the lower bound when it would otherwise fall below it).

    Args:
        shape(Tuple): Tensor shape. Not used; the output shape follows from `coords`.
        coords(torch.Tensor): Nodes coordinates.

    Returns:
        torch.Tensor: Distance limits, with a trailing dimension of size 1.
    """
    first_node = coords[:, 0:1]
    other_nodes = coords[:, 1:]
    farthest = torch.cdist(first_node, other_nodes).squeeze(-2).max(dim=1).values

    # Going to the farthest node and back must always be possible.
    lower = 2 * farthest + 1e-6
    upper = torch.maximum(
        torch.full_like(lower, self.max_distance_limit),
        lower + 1e-6,
    )

    # The bounds are tensors, so sample through the `distributions` module.
    sampled = torch.distributions.Uniform(lower, upper).sample()
    return sampled[..., None]
[docs]
def get_distance(self, x: Tensor, y: Tensor):
    """
    Euclidean distance between two tensors of shape `[..., n, dim]`.
    Adapted from: https://github.com/ai4co/rl4co/blob/main/rl4co/utils/ops.py

    The L2 norm is taken over the last dimension of `x - y`, so the inputs
    may have any shapes that broadcast against each other.

    Args:
        x(torch.Tensor): First set of points.
        y(torch.Tensor): Second set of points.
    Returns:
        torch.Tensor: Distances between x and y, with the last dimension reduced.
    """
    offset = x - y
    return torch.norm(offset, p=2, dim=-1)
[docs]
def generate_time_windows(
    self,
    coords: torch.Tensor = None,
    speed: torch.Tensor = None,
) -> torch.Tensor:
    """
    Generate time windows and service times for depots and customers.

    The first `self.num_depots` nodes of `coords` are depots; they get the
    window `[0, self.max_time]` and a service time of 0. For every remaining
    node, the service time is drawn from [0.15, 0.18), the window length
    from [0.18, 0.2), and the window start is scaled by the distance to the
    farthest depot divided by `speed`.

    Args:
        coords(torch.Tensor): Nodes coordinates, indexed as `[B, num_depots + N, 2]`.
        speed(torch.Tensor): Agents speed (must broadcast against `[B, N]`).
    Returns:
        Time windows of shape `[B, num_depots + N, 2]` and service times of
        shape `[B, num_depots + N]`.
    """
    depot_count = self.num_depots
    batch = coords.shape[0]
    customer_count = coords.shape[1] - depot_count  # depots excluded
    customer_shape = (batch, customer_count)
    depot_shape = (batch, depot_count)

    # Sampling bounds: service time in [lo, mid), window length in [mid, hi).
    lo, mid, hi = 0.15, 0.18, 0.2
    # The order of the random draws is part of the seeded behaviour.
    customer_service = lo + (mid - lo) * torch.rand(customer_shape)
    window_length = mid + (hi - mid) * torch.rand(customer_shape)

    # Pairwise depot/customer distances through broadcasting.
    depots = coords[:, :depot_count].unsqueeze(2)  # [B, num_depots, 1, 2]
    customers = coords[:, depot_count:].unsqueeze(1)  # [B, 1, N, 2]
    pairwise = self.get_distance(depots, customers)  # [B, num_depots, N]
    # Use the farthest depot for every customer.
    farthest_depot = pairwise.max(dim=1).values  # [B, N]

    # Largest admissible multiple of the depot travel time for the start.
    start_factor_max = (
        (self.max_time - customer_service - window_length) / farthest_depot * speed - 1
    )
    window_open = (
        (1 + (start_factor_max - 1) * torch.rand(customer_shape)) * farthest_depot / speed
    )
    window_close = window_open + window_length

    # Depots are open over the whole horizon [0, max_time].
    all_open = torch.cat((torch.zeros(depot_shape), window_open), dim=-1)
    all_close = torch.cat(
        (torch.full(depot_shape, self.max_time), window_close), dim=-1
    )
    time_windows = torch.stack((all_open, all_close), dim=-1)

    # Depots need no service time.
    service_time = torch.cat((torch.zeros(depot_shape), customer_service), dim=-1)
    return time_windows, service_time  # [B, num_depots + N, 2], [B, num_depots + N]
if __name__ == "__main__":
    # Script entry point: sample validation and test instances for every
    # variant preset and pickle each one to
    # data/generated/servs_<customers>_agents_<agents>/<problem>/<split>/.
    # Paths are relative to the current working directory.
    MIXED_PROBLEMS = ["ovrpmb", "ovrpmbl", "ovrpmbltw", "ovrpmbtw",
                      "vrpmb", "vrpmbl", "vrpmbltw", "vrpmbtw"]
    # A mixed-backhaul problem is sampled from its unmixed preset
    # (e.g. "ovrpmbl" -> "ovrpbl") with backhaul_class=2.
    MIXED_TO_BASE_PRESET = {
        mixed: mixed.replace("mb", "b", 1) for mixed in MIXED_PROBLEMS
    }
    # (directory name, tag used inside the instance name)
    SPLITS = (("validation", "val"), ("test", "test"))
    number_instances = 2

    print("Starting validation/test sets generation...")
    print()

    for num_nodes, n_agent, num_depots in [(26, 3, 3)]:
        generator = InstanceGenerator(batch_size=32, seed=0)
        for problem in VARIANT_PRESETS:
            if problem in MIXED_TO_BASE_PRESET:
                preset_kwargs = {
                    "variant_preset": MIXED_TO_BASE_PRESET[problem],
                    "backhaul_class": 2,
                }
            else:
                preset_kwargs = {"variant_preset": problem}

            problem_dir = (
                f'data/generated/servs_{num_nodes-num_depots}_agents_{n_agent}/{problem}'
            )
            for k in range(number_instances):
                # Validation is sampled before test for each k, so the
                # generator is consumed in the same order as before.
                for split, tag in SPLITS:
                    instance = generator.sample_instance(
                        num_agents=n_agent,
                        num_nodes=num_nodes,
                        num_depots=num_depots,
                        **preset_kwargs,
                    )
                    name = (
                        f'generated_{tag}_servs_{num_nodes}_agents_{n_agent}'
                        f'_depots_{num_depots}_{problem}_{k}'
                    )
                    instance['name'] = name

                    print(f"Generating {split} data...")
                    split_dir = f'{problem_dir}/{split}'
                    if not os.path.exists(split_dir):
                        # exist_ok guards against the directory appearing
                        # between the check above and its creation.
                        os.makedirs(split_dir, exist_ok=True)
                        print(f"Creating directory: {split_dir}")

                    with open(f'{split_dir}/{name}.pkl', 'wb') as fp:
                        pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
                    print(f"Dumped data into: {split_dir}/{name}.pkl")

    print('Generation completed.')