"""
Adapted from: https://github.com/ai4co/routefinder/blob/main/routefinder/envs/mtvrp/generator.py
"""
import torch
from torch import Tensor
from tensordict import TensorDict
from typing import Optional, Union, Callable, Dict, Tuple
from maenvs4vrp.core.env_generator_builder import InstanceBuilder
from torch.distributions import Uniform
import os
import pickle
GENERATED_INSTANCES_PATH = 'mtvrp/data/generated'
def get_vehicle_capacity(num_loc: int) -> int:
"""Capacity should be 30 + num_loc/5 if num_loc > 20 as described in Liu et al. 2024 (POMO-MTL).
For every N over 1000, we add 1 of capacity every 33.3 nodes to align with Ye et al. 2024 (GLOP),
i.e. 260 at 2K nodes, 350 at 5K nodes and 500 at 10K nodes.
Note that this serves as a demand scaler.
"""
if num_loc > 1000:
extra_cap = 1000 // 5 + (num_loc - 1000) // 33.3
elif num_loc > 20:
extra_cap = num_loc // 5
else:
extra_cap = 0
return 30 + extra_cap
VARIANT_PRESETS = [
'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
'ovrpl', 'ovrpltw', 'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
'vrpltw', 'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw', 'vrptw'
]
VARIANT_PRESETS_UNMIXED = [
'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
'ovrpl', 'ovrpltw',
'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
'vrpltw', 'vrptw'
]
VARIANT_PRESETS_MIXED = [
'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw'
]
VARIANT_PROBS_PRESETS = { #Variant Probabilities
"all": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
"single_feat": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
"single_feat_otw": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5, "OTW": 0.5},
"cvrp": {"O": 0.0, "TW": 0.0, "L": 0.0, "B": 0.0},
"ovrp": {"O": 1.0, "TW": 0.0, "L": 0.0, "B": 0.0},
"vrpb": {"O": 0.0, "TW": 0.0, "L": 0.0, "B": 1.0},
"vrpl": {"O": 0.0, "TW": 0.0, "L": 1.0, "B": 0.0},
"vrptw": {"O": 0.0, "TW": 1.0, "L": 0.0, "B": 0.0},
"ovrptw": {"O": 1.0, "TW": 1.0, "L": 0.0, "B": 0.0},
"ovrpb": {"O": 1.0, "TW": 0.0, "L": 0.0, "B": 1.0},
"ovrpl": {"O": 1.0, "TW": 0.0, "L": 1.0, "B": 0.0},
"vrpbl": {"O": 0.0, "TW": 0.0, "L": 1.0, "B": 1.0},
"vrpbtw": {"O": 0.0, "TW": 1.0, "L": 0.0, "B": 1.0},
"vrpltw": {"O": 0.0, "TW": 1.0, "L": 1.0, "B": 0.0},
"ovrpbl": {"O": 1.0, "TW": 0.0, "L": 1.0, "B": 1.0},
"ovrpbtw": {"O": 1.0, "TW": 1.0, "L": 0.0, "B": 1.0},
"ovrpltw": {"O": 1.0, "TW": 1.0, "L": 1.0, "B": 0.0},
"vrpbltw": {"O": 0.0, "TW": 1.0, "L": 1.0, "B": 1.0},
"ovrpbltw": {"O": 1.0, "TW": 1.0, "L": 1.0, "B": 1.0},
}
[docs]
class InstanceGenerator(InstanceBuilder):
"""
MTVRP instance generation class.
"""
[docs]
@classmethod
def get_list_of_benchmark_instances(cls, mixed: bool = True):
"""
Get list of generated instances.
Args:
mixed(bool): If True, it gets all instances. If False, it gets only unmixed instances. Defaults to True.
Returns:
benchmark_instances(list): Generated instances.
"""
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
benchmark_instances = {}
generated = os.listdir(os.path.join(base_dir, GENERATED_INSTANCES_PATH))
for folder in generated:
benchmark_instances[folder] = {}
benchmark_instances[folder]['validation'] = []
benchmark_instances[folder]['test'] = []
if mixed:
for problem_type in VARIANT_PRESETS:
val_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, 'validation')
test_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, 'test')
for s in os.listdir(os.path.join(base_dir, val_path)):
val = val_path + '/' + s.split('.')[0]
benchmark_instances[folder]['validation'].append(val)
for s in os.listdir(os.path.join(base_dir, test_path)):
test = test_path + '/' + s.split('.')[0]
benchmark_instances[folder]['test'].append(test)
else:
for problem_type in VARIANT_PRESETS_UNMIXED:
val_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, 'validation')
test_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, 'test')
for s in os.listdir(os.path.join(base_dir, val_path)):
val = val_path + '/' + s.split('.')[0]
benchmark_instances[folder]['validation'].append(val)
for s in os.listdir(os.path.join(base_dir, test_path)):
test = test_path + '/' + s.split('.')[0]
benchmark_instances[folder]['test'].append(test)
return benchmark_instances
[docs]
def __init__(
self,
instance_type:str = 'validation',
set_of_instances:set = None,
device: Optional[str] = "cpu",
batch_size: Optional[torch.Size] = None,
seed: int = None
) -> None:
"""
Constructor. Instance generator.
Args:
instance_type(str): Instance type. Can be "validation" or "test". Defaults to "validation".
set_of_instances(set): Set of instances file names. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
batch_size(torch.Size, optional): Batch size. If not specified, defaults to 1.
seed(int): Random number generator seed. Defaults to None.
Returns:
None.
"""
if seed is None:
self._set_seed(self.DEFAULT_SEED)
else:
self._set_seed(seed)
self.device = device
if batch_size is None:
batch_size = [1]
else:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
assert instance_type in ['test', 'validation'] or instance_type is None or instance_type == '', f"Instance type must be 'test', 'validation', '' or None." #If None or empty, it loads both test and validation
self.set_of_instances = set_of_instances
if set_of_instances:
self.instance_type = instance_type
self.load_set_of_instances()
[docs]
def load_set_of_instances(
self,
set_of_instances:set = None
):
"""
Load every instance on set_of_instances set.
Args:
set_of_instances(set): Set of instances file names. Defaults to None.
Returns:
None.
"""
if set_of_instances:
self.set_of_instances = set_of_instances
self.instances_data = dict()
for instance_name in self.set_of_instances:
instance = self.read_instance_data(instance_name)
self.instances_data[instance_name] = instance
[docs]
def read_instance_data(self, instance_name:str):
"""
Read instance data from file.
Args:
instance_name(str): Instance file name.
Returns:
Dict: Instance data.
"""
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
generated_file = '{path_to_generated_instances}/{instance}.pkl' \
.format(path_to_generated_instances=base_dir,
instance=instance_name)
with open(generated_file, 'rb') as fp:
instance = pickle.load(fp)
self.batch_size = instance['data'].batch_size
instance['data'] = instance['data'].to(self.device)
return instance
[docs]
def get_instance(self, instance_name:str, num_agents:int=None) -> Dict:
"""
Get an instance with custom number of agents.
Args:
instance_name(str): Instance file name.
num_agents(int): Number of agents. Defaults to None.
Returns:
Dict: Instance data.
"""
instance = self.instances_data.get(instance_name)
if num_agents is not None:
assert num_agents>0, f"number of agents must be grater them 0!"
instance['num_agents'] = num_agents
return instance
[docs]
def subsample_variant(
self,
prob_open_routes: float = 0.5,
prob_time_windows: float = 0.5,
prob_limit: float = 0.5,
prob_backhaul: float = 0.5,
td: TensorDict = None,
variant_preset = None,
) -> torch.Tensor:
"""
Subsample variant. If variant_preset is specified, it loads that variant. Otherwise it samples variant's parameters across batches based on probabilities.
Args:
prob_open_routes(float): Probability of open routes. Defaults to 0.5.
prob_time_windows(float): Probability of time windows. Defaults to 0.5.
prob_limit(float): Probability of distance limits. Defaults to 0.5.
prob_backhaul(float): Probability of backhaul. Defaults to 0.5.
td(TensorDict): Environment instance tensor. Defaults to None.
variant_preset(TensorDict): Variant preset. Defaults to None.
Returns:
td(TensorDict): Environment instance tensor.
"""
td['has_open_routes'] = torch.zeros((*self.batch_size, 1), dtype=torch.bool)
td['has_time_windows'] = torch.zeros((*self.batch_size, 1), dtype=torch.bool)
td['has_distance_limits'] = torch.zeros((*self.batch_size, 1), dtype=torch.bool)
td['has_backhauls'] = torch.zeros((*self.batch_size, 1), dtype=torch.bool)
if variant_preset is not None:
variant_probs = VARIANT_PROBS_PRESETS.get(variant_preset)
assert variant_probs is not None, f"Variant preset {variant_preset} not found! \
Avaliable presets are {VARIANT_PROBS_PRESETS.keys()} with probabilities {VARIANT_PROBS_PRESETS.values()}"
else:
variant_probs = {
"O": prob_open_routes,
"TW": prob_time_windows,
"L": prob_limit,
"B": prob_backhaul
}
for key, prob in variant_probs.items():
assert 0 <= prob <= 1, f"Probability {key} must be between 0 and 1"
self.variant_probs = variant_probs
self.variant_preset = variant_preset
variant_probs = torch.Tensor(list(self.variant_probs.values())) #Convert dict into tensor
if self.use_combinations:
keep_mask = torch.rand(*self.batch_size, 4) >= variant_probs #O, TW, L, B
td['has_open_routes'][keep_mask[:, 0]] = True
td['has_time_windows'][keep_mask[:, 1]] = True
td['has_distance_limits'][keep_mask[:, 2]] = True
td['has_backhauls'][keep_mask[:, 3]] = True
else:
if self.variant_preset in list(VARIANT_PROBS_PRESETS.keys()) and self.variant_preset not in ("all", "cvrp", "single_feat", "single_feat_otw"):
cvrp_prob = 0
else:
cvrp_prob = 0.5
if self.variant_preset in ("all", "cvrp", "single_feat", "single_feat_otw"):
indexes = torch.distributions.Categorical(
torch.Tensor(list(self.variant_probs.values()) + [cvrp_prob])[
None
].repeat(*self.batch_size, 1)
).sample()
if self.variant_preset == "single_feat_otw":
keep_mask = torch.zeros((*self.batch_size, 6), dtype=torch.bool) #O, TW, L, B, OTW, nothing
keep_mask[torch.arange(*self.batch_size), indexes] = True
keep_mask[:, :2] |= keep_mask[:, 4:5]
td['has_open_routes'][keep_mask[:, 0]] = True
td['has_time_windows'][keep_mask[:, 1]] = True
td['has_distance_limits'][keep_mask[:, 2]] = True
td['has_backhauls'][keep_mask[:, 3]] = True
td['has_open_routes'][keep_mask[:, 4]] = True
td['has_time_windows'][keep_mask[:, 4]] = True
else:
keep_mask = torch.zeros((*self.batch_size, 5), dtype=torch.bool) #O, TW, L, B, nothing
keep_mask[torch.arange(*self.batch_size), indexes] = True
td['has_open_routes'][keep_mask[:, 0]] = True
td['has_time_windows'][keep_mask[:, 1]] = True
td['has_distance_limits'][keep_mask[:, 2]] = True
td['has_backhauls'][keep_mask[:, 3]] = True
else:
keep_mask = torch.zeros((*self.batch_size, 4), dtype=torch.bool)
indexes = torch.nonzero(variant_probs).squeeze()
keep_mask[:, indexes] = True
td['has_open_routes'][keep_mask[:, 0]] = True
td['has_time_windows'][keep_mask[:, 1]] = True
td['has_distance_limits'][keep_mask[:, 2]] = True
td['has_backhauls'][keep_mask[:, 3]] = True
td = self._default_open(td, ~keep_mask[:, 0])
td = self._default_time_windows(td, ~keep_mask[:, 1])
td = self._default_distance_limit(td, ~keep_mask[:, 2])
td = self._default_backhaul(td, ~keep_mask[:, 3])
self.keep_mask = keep_mask
return td
[docs]
def random_generate_instance(
self,
num_agents: int = None,
num_nodes: int = None,
min_coords: float = None,
max_coords: float = None,
capacity: float = None,
service_time: float = None,
min_demands: int = None,
max_demands: int = None,
min_backhaul: int = None,
max_backhaul: int = None,
max_time: float = None,
backhaul_ratio: float = None,
backhaul_class: int = None,
sample_backhaul_class: bool = None,
max_distance_limit: float = None,
speed: float = None,
subsample: bool = True,
variant_preset=None,
use_combinations: bool = False,
batch_size: Optional[torch.Size] = None,
seed: int = None,
device: Optional[str] = "cpu"
) -> TensorDict:
"""
Generate random instance.
Args:
num_agents(int): Total number of agents. Defaults to None.
num_nodes(int): Total number of nodes. Defaults to None.
min_coords(float): Minimum number of coords. Defaults to None.
max_coords(float): Maximum number of coords. Defaults to None.
capacity(int): Vehicles' capacity. Defaults to None.
service_time(float): Service time. Defaults to None.
min_demands(int): Minimum number of demands. Defaults to None.
max_demands(int): Maximum number of demands. Defaults to None.
min_backhaul(int): Minimum number of backhauls. Defaults to None.
max_backhaul(int): Maximum number of backhauls. Defaults to None.
max_time(float): Maximum route time. Defaults to None.
backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to None.
max_distance_limit(float): Route distance limits. Defaults to None.
speed(float): Vehicles' speed. Defaults to None.
subsample(bool): If problem variants are to be sampled. Defaults to True.
variant_preset(str): Variant preset to be sampled. Defaults to None.
use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
force_visit(bool): It forces the agent to visit all feasible nodes before going back to depot. Defaults to True.
batch_size(torch.Size, optional): Batch size. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
Returns:
TensorDict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if num_agents is not None:
assert num_agents>0, f"Number of agents must be greater than 0!"
if num_nodes is not None:
assert num_nodes>0, f"Number of nodes must be greater than 0!"
if capacity is not None:
assert capacity>0, f"Capacity must be greater than 0!"
if service_time is not None:
assert service_time>0, f"Service times must be greater than 0!"
if max_time is not None:
assert max_time>0, f"Service times must be greater than 0!"
if max_distance_limit is not None:
assert max_distance_limit>0, f"Distance limit must be greater than 0!"
if speed is not None:
assert speed>0, f"Speed must be greater than 0!"
if backhaul_class is not None:
assert backhaul_class in (1, 2), f"Backhaul class must be in [1, 2]!"
if batch_size is None:
batch_size = self.batch_size
else:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
instance = TensorDict({}, batch_size=batch_size, device=self.device)
instance['num_agents'] = torch.full((*self.batch_size, 1), num_agents)
self.depot_idx = 0
#Depot generation. All 0.
instance['depot_idx'] = self.depot_idx * torch.ones((*self.batch_size, 1), dtype = torch.int64, device=self.device)
#Coords unfiform generation
coords = torch.FloatTensor(*self.batch_size, num_nodes, 2).uniform_(min_coords, max_coords) #Nodes. (x,y)
instance['coords'] = coords
#Capacity
vehicle_capacity = torch.full((*self.batch_size, 1), self.capacity, dtype=torch.float32)
instance['capacity'] = vehicle_capacity
instance['original_capacity'] = torch.full((*self.batch_size, 1), self.capacity, dtype=torch.float32)
#Demands
linehaul_demands, backhaul_demands = self.generate_demands(batch_size=self.batch_size, num_nodes=self.num_nodes)
linehaul_demands[:, self.depot_idx] = 0.0
backhaul_demands[:, self.depot_idx] = 0.0
instance['linehaul_demands'] = linehaul_demands
instance['backhaul_demands'] = backhaul_demands
#Backhaul Class. If sample true it's random. Otherwise it's defined in constructor.
backhaul_class = self.generate_backhaul_class(shape=(*self.batch_size, 1), sample=self.sample_backhaul_class)
instance['backhaul_class'] = backhaul_class
#Open routes
instance['open_routes'] = torch.ones(*self.batch_size, 1, dtype=torch.bool)
#Speed
instance['speed'] = torch.full((*self.batch_size, 1), self.speed, dtype=torch.float32)
#Time windows and service times
time_windows, service_time = self.generate_time_windows(coords, self.speed)
instance['time_windows'] = time_windows
instance['service_time'] = service_time
instance['tw_low'] = time_windows[:, :, 0]
instance['tw_high'] = time_windows[:, :, 1]
instance['is_depot'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.bool, device=self.device)
instance['is_depot'][:, self.depot_idx] = True
#Start time and end time
instance['start_time'] = time_windows[:, :, 0].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
instance['end_time'] = time_windows[:, :, 1].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
#Distance limits
distance_limits = self.generate_distance_limit(shape=(*self.batch_size, 1), coords=coords)
instance['distance_limits'] = distance_limits
if self.subsample:
instance = self.subsample_variant(td=instance, variant_preset=self.variant_preset)
instance_info = {'name': 'random_instance',
'num_nodes': num_nodes,
'num_agents': num_agents,
'data': instance}
return instance_info
[docs]
def augment_generate_instance(
self,
num_agents: int = None,
num_nodes: int = None,
min_coords: float = None,
max_coords: float = None,
capacity: float = None,
service_time: float = None,
min_demands: int = None,
max_demands: int = None,
min_backhaul: int = None,
max_backhaul: int = None,
max_time: float = None,
backhaul_ratio: float = None,
backhaul_class: int = None,
sample_backhaul_class: bool = None,
max_distance_limit: float = None,
speed: float = None,
subsample: bool = True,
variant_preset=None,
use_combinations: bool = False,
batch_size: Optional[torch.Size] = None,
n_augment:int = 2,
seed: int = None,
device: Optional[str] = "cpu"
) -> TensorDict:
"""
Generate augmented instance.
Args:
num_agents(int): Total number of agents. Defaults to None.
num_nodes(int): Total number of nodes. Defaults to None.
min_coords(float): Minimum number of coords. Defaults to None.
max_coords(float): Maximum number of coords. Defaults to None.
capacity(int): Vehicles' capacity. Defaults to None.
service_time(float): Service time. Defaults to None.
min_demands(int): Minimum number of demands. Defaults to None.
max_demands(int): Maximum number of demands. Defaults to None.
min_backhaul(int): Minimum number of backhauls. Defaults to None.
max_backhaul(int): Maximum number of backhauls. Defaults to None.
max_time(float): Maximum route time. Defaults to None.
backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to None.
max_distance_limit(float): Route distance limits. Defaults to None.
speed(float): Vehicles' speed. Defaults to None.
subsample(bool): If problem variants are to be sampled. Defaults to True.
variant_preset(str): Variant preset to be sampled. Defaults to None.
use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
force_visit(bool): It forces the agent to visit all feasible nodes before going back to depot. Defaults to True.
batch_size(torch.Size, optional): Batch size. Defaults to None.
n_augment(int): Number of augmentations. Defaults to 2.
seed(int): Random number generator seed. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
Returns:
TensorDict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if num_agents is not None:
assert num_agents>0, f"Number of agents must be greater than 0!"
if num_nodes is not None:
assert num_nodes>0, f"Number of nodes must be greater than 0!"
if capacity is not None:
assert capacity>0, f"Capacity must be greater than 0!"
if service_time is not None:
assert service_time>0, f"Service times must be greater than 0!"
if max_time is not None:
assert max_time>0, f"Service times must be greater than 0!"
if max_distance_limit is not None:
assert max_distance_limit>0, f"Distance limit must be greater than 0!"
if speed is not None:
assert max_time>0, f"Speed must be greater than 0!"
if backhaul_class is not None:
assert backhaul_class in (1, 2), f"Backhaul class must be in [1, 2]!"
assert self.batch_size.numel()%n_augment == 0, f"Batch size must be divisible by n_augment!"
s_batch_size = self.batch_size.numel() // n_augment #Same batch size
self.s_batch_size = torch.Size([s_batch_size])
instance_info_s = self.random_generate_instance( #Generate random instance
num_agents = num_agents,
num_nodes = num_nodes,
min_coords = min_coords,
max_coords = max_coords,
capacity = capacity,
service_time = service_time,
min_demands = min_demands,
max_demands = max_demands,
min_backhaul = min_backhaul,
max_backhaul = max_backhaul,
max_time = max_time,
backhaul_ratio = backhaul_ratio,
backhaul_class = backhaul_class,
sample_backhaul_class = sample_backhaul_class,
max_distance_limit = max_distance_limit,
speed = speed,
subsample = subsample,
variant_preset = variant_preset,
use_combinations = use_combinations,
batch_size = self.s_batch_size,
seed = seed,
device = device
)
self.batch_size = torch.Size(batch_size)
instance = TensorDict({}, batch_size=self.batch_size, device=self.device)
for key in instance_info_s['data'].keys():
if len(instance_info_s['data'][key].shape) == 3: #3 dimension tensors
instance[key] = instance_info_s['data'][key].repeat(n_augment, 1, 1)
elif len(instance_info_s['data'][key].shape) == 2: #2 dimension tensors
instance[key] = instance_info_s['data'][key].repeat(n_augment, 1)
elif len(instance_info_s['data'][key].shape) == 1: #1 dimension tensors
instance[key] = instance_info_s['data'][key].repeat(n_augment)
instance_info = {'name':'augmented_instance',
'num_nodes': num_nodes,
'num_agents': num_agents,
'data':instance}
return instance_info
[docs]
def sample_name_from_set(self, seed:int=None)-> str:
"""
Sample one instance from instance set.
Args:
seed(int): Random number generator seed. Defaults to None.
Returns:
str: Instance name.
"""
if seed is not None:
self._set_seed(seed)
assert len(self.set_of_instances)>0, f"set_of_instances has to have at least one instance!"
return list(self.set_of_instances)[torch.randint(0, len(self.set_of_instances), (1,)).item()]
[docs]
def sample_instance(
self,
num_agents: int = 2,
num_nodes: int = 15,
min_coords: float = 0.0,
max_coords: float = 1.0,
capacity: float = None,
service_time: float = 0.2,
min_demands: int = 1,
max_demands: int = 10,
min_backhaul: int = 1,
max_backhaul: int = 10,
max_time: float = 4.6,
backhaul_ratio: float = 0.2,
backhaul_class: int = 1,
sample_backhaul_class: bool = False,
max_distance_limit: float = 2.8,
speed: float = 1.0,
subsample: bool = True,
variant_preset='all',
use_combinations: bool = False,
batch_size: Optional[torch.Size] = None,
n_augment: Optional[int] = 2,
sample_type: str = 'random',
instance_name: str = None,
seed: int = None,
device: Optional[str] = "cpu"
):
"""
Sample one instance from instance space.
Args:
num_agents(int): Total number of agents. Defaults to 2.
num_nodes(int): Total number of nodes. Defaults to 15.
min_coords(float): Minimum number of coords. Defaults to 0.0.
max_coords(float): Maximum number of coords. Defaults to 1.0.
capacity(int): Vehicles' capacity. Defaults to None.
service_time(float): Service time. Defaults to 0.2.
min_demands(int): Minimum number of demands. Defaults to 1.
max_demands(int): Maximum number of demands. Defaults to 10.
min_backhaul(int): Minimum number of backhauls. Defaults to 1.
max_backhaul(int): Maximum number of backhauls. Defaults to 10.
max_time(float): Maximum route time. Defaults to 4.6.
backhaul_ratio(float): Ratio of backhaul demands. Defaults to 0.2.
backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to 1.
sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to False.
max_distance_limit(float): Route distance limits. Defaults to 2.8.
speed(float): Vehicles' speed. Defaults to 1.0.
subsample(bool): If problem variants are to be sampled. Defaults to True.
variant_preset(str): Variant preset to be sampled. Defaults to "all".
use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
batch_size(torch.Size, optional): Batch size. Defaults to None.
n_augment(int): Number of augmentations. Defaults to 2.
sample_type(str): Sample type. It can be "random", "augment" or "saved". Defaults to "random".
instance_name(str): Instance file path. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
Returns:
TensorDict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if batch_size is not None:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
if self.set_of_instances is None:
random_sample = True
else:
random_sample = False
if instance_name==None and random_sample==False:
instance_name = self.sample_name_from_set(seed=seed)
elif instance_name==None and random_sample==True:
instance_name = 'random_instance'
else:
instance_name = instance_name
if num_agents is None:
self.num_agents = 2
else:
self.num_agents = num_agents
if num_nodes is None:
self.num_nodes = 15
else:
self.num_nodes = num_nodes
if min_coords is None:
self.min_coords = 0.0
else:
self.min_coords = min_coords
if max_coords is None:
self.max_coords = 1.0
else:
self.max_coords = max_coords
if capacity is None:
self.capacity = get_vehicle_capacity(self.num_nodes)
else:
self.capacity = capacity
if service_time is None:
self.service_time = 0.2
else:
self.service_time = service_time
if min_demands is None:
self.min_demands = 1
else:
self.min_demands = min_demands
if max_demands is None:
self.max_demands = 10
else:
self.max_demands = max_demands
if min_backhaul is None:
self.min_backhaul = 1
else:
self.min_backhaul = min_backhaul
if max_backhaul is None:
self.max_backhaul = 10
else:
self.max_backhaul = max_backhaul
if max_time is None:
self.max_time = 4.6
else:
self.max_time = max_time
if backhaul_ratio is None:
self.backhaul_ratio = 0.2
else:
self.backhaul_ratio = backhaul_ratio
if backhaul_class is None:
self.backhaul_class = 1
else:
self.backhaul_class = backhaul_class
if sample_backhaul_class is None:
self.sample_backhaul_class = False
else:
self.sample_backhaul_class = sample_backhaul_class
if max_distance_limit is None:
self.max_distance_limit = 2.8
else:
self.max_distance_limit = max_distance_limit
if speed is None:
self.speed = 1.0
else:
self.speed = speed
if variant_preset is None:
self.variant_preset = 'all'
else:
self.variant_preset = variant_preset
self.subsample = subsample
self.use_combinations = use_combinations
self.seed = seed
self.device = device
self.n_augment = n_augment
if variant_preset in VARIANT_PRESETS_MIXED:
self.variant_preset = variant_preset.replace('m', '', 1)
self.backhaul_class = 2
if sample_type == 'random':
instance_info = self.random_generate_instance(
num_agents = self.num_agents,
num_nodes = self.num_nodes,
min_coords = self.min_coords,
max_coords = self.max_coords,
capacity = self.capacity,
service_time = self.service_time,
min_demands = self.min_demands,
max_demands = self.max_demands,
min_backhaul = self.min_backhaul,
max_backhaul = self.max_backhaul,
max_time = self.max_time,
backhaul_ratio = self.backhaul_ratio,
backhaul_class = self.backhaul_class,
sample_backhaul_class = self.sample_backhaul_class,
max_distance_limit = self.max_distance_limit,
speed = self.speed,
subsample = self.subsample,
variant_preset = self.variant_preset,
use_combinations = self.use_combinations,
batch_size = self.batch_size,
seed = self.seed,
device = self.device
)
elif sample_type == 'augment':
instance_info = self.augment_generate_instance(
num_agents = self.num_agents,
num_nodes = self.num_nodes,
min_coords = self.min_coords,
max_coords = self.max_coords,
capacity = self.capacity,
service_time = self.service_time,
min_demands = self.min_demands,
max_demands = self.max_demands,
min_backhaul = self.min_backhaul,
max_backhaul = self.max_backhaul,
max_time = self.max_time,
backhaul_ratio = self.backhaul_ratio,
backhaul_class = self.backhaul_class,
sample_backhaul_class = self.sample_backhaul_class,
max_distance_limit = self.max_distance_limit,
speed = self.speed,
subsample = self.subsample,
variant_preset = self.variant_preset,
use_combinations = self.use_combinations,
n_augment=self.n_augment,
batch_size = self.batch_size,
seed = self.seed,
device = self.device
)
elif sample_type=='saved':
instance_info = self.get_instance(instance_name, num_agents=num_agents)
return instance_info
@staticmethod
def _default_open(td, remove):
td['open_routes'][remove] = False
return td
@staticmethod
def _default_time_windows(td, remove):
default_tw = torch.zeros_like(td['time_windows'])
default_tw[..., 1] = float('inf')
td['time_windows'][remove] = default_tw[remove]
td['service_time'][remove] = torch.zeros_like(td['service_time'][remove])
return td
@staticmethod
def _default_distance_limit(td, remove):
td['distance_limits'][remove] = float('inf')
return td
@staticmethod
def _default_backhaul(td, remove):
td['linehaul_demands'][remove] = (
td['linehaul_demands'][remove] + td['backhaul_demands'][remove]
)
td['backhaul_demands'][remove] = 0
return td
[docs]
def generate_demands(self, batch_size: int, num_nodes: int) -> torch.Tensor:
"""
Generate demands.
Args:
batch_size(int): Batch size.
num_nodes(int): Number of nodes.
Returns:
torch.Tensor: Linehaul and backhaul demands.
"""
linehaul_demand = torch.FloatTensor(*batch_size, num_nodes).uniform_(
self.min_demands - 1, self.max_demands - 1
)
linehaul_demand = (linehaul_demand.int() + 1).float()
# Backhaul demand sampling
backhaul_demand = torch.FloatTensor(*batch_size, num_nodes).uniform_(
self.min_backhaul - 1, self.max_backhaul - 1
)
backhaul_demand = (backhaul_demand.int() + 1).float()
is_linehaul = torch.rand(*batch_size, num_nodes) > self.backhaul_ratio
backhaul_demand = (
backhaul_demand * ~is_linehaul
) # keep only values where they are not linehauls
linehaul_demand = linehaul_demand * is_linehaul
return linehaul_demand, backhaul_demand
[docs]
def generate_backhaul_class(self, shape: Tuple[int, int], sample: bool = False):
"""
Generate backhaul class.
Args:
shape(Tuple): Tensor shape.
sample(bool): Sample backhaul class. Defaults to False.
Returns:
torch.Tensor: Linehaul and backhaul demands.
"""
if sample:
return torch.randint(1, 3, shape, dtype=torch.float32)
else:
return torch.full(shape, self.backhaul_class, dtype=torch.float32)
[docs]
def generate_distance_limit(
self, shape: Tuple[int, int], coords: torch.Tensor
) -> torch.Tensor:
"""
Generate distance limits.
Args:
shape(Tuple): Tensor shape.
coords(torch.Tensor): Nodes coordinates.
Returns:
torch.Tensor: Distance limits.
"""
max_dist = torch.max(torch.cdist(coords[:, 0:1], coords[:, 1:]).squeeze(-2), dim=1)[0]
dist_lower_bound = 2 * max_dist + 1e-6
max_distance_limit = torch.maximum(
torch.full_like(dist_lower_bound, self.max_distance_limit),
dist_lower_bound + 1e-6,
)
# We need to sample from the `distribution` module to get the same distribution with a tensor as input
return torch.distributions.Uniform(dist_lower_bound, max_distance_limit).sample()[
..., None
]
[docs]
def get_distance(self, x: Tensor, y: Tensor):
"""
Euclidean distance between two tensors of shape `[..., n, dim].
Taken from: https://github.com/ai4co/rl4co/blob/main/rl4co/utils/ops.py
Args:
x(torch.Tensor): Point x.
y(torch.Tensor): Point y.
Returns:
torch.Tensor: Distance between x and y.
"""
return (x - y).norm(p=2, dim=-1)
[docs]
def generate_time_windows(
self,
coords: torch.Tensor = None,
speed: torch.Tensor = None,
) -> torch.Tensor:
"""
Generate time windows.
Args:
coords(torch.Tensor): Nodes coordinates.
speed(torch.Tensor): Agents speed.
Returns:
torch.Tensor: Time windows and service times.
"""
batch_size, n_loc = coords.shape[0], coords.shape[1] - 1 # no depot
a, b, c = 0.15, 0.18, 0.2
service_time = a + (b - a) * torch.rand(batch_size, n_loc)
tw_length = b + (c - b) * torch.rand(batch_size, n_loc)
d_0i = self.get_distance(coords[:, 0:1], coords[:, 1:])
h_max = (self.max_time - service_time - tw_length) / d_0i * speed - 1
tw_start = (1 + (h_max - 1) * torch.rand(batch_size, n_loc)) * d_0i / speed
tw_end = tw_start + tw_length
# Depot tw is 0, max_time
time_windows = torch.stack(
(
torch.cat((torch.zeros(batch_size, 1), tw_start), -1), # start
torch.cat((torch.full((batch_size, 1), self.max_time), tw_end), -1),
), # en
dim=-1,
)
# depot service time is 0
service_time = torch.cat((torch.zeros(batch_size, 1), service_time), dim=-1)
return time_windows, service_time # [B, N+1, 2], [B, N+1]
if __name__ == "__main__":
MIXED_PROBLEMS = ["ovrpmb", "ovrpmbl", "ovrpmbltw", "ovrpmbtw",
"vrpmb", "vrpmbl", "vrpmbltw", "vrpmbtw"]
number_instances = 2
print("Starting validation/test sets generation...")
print()
for num_nodes, n_agent in [(101, 25), (51, 25)]:
generator = InstanceGenerator(batch_size=32, seed=0)
for problem in VARIANT_PRESETS:
for k in range(number_instances):
#If problem is mixed, sample instance with another preset and backhaul_class=2
if problem not in MIXED_PROBLEMS:
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset=problem)
else:
if problem == "ovrpmb":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpb", backhaul_class=2)
elif problem == "ovrpmbl":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbl", backhaul_class=2)
elif problem == "ovrpmbltw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbltw", backhaul_class=2)
elif problem == "ovrpmbtw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbtw", backhaul_class=2)
elif problem == "vrpmb":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpb", backhaul_class=2)
elif problem == "vrpmbl":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbl", backhaul_class=2)
elif problem == "vrpmbltw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbltw", backhaul_class=2)
elif problem == "vrpmbtw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbtw", backhaul_class=2)
else:
raise Exception("Error generating validation set.")
name = f'generated_val_servs_{num_nodes-1}_agents_{n_agent}_{problem}_{k}'
instance['name'] = name
print("Generating validation data...")
if not os.path.exists(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/validation'):
os.makedirs(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/validation')
print(f"Creating directory: data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/validation")
with open(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/validation/'+name+'.pkl', 'wb') as fp:
pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
print(f"Dumped data into: data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/validation/{name}.pkl")
#If problem is mixed, sample instance with another preset and backhaul_class=2
if problem not in MIXED_PROBLEMS:
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset=problem)
else:
if problem == "ovrpmb":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpb", backhaul_class=2)
elif problem == "ovrpmbl":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbl", backhaul_class=2)
elif problem == "ovrpmbltw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbltw", backhaul_class=2)
elif problem == "ovrpmbtw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="ovrpbtw", backhaul_class=2)
elif problem == "vrpmb":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpb", backhaul_class=2)
elif problem == "vrpmbl":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbl", backhaul_class=2)
elif problem == "vrpmbltw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbltw", backhaul_class=2)
elif problem == "vrpmbtw":
instance = generator.sample_instance(num_agents=n_agent, num_nodes=num_nodes, variant_preset="vrpbtw", backhaul_class=2)
else:
raise Exception("Error generating test set.")
name = f'generated_test_servs_{num_nodes-1}_agents_{n_agent}_{problem}_{k}'
instance['name'] = name
print("Generating test data...")
if not os.path.exists(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/test'):
os.makedirs(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/test')
print(f"Creating directory: data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/test")
with open(f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/test/'+name+'.pkl', 'wb') as fp:
pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
print(f"Dumped data into: data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/test/{name}.pkl")
print('Generation completed.')