# Source code for maenvs4vrp.environments.mtvrp.instances_generator

"""
Adapted from: https://github.com/ai4co/routefinder/blob/main/routefinder/envs/mtvrp/generator.py 
"""


import torch
from torch import Tensor
from tensordict import TensorDict

from typing import Optional, Union, Callable, Dict, Tuple

from maenvs4vrp.core.env_generator_builder import InstanceBuilder

from torch.distributions import Uniform

import os

import pickle

# Directory holding the pre-generated benchmark instances. It is relative to the
# base directory that the loaders compute as two ``dirname`` levels above this file.
GENERATED_INSTANCES_PATH = 'mtvrp/data/generated'

def get_vehicle_capacity(num_loc: int) -> int:
    """Return the vehicle capacity (demand scaler) for an instance with ``num_loc`` nodes.

    Capacity should be 30 + num_loc/5 if num_loc > 20 as described in Liu et al. 2024 (POMO-MTL).
    For every N over 1000, we add 1 of capacity every 33.3 nodes to align with Ye et al. 2024 (GLOP),
    i.e. 260 at 2K nodes, 350 at 5K nodes and 500 at 10K nodes.
    Note that this serves as a demand scaler.

    Args:
        num_loc(int): Number of nodes of the instance.

    Returns:
        int: Vehicle capacity.
    """
    if num_loc > 1000:
        # Floor division by a float yields a float (e.g. 30.0); cast it so the
        # function really returns an ``int`` as annotated.
        extra_cap = 1000 // 5 + int((num_loc - 1000) // 33.3)
    elif num_loc > 20:
        extra_cap = num_loc // 5
    else:
        extra_cap = 0
    return 30 + extra_cap

# Names of all 24 problem variants (unmixed and mixed backhaul). These are the
# per-variant sub-folder names scanned by get_list_of_benchmark_instances(mixed=True).
VARIANT_PRESETS = [
    'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
    'ovrpl', 'ovrpltw', 'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
    'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
    'vrpltw', 'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw', 'vrptw'
    ]

# The 16 variants of VARIANT_PRESETS without an 'mb' (mixed backhaul) part;
# scanned by get_list_of_benchmark_instances(mixed=False).
VARIANT_PRESETS_UNMIXED = [
    'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
    'ovrpl', 'ovrpltw',
    'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
    'vrpltw', 'vrptw'
    ]

# The 8 mixed-backhaul variants. sample_instance() maps each of them to its
# unmixed preset (first 'm' removed) and sets backhaul_class to 2.
VARIANT_PRESETS_MIXED = [
    'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
    'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw'
]

# Per-preset feature probabilities consumed by subsample_variant().
# Keys: "O" = open routes, "TW" = time windows, "L" = distance limit,
# "B" = backhaul ("OTW" only exists for "single_feat_otw").
# For the named variants a value of 1.0 means the feature is part of the
# variant and 0.0 means it is not. Mixed-backhaul names have no entry here;
# sample_instance() translates them to the unmixed preset first.
VARIANT_PROBS_PRESETS = { # Variant probabilities
        # Sampling presets ("all" and "single_feat" hold identical values).
        "all": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
        "single_feat": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5},
        "single_feat_otw": {"O": 0.5, "TW": 0.5, "L": 0.5, "B": 0.5, "OTW": 0.5},
        # Fixed variants.
        "cvrp": {"O": 0.0, "TW": 0.0, "L": 0.0, "B": 0.0},
        "ovrp": {"O": 1.0, "TW": 0.0, "L": 0.0, "B": 0.0},
        "vrpb": {"O": 0.0, "TW": 0.0, "L": 0.0, "B": 1.0},
        "vrpl": {"O": 0.0, "TW": 0.0, "L": 1.0, "B": 0.0},
        "vrptw": {"O": 0.0, "TW": 1.0, "L": 0.0, "B": 0.0},
        "ovrptw": {"O": 1.0, "TW": 1.0, "L": 0.0, "B": 0.0},
        "ovrpb": {"O": 1.0, "TW": 0.0, "L": 0.0, "B": 1.0},
        "ovrpl": {"O": 1.0, "TW": 0.0, "L": 1.0, "B": 0.0},
        "vrpbl": {"O": 0.0, "TW": 0.0, "L": 1.0, "B": 1.0},
        "vrpbtw": {"O": 0.0, "TW": 1.0, "L": 0.0, "B": 1.0},
        "vrpltw": {"O": 0.0, "TW": 1.0, "L": 1.0, "B": 0.0},
        "ovrpbl": {"O": 1.0, "TW": 0.0, "L": 1.0, "B": 1.0},
        "ovrpbtw": {"O": 1.0, "TW": 1.0, "L": 0.0, "B": 1.0},
        "ovrpltw": {"O": 1.0, "TW": 1.0, "L": 1.0, "B": 0.0},
        "vrpbltw": {"O": 0.0, "TW": 1.0, "L": 1.0, "B": 1.0},
        "ovrpbltw": {"O": 1.0, "TW": 1.0, "L": 1.0, "B": 1.0},
    }

[docs] class InstanceGenerator(InstanceBuilder): """ MTVRP instance generation class. """
@classmethod
def get_list_of_benchmark_instances(cls, mixed: bool = True):
    """
    Get the generated benchmark instances available on disk.

    The directory ``<base_dir>/GENERATED_INSTANCES_PATH`` is scanned, where
    ``base_dir`` is two directory levels above this file. Every sub-folder is
    expected to contain ``<variant>/validation`` and ``<variant>/test``
    directories holding ``.pkl`` files.

    Args:
        mixed(bool): If True, all variants of VARIANT_PRESETS are listed. If False, only the
            unmixed variants (VARIANT_PRESETS_UNMIXED) are listed. Defaults to True.

    Returns:
        dict: ``{folder: {'validation': [...], 'test': [...]}}`` where every entry is an
            instance path relative to ``base_dir`` without the ``.pkl`` extension, i.e. the
            name format accepted by ``read_instance_data``.

    Raises:
        FileNotFoundError: If the generated folder or one of the expected
            ``<variant>/<split>`` directories does not exist.
    """
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    generated_root = os.path.join(base_dir, GENERATED_INSTANCES_PATH)
    # The two original branches only differed in the list of variants scanned.
    problem_types = VARIANT_PRESETS if mixed else VARIANT_PRESETS_UNMIXED

    benchmark_instances = {}
    # Sorted listings make the result independent of the file system order.
    for folder in sorted(os.listdir(generated_root)):
        if not os.path.isdir(os.path.join(generated_root, folder)):
            # Stray files (e.g. .gitkeep) are not instance folders.
            continue
        splits = {'validation': [], 'test': []}
        for problem_type in problem_types:
            for split, names in splits.items():
                rel_path = os.path.join(GENERATED_INSTANCES_PATH, folder, problem_type, split)
                for file_name in sorted(os.listdir(os.path.join(base_dir, rel_path))):
                    # splitext only strips the final extension, so dotted
                    # names such as "n50.v1.pkl" are kept intact.
                    stem, extension = os.path.splitext(file_name)
                    if extension != '.pkl':
                        # read_instance_data can only load "<name>.pkl".
                        continue
                    names.append(rel_path + '/' + stem)
        benchmark_instances[folder] = splits
    return benchmark_instances
def __init__(
    self,
    instance_type:str = 'validation',
    set_of_instances:set = None,
    device: Optional[str] = "cpu",
    batch_size: Optional[torch.Size] = None,
    seed: int = None
) -> None:
    """
    Constructor. Instance generator.

    Args:
        instance_type(str): Instance type. Can be "validation", "test", "" or None. Defaults to "validation".
        set_of_instances(set): Set of instances file names. When non-empty, all of them are loaded
            immediately. Defaults to None.
        device(str, optional): Torch device string on which instance tensors are placed
            (e.g. "cpu" or "cuda"). Defaults to "cpu".
        batch_size(torch.Size, optional): Batch size (an int is also accepted). If not specified, defaults to 1.
        seed(int): Random number generator seed. If None, ``self.DEFAULT_SEED`` is used. Defaults to None.

    Returns:
        None.
    """
    self._set_seed(self.DEFAULT_SEED if seed is None else seed)

    self.device = device

    if batch_size is None:
        batch_size = [1]
    elif isinstance(batch_size, int):
        batch_size = [batch_size]
    self.batch_size = torch.Size(batch_size)

    assert instance_type in ['test', 'validation'] or instance_type is None or instance_type == '', f"Instance type must be 'test', 'validation', '' or None."
    # Always record the instance type: previously the attribute only existed
    # when a non-empty set of instances was given.
    # NOTE(review): the original comment said None/'' "loads both test and
    # validation"; nothing in this method uses the value - confirm in callers.
    self.instance_type = instance_type

    self.set_of_instances = set_of_instances
    if set_of_instances:
        self.load_set_of_instances()
def load_set_of_instances(
    self,
    set_of_instances:set = None
):
    """
    Load every instance of the set into ``self.instances_data``.

    Args:
        set_of_instances(set): Set of instances file names. When given (and non-empty) it replaces
            ``self.set_of_instances``; otherwise the currently stored set is loaded. Defaults to None.

    Returns:
        None.
    """
    if set_of_instances:
        self.set_of_instances = set_of_instances

    # ``or ()``: with no set configured there is simply nothing to load,
    # instead of failing with "'NoneType' object is not iterable".
    self.instances_data = {
        instance_name: self.read_instance_data(instance_name)
        for instance_name in (self.set_of_instances or ())
    }
def read_instance_data(self, instance_name:str):
    """
    Read one pickled instance from disk.

    The file ``<base_dir>/<instance_name>.pkl`` is opened, where ``base_dir`` is two
    directory levels above this file. As a side effect ``self.batch_size`` is replaced
    by the batch size of the loaded data.

    Args:
        instance_name(str): Instance file name, relative to ``base_dir`` and without extension.

    Returns:
        Dict: Instance data, with its ``'data'`` entry moved to ``self.device``.
    """
    package_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    pickle_path = '{path_to_generated_instances}/{instance}.pkl'.format(
        path_to_generated_instances=package_root,
        instance=instance_name,
    )

    # NOTE(review): pickle can execute arbitrary code while loading; only use
    # this with instance files from a trusted source.
    with open(pickle_path, 'rb') as handle:
        loaded = pickle.load(handle)

    stored_data = loaded['data']
    self.batch_size = stored_data.batch_size
    loaded['data'] = stored_data.to(self.device)
    return loaded
def get_instance(self, instance_name:str, num_agents:int=None) -> Dict:
    """
    Get a loaded instance, optionally with a custom number of agents.

    Args:
        instance_name(str): Instance file name (a key of ``self.instances_data``).
        num_agents(int): Number of agents overriding the stored one. Defaults to None.

    Returns:
        Dict: Instance data, or None when ``instance_name`` has not been loaded. When
            ``num_agents`` is given, a shallow copy carrying the override is returned.
    """
    if num_agents is not None:
        assert num_agents>0, "Number of agents must be greater than 0!"

    instance = self.instances_data.get(instance_name)
    if instance is None or num_agents is None:
        return instance

    # Work on a shallow copy so the override does not stick to the cached
    # instance and leak into later calls.
    customised = dict(instance)
    customised['num_agents'] = num_agents
    return customised
def subsample_variant(
    self,
    prob_open_routes: float = 0.5,
    prob_time_windows: float = 0.5,
    prob_limit: float = 0.5,
    prob_backhaul: float = 0.5,
    td: TensorDict = None,
    variant_preset = None,
) -> TensorDict:
    """
    Subsample variant.

    Decides, per batch element, which features (open routes, time windows, distance
    limits, backhauls) stay active and resets the others to neutral defaults through
    the ``_default_*`` helpers. If ``variant_preset`` is specified, the probabilities of
    that preset are used; otherwise the ``prob_*`` arguments are used.

    Three modes exist:
        * ``self.use_combinations`` is True: every feature is kept independently with its probability.
        * preset is None, "all", "cvrp", "single_feat" or "single_feat_otw": exactly one option is
          drawn per batch element among the features (plus the O+TW pair for "single_feat_otw")
          and "no feature", the latter with weight 0.5.
        * any other preset: every feature with a probability above 0 is kept.

    Args:
        prob_open_routes(float): Probability of open routes. Defaults to 0.5.
        prob_time_windows(float): Probability of time windows. Defaults to 0.5.
        prob_limit(float): Probability of distance limits. Defaults to 0.5.
        prob_backhaul(float): Probability of backhaul. Defaults to 0.5.
        td(TensorDict): Environment instance tensor; it is modified in place. Required despite
            the None default.
        variant_preset(str): Name of a preset of VARIANT_PROBS_PRESETS. Defaults to None.

    Returns:
        td(TensorDict): Environment instance tensor, with the boolean ``has_open_routes``,
            ``has_time_windows``, ``has_distance_limits`` and ``has_backhauls`` entries added.
    """
    assert td is not None, "td (the instance tensor dictionary) must be provided!"

    if variant_preset is not None:
        variant_probs = VARIANT_PROBS_PRESETS.get(variant_preset)
        assert variant_probs is not None, (
            f"Variant preset {variant_preset} not found! "
            f"Available presets are {list(VARIANT_PROBS_PRESETS.keys())}"
        )
    else:
        variant_probs = {
            "O": prob_open_routes,
            "TW": prob_time_windows,
            "L": prob_limit,
            "B": prob_backhaul
        }

    for key, prob in variant_probs.items():
        assert 0 <= prob <= 1, f"Probability {key} must be between 0 and 1"
    self.variant_probs = variant_probs
    self.variant_preset = variant_preset

    # Only the four real features take part in the mask columns O, TW, L, B.
    # "single_feat_otw" carries a fifth "OTW" entry that would not broadcast
    # against a 4-column mask.
    feature_probs = torch.tensor(
        [variant_probs[key] for key in ("O", "TW", "L", "B")], dtype=torch.float32
    )
    sampling_presets = ("all", "cvrp", "single_feat", "single_feat_otw")

    if self.use_combinations:
        # A feature is KEPT with its probability. The previous `rand >= prob`
        # kept it with probability 1 - prob, so "cvrp" (all 0.0) kept every
        # feature and "ovrp" (O = 1.0) never had open routes.
        keep_mask = torch.rand(*self.batch_size, 4) < feature_probs  # O, TW, L, B
    elif variant_preset is None or variant_preset in sampling_presets:
        # One option per batch element; the trailing 0.5 is the weight of the
        # plain problem without any feature. With no preset the given
        # probabilities are sampled as well, as the docstring promises.
        weights = torch.tensor(list(variant_probs.values()) + [0.5], dtype=torch.float32)
        indexes = torch.distributions.Categorical(
            weights[None].repeat(*self.batch_size, 1)
        ).sample()
        # Columns: O, TW, L, B, (OTW,) nothing. Assumes a 1-D batch size.
        keep_mask = torch.zeros((*self.batch_size, weights.numel()), dtype=torch.bool)
        keep_mask[torch.arange(*self.batch_size), indexes] = True
        if variant_preset == "single_feat_otw":
            # Drawing OTW switches on both open routes and time windows.
            keep_mask[:, :2] |= keep_mask[:, 4:5]
    else:
        # A fixed variant: keep exactly the features with probability > 0.
        keep_mask = torch.zeros((*self.batch_size, 4), dtype=torch.bool)
        keep_mask[:, feature_probs > 0] = True

    td['has_open_routes'] = keep_mask[:, 0:1].clone()
    td['has_time_windows'] = keep_mask[:, 1:2].clone()
    td['has_distance_limits'] = keep_mask[:, 2:3].clone()
    td['has_backhauls'] = keep_mask[:, 3:4].clone()

    # Neutralise every feature that was not kept.
    td = self._default_open(td, ~keep_mask[:, 0])
    td = self._default_time_windows(td, ~keep_mask[:, 1])
    td = self._default_distance_limit(td, ~keep_mask[:, 2])
    td = self._default_backhaul(td, ~keep_mask[:, 3])

    self.keep_mask = keep_mask

    return td
def random_generate_instance(
    self,
    num_agents: int = None,
    num_nodes: int = None,
    min_coords: float = None,
    max_coords: float = None,
    capacity: float = None,
    service_time: float = None,
    min_demands: int = None,
    max_demands: int = None,
    min_backhaul: int = None,
    max_backhaul: int = None,
    max_time: float = None,
    backhaul_ratio: float = None,
    backhaul_class: int = None,
    sample_backhaul_class: bool = None,
    max_distance_limit: float = None,
    speed: float = None,
    subsample: bool = True,
    variant_preset=None,
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    seed: int = None,
    device: Optional[str] = "cpu"
) -> TensorDict:
    """
    Generate random instance.

    Every setting left at None falls back to the value already stored on the generator
    (as done by ``sample_instance``) and, failing that, to the default listed below. The
    resolved values are stored back on the generator because the ``generate_*`` helpers
    read them from ``self``.

    Args:
        num_agents(int): Total number of agents. Falls back to 2.
        num_nodes(int): Total number of nodes. Falls back to 15.
        min_coords(float): Lower bound of the node coordinates. Falls back to 0.0.
        max_coords(float): Upper bound of the node coordinates. Falls back to 1.0.
        capacity(int): Vehicles' capacity. Falls back to ``get_vehicle_capacity(num_nodes)``.
        service_time(float): Service time. Falls back to 0.2.
        min_demands(int): Minimum linehaul demand. Falls back to 1.
        max_demands(int): Maximum linehaul demand. Falls back to 10.
        min_backhaul(int): Minimum backhaul demand. Falls back to 1.
        max_backhaul(int): Maximum backhaul demand. Falls back to 10.
        max_time(float): Maximum route time. Falls back to 4.6.
        backhaul_ratio(float): Ratio of backhaul demands. Falls back to 0.2.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Falls back to 1.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Falls back to False.
        max_distance_limit(float): Route distance limits. Falls back to 2.8.
        speed(float): Vehicles' speed. Falls back to 1.0.
        subsample(bool): If problem variants are to be sampled. Defaults to True.
        variant_preset(str): Variant preset to be sampled. Defaults to None.
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
        batch_size(torch.Size, optional): Batch size (an int is also accepted). If None, the current
            ``self.batch_size`` is used. Defaults to None.
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Accepted for signature compatibility; tensors are placed on
            ``self.device``. Defaults to "cpu".

    Returns:
        dict: Instance information with keys ``'name'``, ``'num_nodes'``, ``'num_agents'`` and
            ``'data'`` (the TensorDict holding the instance tensors).
    """
    if seed is not None:
        self._set_seed(seed)

    if num_agents is not None:
        assert num_agents>0, "Number of agents must be greater than 0!"
    if num_nodes is not None:
        assert num_nodes>0, "Number of nodes must be greater than 0!"
    if capacity is not None:
        assert capacity>0, "Capacity must be greater than 0!"
    if service_time is not None:
        assert service_time>0, "Service times must be greater than 0!"
    if max_time is not None:
        assert max_time>0, "Maximum time must be greater than 0!"
    if max_distance_limit is not None:
        assert max_distance_limit>0, "Distance limit must be greater than 0!"
    if speed is not None:
        assert speed>0, "Speed must be greater than 0!"
    if backhaul_class is not None:
        assert backhaul_class in (1, 2), "Backhaul class must be in [1, 2]!"

    # Resolve each setting: explicit argument > value stored on the generator
    # > documented default. Previously the arguments were accepted but the
    # generation read `self.*`, which failed (or mixed shapes) when this
    # method was called without going through sample_instance().
    requested = {
        'num_agents': (num_agents, 2),
        'num_nodes': (num_nodes, 15),
        'min_coords': (min_coords, 0.0),
        'max_coords': (max_coords, 1.0),
        'service_time': (service_time, 0.2),
        'min_demands': (min_demands, 1),
        'max_demands': (max_demands, 10),
        'min_backhaul': (min_backhaul, 1),
        'max_backhaul': (max_backhaul, 10),
        'max_time': (max_time, 4.6),
        'backhaul_ratio': (backhaul_ratio, 0.2),
        'backhaul_class': (backhaul_class, 1),
        'sample_backhaul_class': (sample_backhaul_class, False),
        'max_distance_limit': (max_distance_limit, 2.8),
        'speed': (speed, 1.0),
    }
    for name, (value, default) in requested.items():
        if value is None:
            value = getattr(self, name, None)
        setattr(self, name, default if value is None else value)

    if capacity is None:
        capacity = getattr(self, 'capacity', None)
    self.capacity = get_vehicle_capacity(self.num_nodes) if capacity is None else capacity

    if variant_preset is None:
        variant_preset = getattr(self, 'variant_preset', None)
    self.variant_preset = variant_preset
    self.subsample = subsample
    self.use_combinations = use_combinations  # read by subsample_variant()

    if batch_size is None:
        batch_size = self.batch_size
    else:
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
        self.batch_size = torch.Size(batch_size)
    batch_dims = tuple(self.batch_size)

    instance = TensorDict({}, batch_size=batch_size, device=self.device)

    instance['num_agents'] = torch.full((*batch_dims, 1), self.num_agents)

    # Depot generation. The depot is always node 0.
    self.depot_idx = 0
    instance['depot_idx'] = self.depot_idx * torch.ones((*batch_dims, 1), dtype = torch.int64, device=self.device)

    # Coords uniform generation: nodes (x, y).
    coords = torch.empty(*batch_dims, self.num_nodes, 2, dtype=torch.float32).uniform_(self.min_coords, self.max_coords)
    instance['coords'] = coords

    # Capacity
    instance['capacity'] = torch.full((*batch_dims, 1), self.capacity, dtype=torch.float32)
    instance['original_capacity'] = torch.full((*batch_dims, 1), self.capacity, dtype=torch.float32)

    # Demands (the depot has none).
    linehaul_demands, backhaul_demands = self.generate_demands(batch_size=self.batch_size, num_nodes=self.num_nodes)
    linehaul_demands[:, self.depot_idx] = 0.0
    backhaul_demands[:, self.depot_idx] = 0.0
    instance['linehaul_demands'] = linehaul_demands
    instance['backhaul_demands'] = backhaul_demands

    # Backhaul class: random when sampling is enabled, self.backhaul_class otherwise.
    instance['backhaul_class'] = self.generate_backhaul_class(shape=(*batch_dims, 1), sample=self.sample_backhaul_class)

    # Open routes: enabled everywhere here, subsample_variant() may disable them.
    instance['open_routes'] = torch.ones(*batch_dims, 1, dtype=torch.bool)

    # Speed
    instance['speed'] = torch.full((*batch_dims, 1), self.speed, dtype=torch.float32)

    # Time windows and service times
    # NOTE(review): generate_time_windows is defined outside this block; it
    # presumably reads self.max_time / self.service_time - confirm.
    time_windows, node_service_time = self.generate_time_windows(coords, self.speed)
    instance['time_windows'] = time_windows
    instance['service_time'] = node_service_time
    instance['tw_low'] = time_windows[:, :, 0]
    instance['tw_high'] = time_windows[:, :, 1]

    instance['is_depot'] = torch.zeros((*batch_dims, self.num_nodes), dtype=torch.bool, device=self.device)
    instance['is_depot'][:, self.depot_idx] = True

    # Start and end time are the depot's time window. They are copied so that
    # later in-place edits of `time_windows` do not alter them.
    instance['start_time'] = time_windows[:, self.depot_idx, 0].clone()
    instance['end_time'] = time_windows[:, self.depot_idx, 1].clone()

    # Distance limits
    instance['distance_limits'] = self.generate_distance_limit(shape=(*batch_dims, 1), coords=coords)

    if self.subsample:
        instance = self.subsample_variant(td=instance, variant_preset=self.variant_preset)

    instance_info = {'name': 'random_instance',
                     'num_nodes': self.num_nodes,
                     'num_agents': self.num_agents,
                     'data': instance}
    return instance_info
def augment_generate_instance(
    self,
    num_agents: int = None,
    num_nodes: int = None,
    min_coords: float = None,
    max_coords: float = None,
    capacity: float = None,
    service_time: float = None,
    min_demands: int = None,
    max_demands: int = None,
    min_backhaul: int = None,
    max_backhaul: int = None,
    max_time: float = None,
    backhaul_ratio: float = None,
    backhaul_class: int = None,
    sample_backhaul_class: bool = None,
    max_distance_limit: float = None,
    speed: float = None,
    subsample: bool = True,
    variant_preset=None,
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    n_augment:int = 2,
    seed: int = None,
    device: Optional[str] = "cpu"
) -> TensorDict:
    """
    Generate augmented instance.

    A random instance with ``batch_size / n_augment`` elements is generated and then
    repeated ``n_augment`` times along the batch dimension, so the batch holds
    ``n_augment`` identical copies of every problem.

    Args:
        num_agents(int): Total number of agents. Defaults to None.
        num_nodes(int): Total number of nodes. Defaults to None.
        min_coords(float): Lower bound of the node coordinates. Defaults to None.
        max_coords(float): Upper bound of the node coordinates. Defaults to None.
        capacity(int): Vehicles' capacity. Defaults to None.
        service_time(float): Service time. Defaults to None.
        min_demands(int): Minimum linehaul demand. Defaults to None.
        max_demands(int): Maximum linehaul demand. Defaults to None.
        min_backhaul(int): Minimum backhaul demand. Defaults to None.
        max_backhaul(int): Maximum backhaul demand. Defaults to None.
        max_time(float): Maximum route time. Defaults to None.
        backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to None.
        max_distance_limit(float): Route distance limits. Defaults to None.
        speed(float): Vehicles' speed. Defaults to None.
        subsample(bool): If problem variants are to be sampled. Defaults to True.
        variant_preset(str): Variant preset to be sampled. Defaults to None.
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
        batch_size(torch.Size, optional): Total batch size (an int is also accepted). If None, the
            current ``self.batch_size`` is used. Defaults to None.
        n_augment(int): Number of augmentations. Defaults to 2.
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Forwarded to ``random_generate_instance``. Defaults to "cpu".

    Returns:
        dict: Instance information with keys ``'name'``, ``'num_nodes'``, ``'num_agents'`` and
            ``'data'`` (the TensorDict holding the augmented instance tensors).
    """
    if seed is not None:
        self._set_seed(seed)

    if num_agents is not None:
        assert num_agents>0, "Number of agents must be greater than 0!"
    if num_nodes is not None:
        assert num_nodes>0, "Number of nodes must be greater than 0!"
    if capacity is not None:
        assert capacity>0, "Capacity must be greater than 0!"
    if service_time is not None:
        assert service_time>0, "Service times must be greater than 0!"
    if max_time is not None:
        assert max_time>0, "Maximum time must be greater than 0!"
    if max_distance_limit is not None:
        assert max_distance_limit>0, "Distance limit must be greater than 0!"
    if speed is not None:
        # The original checked `max_time` here, which let invalid speeds
        # through and crashed when max_time was None.
        assert speed>0, "Speed must be greater than 0!"
    if backhaul_class is not None:
        assert backhaul_class in (1, 2), "Backhaul class must be in [1, 2]!"

    if n_augment is None:
        n_augment = 2
    assert n_augment>0, "Number of augmentations must be greater than 0!"

    # Normalise the requested batch size first, so that the divisibility check
    # and the final restore also work for None and for plain ints.
    if batch_size is None:
        full_batch_size = self.batch_size
    else:
        full_batch_size = torch.Size([batch_size] if isinstance(batch_size, int) else batch_size)

    assert full_batch_size.numel()%n_augment == 0, "Batch size must be divisible by n_augment!"
    self.s_batch_size = torch.Size([full_batch_size.numel() // n_augment])  # batch size of the base instance

    instance_info_s = self.random_generate_instance(  # Generate the base random instance
        num_agents = num_agents,
        num_nodes = num_nodes,
        min_coords = min_coords,
        max_coords = max_coords,
        capacity = capacity,
        service_time = service_time,
        min_demands = min_demands,
        max_demands = max_demands,
        min_backhaul = min_backhaul,
        max_backhaul = max_backhaul,
        max_time = max_time,
        backhaul_ratio = backhaul_ratio,
        backhaul_class = backhaul_class,
        sample_backhaul_class = sample_backhaul_class,
        max_distance_limit = max_distance_limit,
        speed = speed,
        subsample = subsample,
        variant_preset = variant_preset,
        use_combinations = use_combinations,
        batch_size = self.s_batch_size,
        seed = seed,
        device = device
    )

    # random_generate_instance() left the reduced batch size on the generator.
    # NOTE(review): the repeated tensors are 1-D in batch; a multi-dimensional
    # batch size is assumed not to be used here.
    self.batch_size = full_batch_size
    instance = TensorDict({}, batch_size=self.batch_size, device=self.device)

    base_data = instance_info_s['data']
    for key in base_data.keys():
        tensor = base_data[key]
        # Repeat along the batch dimension only, whatever the tensor's rank
        # (the original silently dropped tensors with more than 3 dimensions).
        instance[key] = tensor.repeat(n_augment, *([1] * (tensor.dim() - 1)))

    instance_info = {'name':'augmented_instance',
                     'num_nodes': instance_info_s['num_nodes'],
                     'num_agents': instance_info_s['num_agents'],
                     'data':instance}
    return instance_info
def sample_name_from_set(self, seed:int=None)-> str:
    """
    Sample one instance name from the instance set.

    Args:
        seed(int): Random number generator seed. Defaults to None.

    Returns:
        str: Instance name.
    """
    if seed is not None:
        self._set_seed(seed)

    assert self.set_of_instances, "set_of_instances has to have at least one instance!"

    candidates = self.set_of_instances
    if isinstance(candidates, (set, frozenset)):
        # The iteration order of a set of strings changes between interpreter
        # runs (hash randomisation); sorting makes a seeded draw reproducible.
        names = sorted(candidates)
    else:
        names = list(candidates)
    return names[torch.randint(0, len(names), (1,)).item()]
def sample_instance(
    self,
    num_agents: int = 2,
    num_nodes: int = 15,
    min_coords: float = 0.0,
    max_coords: float = 1.0,
    capacity: float = None,
    service_time: float = 0.2,
    min_demands: int = 1,
    max_demands: int = 10,
    min_backhaul: int = 1,
    max_backhaul: int = 10,
    max_time: float = 4.6,
    backhaul_ratio: float = 0.2,
    backhaul_class: int = 1,
    sample_backhaul_class: bool = False,
    max_distance_limit: float = 2.8,
    speed: float = 1.0,
    subsample: bool = True,
    variant_preset='all',
    use_combinations: bool = False,
    batch_size: Optional[torch.Size] = None,
    n_augment: Optional[int] = 2,
    sample_type: str = 'random',
    instance_name: str = None,
    seed: int = None,
    device: Optional[str] = "cpu"
):
    """
    Sample one instance from instance space.

    All settings are stored on the generator (a None value is replaced by the default
    listed below) and then forwarded to the generation method selected by ``sample_type``.

    Args:
        num_agents(int): Total number of agents. Defaults to 2.
        num_nodes(int): Total number of nodes. Defaults to 15.
        min_coords(float): Lower bound of the node coordinates. Defaults to 0.0.
        max_coords(float): Upper bound of the node coordinates. Defaults to 1.0.
        capacity(int): Vehicles' capacity. If None, ``get_vehicle_capacity(num_nodes)`` is used. Defaults to None.
        service_time(float): Service time. Defaults to 0.2.
        min_demands(int): Minimum linehaul demand. Defaults to 1.
        max_demands(int): Maximum linehaul demand. Defaults to 10.
        min_backhaul(int): Minimum backhaul demand. Defaults to 1.
        max_backhaul(int): Maximum backhaul demand. Defaults to 10.
        max_time(float): Maximum route time. Defaults to 4.6.
        backhaul_ratio(float): Ratio of backhaul demands. Defaults to 0.2.
        backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to 1.
        sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to False.
        max_distance_limit(float): Route distance limits. Defaults to 2.8.
        speed(float): Vehicles' speed. Defaults to 1.0.
        subsample(bool): If problem variants are to be sampled. Defaults to True.
        variant_preset(str): Variant preset to be sampled. A mixed-backhaul name (VARIANT_PRESETS_MIXED)
            selects the matching unmixed preset with ``backhaul_class`` 2. Defaults to "all".
        use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
        batch_size(torch.Size, optional): Batch size (an int is also accepted). Defaults to None.
        n_augment(int): Number of augmentations. Defaults to 2.
        sample_type(str): Sample type. It can be "random", "augment" or "saved". Defaults to "random".
        instance_name(str): Instance file path, used by the "saved" sample type. If None, a name is
            drawn from the loaded set (or "random_instance" when there is no set). Defaults to None.
        seed(int): Random number generator seed. Defaults to None.
        device(str, optional): Torch device string (e.g. "cpu" or "cuda"). Defaults to "cpu".

    Returns:
        dict: Instance information with keys ``'name'``, ``'num_nodes'``, ``'num_agents'`` and ``'data'``.

    Raises:
        ValueError: If ``sample_type`` is not "random", "augment" or "saved".
    """
    if seed is not None:
        self._set_seed(seed)

    if batch_size is not None:
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
        self.batch_size = torch.Size(batch_size)

    # The name is drawn even for generated instances, which keeps the random
    # stream identical to previous versions.
    if instance_name is None:
        if self.set_of_instances is None:
            instance_name = 'random_instance'
        else:
            instance_name = self.sample_name_from_set(seed=seed)

    # (given value, default used when the value is None)
    settings = {
        'num_agents': (num_agents, 2),
        'num_nodes': (num_nodes, 15),
        'min_coords': (min_coords, 0.0),
        'max_coords': (max_coords, 1.0),
        'service_time': (service_time, 0.2),
        'min_demands': (min_demands, 1),
        'max_demands': (max_demands, 10),
        'min_backhaul': (min_backhaul, 1),
        'max_backhaul': (max_backhaul, 10),
        'max_time': (max_time, 4.6),
        'backhaul_ratio': (backhaul_ratio, 0.2),
        'backhaul_class': (backhaul_class, 1),
        'sample_backhaul_class': (sample_backhaul_class, False),
        'max_distance_limit': (max_distance_limit, 2.8),
        'speed': (speed, 1.0),
        'variant_preset': (variant_preset, 'all'),
        'n_augment': (n_augment, 2),
    }
    for name, (value, default) in settings.items():
        setattr(self, name, default if value is None else value)

    # The capacity default depends on the (resolved) number of nodes.
    self.capacity = get_vehicle_capacity(self.num_nodes) if capacity is None else capacity

    self.subsample = subsample
    self.use_combinations = use_combinations
    self.seed = seed
    self.device = device

    if variant_preset in VARIANT_PRESETS_MIXED:
        # e.g. 'ovrpmb' -> preset 'ovrpb' with mixed backhauls (class 2).
        self.variant_preset = variant_preset.replace('m', '', 1)
        self.backhaul_class = 2

    generation_kwargs = dict(
        num_agents = self.num_agents,
        num_nodes = self.num_nodes,
        min_coords = self.min_coords,
        max_coords = self.max_coords,
        capacity = self.capacity,
        service_time = self.service_time,
        min_demands = self.min_demands,
        max_demands = self.max_demands,
        min_backhaul = self.min_backhaul,
        max_backhaul = self.max_backhaul,
        max_time = self.max_time,
        backhaul_ratio = self.backhaul_ratio,
        backhaul_class = self.backhaul_class,
        sample_backhaul_class = self.sample_backhaul_class,
        max_distance_limit = self.max_distance_limit,
        speed = self.speed,
        subsample = self.subsample,
        variant_preset = self.variant_preset,
        use_combinations = self.use_combinations,
        batch_size = self.batch_size,
        seed = self.seed,
        device = self.device,
    )

    if sample_type == 'random':
        return self.random_generate_instance(**generation_kwargs)
    if sample_type == 'augment':
        return self.augment_generate_instance(n_augment=self.n_augment, **generation_kwargs)
    if sample_type == 'saved':
        return self.get_instance(instance_name, num_agents=num_agents)

    # Previously an unknown type ended in an UnboundLocalError.
    raise ValueError(
        f"Unknown sample_type {sample_type!r}: expected 'random', 'augment' or 'saved'."
    )
@staticmethod
def _default_open(td, remove):
    """
    Switch off the open-route flag for the selected entries.

    Args:
        td: Mapping holding an ``'open_routes'`` tensor (modified in place).
        remove: Index applied to that tensor (presumably a boolean batch mask -- TODO confirm).

    Returns:
        The same ``td`` object.
    """
    open_flags = td['open_routes']
    open_flags[remove] = False
    return td


@staticmethod
def _default_time_windows(td, remove):
    """
    Reset time windows to ``[0, inf]`` and service times to zero for the selected entries.

    Args:
        td: Mapping holding ``'time_windows'`` and ``'service_time'`` tensors (modified in place).
        remove: Index applied to both tensors (presumably a boolean batch mask -- TODO confirm).

    Returns:
        The same ``td`` object.
    """
    windows = td['time_windows']
    # Unconstrained window: start stays at 0, end (last-axis slot 1) becomes +inf.
    unconstrained = torch.zeros_like(windows)
    unconstrained[..., 1] = float('inf')
    windows[remove] = unconstrained[remove]

    service = td['service_time']
    service[remove] = torch.zeros_like(service[remove])
    return td


@staticmethod
def _default_distance_limit(td, remove):
    """
    Set the distance limit of the selected entries to infinity.

    Args:
        td: Mapping holding a ``'distance_limits'`` tensor (modified in place).
        remove: Index applied to that tensor (presumably a boolean batch mask -- TODO confirm).

    Returns:
        The same ``td`` object.
    """
    limits = td['distance_limits']
    limits[remove] = float('inf')
    return td


@staticmethod
def _default_backhaul(td, remove):
    """
    Fold backhaul demands into linehaul demands for the selected entries.

    The backhaul demand is added onto the linehaul demand and then zeroed.

    Args:
        td: Mapping holding ``'linehaul_demands'`` and ``'backhaul_demands'`` tensors
            (modified in place).
        remove: Index applied to both tensors (presumably a boolean batch mask -- TODO confirm).

    Returns:
        The same ``td`` object.
    """
    linehaul = td['linehaul_demands']
    backhaul = td['backhaul_demands']
    linehaul[remove] = linehaul[remove] + backhaul[remove]
    backhaul[remove] = 0
    return td
def generate_demands(
    self, batch_size: Union[int, Tuple[int, ...]], num_nodes: int
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Generate linehaul and backhaul demands.

    Each node gets exactly one kind of demand: a node is a linehaul when a
    uniform draw exceeds ``self.backhaul_ratio``, otherwise it is a backhaul.
    The other demand of that node is set to zero.

    Args:
        batch_size(int or sequence of int): Leading batch dimension(s). A bare
            int is treated as a single batch dimension.
        num_nodes(int): Number of nodes.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: Linehaul and backhaul demands, each of
        shape ``(*batch_size, num_nodes)``.
    """
    # The signature historically advertised ``int`` while the body unpacked the
    # argument, so a plain int crashed. Accept both forms.
    if isinstance(batch_size, int):
        batch_dims = (batch_size,)
    else:
        batch_dims = tuple(batch_size)
    shape = (*batch_dims, num_nodes)

    # Linehaul demand sampling: continuous draw, truncated to an integer, shifted by one.
    linehaul_demand = torch.FloatTensor(*shape).uniform_(
        self.min_demands - 1, self.max_demands - 1
    )
    linehaul_demand = (linehaul_demand.int() + 1).float()

    # Backhaul demand sampling (same scheme, its own bounds).
    backhaul_demand = torch.FloatTensor(*shape).uniform_(
        self.min_backhaul - 1, self.max_backhaul - 1
    )
    backhaul_demand = (backhaul_demand.int() + 1).float()

    is_linehaul = torch.rand(*shape) > self.backhaul_ratio
    # Keep backhaul values only where the node is not a linehaul, and vice versa.
    backhaul_demand = backhaul_demand * ~is_linehaul
    linehaul_demand = linehaul_demand * is_linehaul
    return linehaul_demand, backhaul_demand
def generate_backhaul_class(self, shape: Tuple[int, int], sample: bool = False):
    """
    Generate the backhaul class tensor.

    Args:
        shape(Tuple): Shape of the returned tensor.
        sample(bool): If True, draw each entry at random from {1, 2}; otherwise fill
            the tensor with ``self.backhaul_class``. Defaults to False.

    Returns:
        torch.Tensor: Float32 tensor of backhaul classes with the requested shape.
    """
    if not sample:
        return torch.full(shape, self.backhaul_class, dtype=torch.float32)
    # randint's upper bound is exclusive, so this yields 1 or 2.
    return torch.randint(1, 3, shape, dtype=torch.float32)
def generate_distance_limit(
    self, shape: Tuple[int, int], coords: torch.Tensor
) -> torch.Tensor:
    """
    Sample a distance limit per instance.

    The limit is drawn uniformly between slightly more than twice the distance from
    node 0 to its farthest other node and ``self.max_distance_limit`` (raised just
    above the lower bound when it would otherwise be smaller).

    Args:
        shape(Tuple): Tensor shape. Not used by the computation.
        coords(torch.Tensor): Nodes coordinates; index 0 along dim 1 is treated as the depot.

    Returns:
        torch.Tensor: Distance limits with a trailing singleton dimension.
    """
    depot = coords[:, 0:1]
    others = coords[:, 1:]
    depot_to_others = torch.cdist(depot, others).squeeze(-2)
    farthest, _ = depot_to_others.max(dim=1)

    # An out-and-back trip to the farthest node must always fit under the limit.
    lower = 2 * farthest + 1e-6
    configured_cap = torch.full_like(lower, self.max_distance_limit)
    upper = torch.maximum(configured_cap, lower + 1e-6)

    # Sampling through the distributions module allows tensor-valued bounds.
    sampler = torch.distributions.Uniform(lower, upper)
    return sampler.sample()[..., None]
def get_distance(self, x: Tensor, y: Tensor):
    """
    Euclidean distance between two tensors of shape `[..., n, dim]`.

    Taken from: https://github.com/ai4co/rl4co/blob/main/rl4co/utils/ops.py

    Args:
        x(torch.Tensor): Point x.
        y(torch.Tensor): Point y.

    Returns:
        torch.Tensor: L2 norm of ``x - y`` taken over the last dimension.
    """
    offset = x - y
    return torch.linalg.vector_norm(offset, ord=2, dim=-1)
def generate_time_windows(
    self,
    coords: torch.Tensor = None,
    speed: torch.Tensor = None,
) -> torch.Tensor:
    """
    Generate time windows and service times.

    Node 0 along dim 1 is treated as the depot: its window is ``[0, self.max_time]``
    and its service time is 0. For the remaining nodes, service times are drawn in
    [0.15, 0.18), window lengths in [0.18, 0.2), and window starts are scaled by the
    node's distance to the depot divided by ``speed``.

    Args:
        coords(torch.Tensor): Nodes coordinates, depot first.
        speed(torch.Tensor): Agents speed.

    Returns:
        Time windows of shape [B, N+1, 2] and service times of shape [B, N+1],
        returned as a pair.
    """
    n_batch = coords.shape[0]
    n_customers = coords.shape[1] - 1  # depot excluded

    lo, mid, hi = 0.15, 0.18, 0.2
    # Draw order (service, length, start) is kept fixed so seeded runs are reproducible.
    service_time = lo + (mid - lo) * torch.rand(n_batch, n_customers)
    tw_length = mid + (hi - mid) * torch.rand(n_batch, n_customers)

    depot_dist = self.get_distance(coords[:, 0:1], coords[:, 1:])
    h_max = (self.max_time - service_time - tw_length) / depot_dist * speed - 1
    tw_start = (1 + (h_max - 1) * torch.rand(n_batch, n_customers)) * depot_dist / speed
    tw_end = tw_start + tw_length

    # Depot window is [0, max_time].
    depot_open = torch.zeros(n_batch, 1)
    depot_close = torch.full((n_batch, 1), self.max_time)
    window_starts = torch.cat((depot_open, tw_start), -1)
    window_ends = torch.cat((depot_close, tw_end), -1)
    time_windows = torch.stack((window_starts, window_ends), dim=-1)

    # Depot service time is 0.
    depot_service = torch.zeros(n_batch, 1)
    service_time = torch.cat((depot_service, service_time), dim=-1)
    return time_windows, service_time  # [B, N+1, 2], [B, N+1]
if __name__ == "__main__":
    # Script entry point: generate and pickle validation/test instances for every
    # variant preset and problem size.

    # Mixed-backhaul variants are sampled through their non-mixed preset with
    # backhaul_class=2; the mixed name is still used for file and directory names.
    MIXED_PROBLEMS = {
        "ovrpmb": "ovrpb",
        "ovrpmbl": "ovrpbl",
        "ovrpmbltw": "ovrpbltw",
        "ovrpmbtw": "ovrpbtw",
        "vrpmb": "vrpb",
        "vrpmbl": "vrpbl",
        "vrpmbltw": "vrpbltw",
        "vrpmbtw": "vrpbtw",
    }
    # (directory name, tag used inside the instance name); the order matters because
    # the generator is sampled once per split, validation first.
    SPLITS = (("validation", "val"), ("test", "test"))

    number_instances = 2

    print("Starting validation/test sets generation...")
    print()

    for num_nodes, n_agent in [(101, 25), (51, 25)]:

        generator = InstanceGenerator(batch_size=32, seed=0)

        for problem in VARIANT_PRESETS:
            if problem in MIXED_PROBLEMS:
                sample_kwargs = {
                    "variant_preset": MIXED_PROBLEMS[problem],
                    "backhaul_class": 2,
                }
            else:
                sample_kwargs = {"variant_preset": problem}

            for k in range(number_instances):
                for split, tag in SPLITS:
                    instance = generator.sample_instance(
                        num_agents=n_agent, num_nodes=num_nodes, **sample_kwargs
                    )

                    name = f'generated_{tag}_servs_{num_nodes-1}_agents_{n_agent}_{problem}_{k}'
                    instance['name'] = name

                    print(f"Generating {split} data...")

                    out_dir = f'data/generated/servs_{num_nodes-1}_agents_{n_agent}/{problem}/{split}'
                    if not os.path.exists(out_dir):
                        # exist_ok guards against the directory appearing between
                        # the check and the creation.
                        os.makedirs(out_dir, exist_ok=True)
                        print(f"Creating directory: {out_dir}")

                    out_file = f'{out_dir}/{name}.pkl'
                    with open(out_file, 'wb') as fp:
                        pickle.dump(instance, fp, protocol=pickle.HIGHEST_PROTOCOL)
                    print(f"Dumped data into: {out_file}")

    print('Generation completed.')