import torch
from tensordict import TensorDict
import os
from os import path
from typing import Optional, Dict
import numpy as np
from maenvs4vrp.core.env_generator_builder import InstanceBuilder
from huggingface_hub import hf_hub_download
import shutil
import logging
BENCHMARK_INSTANCES_PATH = 'gmtdvrp/data/benchmark'
VARIANT_PRESETS = [
'cvrp', 'ovrp', 'ovrpb', 'ovrpbl', 'ovrpbltw', 'ovrpbtw',
'ovrpl', 'ovrpltw', 'ovrpmb', 'ovrpmbl', 'ovrpmbltw', 'ovrpmbtw',
'ovrptw', 'vrpb', 'vrpbl', 'vrpbltw', 'vrpbtw', 'vrpl',
'vrpltw', 'vrpmb', 'vrpmbl', 'vrpmbltw', 'vrpmbtw', 'vrptw'
]
log = logging.getLogger(__name__)
[docs]
class BenchmarkInstanceGenerator(InstanceBuilder):
"""
GMTDVRP benchmark instance generation class.
"""
[docs]
@classmethod
def get_list_of_benchmark_instances(cls):
"""
Get list of possible instances from benchmark files.
Args:
n/a.
Returns:
None.
"""
cls.download_and_copy_instances()
dataset = ['50_test', '100_test','50_validation', '100_validation']
base_dir = path.dirname(os.path.dirname(os.path.abspath(__file__)))
inst_dic = {}
for pset in dataset:
data_files = []
numb, settype = pset.split('_')
for problem in VARIANT_PRESETS:
full_dir = path.join(base_dir, BENCHMARK_INSTANCES_PATH, problem)
data_dir = os.listdir(os.path.join(full_dir, settype))
data_files += [BENCHMARK_INSTANCES_PATH+'/'+problem+'/'+settype+'/'+item.split('.')[0] for item in data_dir if numb in item]
inst_dic[pset] = data_files
return inst_dic
def check_instance_folders(base_dir, env):
base_benchmark_dir = os.path.join(base_dir, env, "data/benchmark")
for variant in VARIANT_PRESETS:
for instance_type in ["validation", "test"]:
for instance_name in ["100.npz", "50.npz"]:
file_path = os.path.join(base_benchmark_dir, variant, instance_type, instance_name)
if not os.path.isfile(file_path):
log.warning(
f"Missing instance: {file_path}. If you wish to re-download the original benchmark files, delete your {env}/data/benchmark folder and instantiate the BenchmarkInstanceGenerator class again."
)
[docs]
@classmethod
def download_and_copy_instances(cls):
"""
Download benchmark instances from HuggingFace if they are not locally present.
Args:
n/a.
Returns:
None.
"""
base_dir = path.dirname(path.dirname(path.abspath(__file__)))
env = 'gmtdvrp'
direct = "data/benchmark"
directory_to_be_created = os.path.join(base_dir, env, direct)
if os.path.isdir(directory_to_be_created):
cls.check_instance_folders(base_dir, env)
if not (os.path.isdir(directory_to_be_created)):
os.makedirs(directory_to_be_created)
log.warning(f"Downloading benchmark files from HuggingFace to {directory_to_be_created}...")
for variant in VARIANT_PRESETS:
for instance_type in ['val', 'test']:
for instance_name in ['100.npz', '50.npz']:
fname=f"data/{variant}/{instance_type}/{instance_name}"
file_path = hf_hub_download(
repo_id="ai4co/routefinder",
repo_type="dataset",
filename=fname
)
instance_type_2 = "validation" if instance_type == "val" else "test"
full_directory = os.path.join(base_dir, env, f"data/benchmark/{variant}/{instance_type_2}")
if not (os.path.isdir(full_directory)):
os.makedirs(full_directory)
shutil.copy(file_path, full_directory)
[docs]
def __init__(
self,
problem_type:set = 'all',
instance_type:str = None,
set_of_instances:set = None,
device: Optional[str] = 'cpu',
batch_size: Optional[torch.Size] = 1000,
seed: int = None
) -> None:
"""
Constructor. Create an instance space of one or several sets of data.
Args:
problem_type(set): Problem type. Defaults to "all".
instance_type(str): Instance type. It must be "50_test", "100_test", "50_validation" or "100_validation". Defaults to None.
set_of_instances(set): Set of instances paths. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
batch_size(torch.Size, optional): Batch size. If not specified, defaults to 1000.
seed(int): Random number generator seed. Defaults to None.
Returns:
None.
"""
self.download_and_copy_instances() #If instances are not on local machine, they'll be downloaded from RouteFinder's HuggingFace
if seed is None:
self._set_seed(self.DEFAULT_SEED)
else:
self._set_seed(seed)
self.device = device
if batch_size is None:
batch_size = [1]
else:
batch_size = [batch_size] if isinstance(batch_size, int) else batch_size
self.batch_size = torch.Size(batch_size)
if problem_type is None or 'all':
problem_type = VARIANT_PRESETS
assert problem_type is not None and len(problem_type)>0, f"Set of problem variants is not > 0."
assert all(item in VARIANT_PRESETS for item in problem_type), f"Invalid variant preset."
assert instance_type in ['50_test', '100_test','50_validation', '100_validation'] or instance_type is None or instance_type == '', f"Instance type must be '50_test', '100_test','50_validation', '100_validation'."
assert len(set_of_instances)>0, f"Set of instances not > 0."
if set_of_instances:
self.problem_type = problem_type
self.instance_type = instance_type
self.set_of_instances = set_of_instances
self.load_set_of_instances()
[docs]
def load_set_of_instances(self, set_of_instances:set=None):
"""
Load every instance on set_of_instances set.
Args:
set_of_instances(set): Set of instances paths. Defaults to None.
Returns:
None.
"""
if set_of_instances:
self.set_of_instances = set_of_instances
self.instances_data = dict()
for instance_name in self.set_of_instances:
instance = self.read_parse_instance_data(instance_name)
self.instances_data[instance_name] = instance
[docs]
def read_parse_instance_data(self, instance_name:str)-> Dict:
"""
Read instance data from file. Benchmark's instance keys are translated into our keys.
Args:
instance_name(str): Instance path.
Returns:
Dict: Instance data.
"""
base_dir = path.dirname(path.dirname(path.abspath(__file__)))
file_path = '{path_to_generated_instances}/{instance}.npz' \
.format(path_to_generated_instances=base_dir,
instance=instance_name)
instance = dict()
instance['name'] = instance_name
loaded_data = np.load(file_path)
np_instance = {key: loaded_data[key] for key in loaded_data.files}
data = TensorDict({}, batch_size=self.batch_size, device=self.device)
for key in np_instance:
data[key] = torch.from_numpy(np_instance[key])
instance['num_agents'] = 1
instance['num_nodes'] = data['locs'].shape[1]
num_agents = instance['num_agents']
num_nodes = instance['num_nodes']
batch_size = data['locs'].shape[0]
instance['batch_size'] = batch_size
instance['name'] = instance['name'] + '_samp'
new_data = TensorDict({}, batch_size=self.batch_size, device=self.device)
batch_idx = torch.arange(batch_size, device=self.device).unsqueeze(-1)
new_data['coords'] = data['locs'] #There're always coords
zeros = torch.zeros((*self.batch_size, 1), dtype = torch.int64, device=self.device)
new_data['linehaul_demands'] = torch.concat([zeros, data['demand_linehaul']], dim=1) #There're always linehauls
new_data['capacity'] = data['vehicle_capacity'] #There're always capacities
self.depot_idx = 0
new_data['depot_idx'] = self.depot_idx * torch.ones((*self.batch_size, 1), dtype = torch.int64, device=self.device)
new_data['speed'] = data['speed'] #There's always speeed etc.
if 'demand_backhaul' in data.keys():
zeros = torch.zeros((*self.batch_size, 1), dtype = torch.int64, device=self.device)
new_data['backhaul_demands'] = torch.concat([zeros, data['demand_backhaul']], dim=1)
else:
new_data['backhaul_demands'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.float32, device=self.device)
if 'backhaul_class' in data.keys():
new_data['backhaul_class'] = data['backhaul_class']
if 'time_windows' in data.keys():
new_data['time_windows'] = data['time_windows']
else:
new_data['time_windows'] = torch.zeros((*self.batch_size, num_nodes, 2), dtype=torch.float32, device=self.device)
new_data['time_windows'][:,:,1] = float('inf')
if 'service_time' in data.keys():
new_data['service_time'] = data['service_time']
if 'distance_limit' in data.keys():
new_data['distance_limits'] = data['distance_limit']
else:
new_data['distance_limits'] = torch.full((*self.batch_size, 1), float('inf'))
if 'open_route' in data.keys():
new_data['open_routes'] = data['open_route']
else:
new_data['open_routes'] = torch.zeros((*self.batch_size, 1), dtype=torch.bool, device=self.device)
if 'time_windows' in data.keys():
new_data['end_time'] = data['time_windows'][:,:,1].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
new_data['start_time'] = data['time_windows'][:,:,0].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
new_data['tw_low'] = data['time_windows'][:,:,0]
new_data['tw_high'] = data['time_windows'][:,:,1]
else:
new_data['end_time'] = torch.full((*self.batch_size, 1), float('inf'))
new_data['start_time'] = torch.zeros((*self.batch_size, 1), dtype=torch.int64, device=self.device)
new_data['tw_low'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.float32, device=self.device)
new_data['tw_high'] = torch.full((*self.batch_size, num_nodes), float('inf'))
new_data['is_depot'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.bool, device=self.device)
new_data['is_depot'][:, self.depot_idx] = True
instance['data'] = new_data
return instance
[docs]
def get_instance(self, instance_name:str, num_agents:int=None) -> Dict:
"""
Get an instance with custom number of agents.
Args:
instance_name(str): Instance path.
num_agents(int): Number of agents. Defaults to None.
Returns:
Dict: Instance data.
"""
instance = self.instances_data.get(instance_name)
if num_agents is not None:
assert num_agents>0, f"number of agents must be grater them 0!"
instance['num_agents'] = num_agents
instance['num_depots'] = 1
return instance
[docs]
def random_sample_instance(self,
instance_name:str=None,
num_depots: int = None,
num_agents: int = None,
num_nodes: int = None,
min_coords: float = None,
max_coords: float = None,
capacity: int = None,
service_time: float = None,
min_demands: int = None,
max_demands: int = None,
min_backhaul: int = None,
max_backhaul: int = None,
max_time: float = None,
backhaul_ratio: float = None,
backhaul_class: int = None,
sample_backhaul_class: bool = False,
max_distance_limit: float = None,
speed: float = None,
initial_load: float = None,
subsample: bool = True,
variant_preset=None,
use_combinations: bool = False,
force_visit: bool = True,
batch_size: Optional[torch.Size] = None,
seed: int = None,
device: Optional[str] = None)-> Dict:
"""
Sample one instance from instance space, randomly adjusting the nodes.
Args:
instance_name(str): Instance file path. Defaults to None.
num_depots(int): Total number of depots. Defaults to None.
num_agents(int): Total number of agents. Defaults to None.
num_nodes(int): Total number of nodes. Defaults to None.
min_coords(float): Minimum number of coords. Defaults to None.
max_coords(float): Maximum number of coords. Defaults to None.
capacity(int): Vehicles' capacity. Defaults to None.
service_time(float): Service time. Defaults to None.
min_demands(int): Minimum number of demands. Defaults to None.
max_demands(int): Maximum number of demands. Defaults to None.
min_backhaul(int): Minimum number of backhauls. Defaults to None.
max_backhaul(int): Maximum number of backhauls. Defaults to None.
max_time(float): Maximum route time. Defaults to None.
backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to False.
max_distance_limit(float): Route distance limits. Defaults to None.
speed(float): Vehicles' speed. Defaults to None.
initial_load(float): Vehicles' initial load. Defaults to None.
subsample(bool): If problem variants are to be sampled. Defaults to True.
variant_preset(str): Variant preset to be sampled. Defaults to None.
use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
force_visit(bool): It forces the agent to visit all feasible nodes before going back to depot. Defaults to True.
batch_size(torch.Size, optional): Batch size. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
Returns:
Dict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
new_instance = dict()
instance = self.get_instance(instance_name, num_agents)
new_instance['num_agents'] = instance['num_agents']
if num_nodes is not None:
num_nodes = min(num_nodes, instance['num_nodes'])
new_instance['num_nodes'] = num_nodes
else:
num_nodes = instance['num_nodes']
new_instance['num_nodes'] = instance['num_nodes']
batch_size = instance['batch_size']
idxs = torch.arange(0, num_nodes, device=self.device).expand(batch_size, num_nodes)
depots = idxs[:, 0:1]
non_depots = idxs[:, 1:]
indices = torch.argsort(torch.rand(*non_depots.shape), dim=-1)
index = torch.cat([depots, indices], dim=1)
index = index[:, :num_nodes]
new_data = TensorDict({}, batch_size=self.batch_size, device=self.device)
batch_idx = torch.arange(batch_size, device=self.device).unsqueeze(-1)
data = instance['data']
new_data['coords'] = data['coords'][batch_idx, index] #There're always coords
new_data['linehaul_demands'] = data['linehaul_demands'][batch_idx, index] #There're always linehauls
new_data['capacity'] = data['capacity'] #There're always capacities
self.depot_idx = 0
new_data['depot_idx'] = self.depot_idx * torch.ones((*self.batch_size, 1), dtype = torch.int64, device=self.device)
new_data['speed'] = data['speed'] #There's always speeed etc.
if 'backhaul_demands' in data.keys():
new_data['backhaul_demands'] = data['backhaul_demands'][batch_idx, index]
if 'backhaul_class' in data.keys():
new_data['backhaul_class'] = data['backhaul_class']
if 'time_windows' in data.keys():
new_data['time_windows'] = data['time_windows'][batch_idx, index]
if 'service_time' in data.keys():
new_data['service_time'] = data['service_time'][batch_idx, index]
if 'distance_limits' in data.keys():
new_data['distance_limits'] = data['distance_limits']
else:
new_data['distance_limits'] = torch.full((*self.batch_size, 1), float('inf'))
if 'open_routes' in data.keys():
new_data['open_routes'] = data['open_routes']
if 'time_windows' in data.keys():
new_data['end_time'] = data['time_windows'][:,:,1].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
new_data['start_time'] = data['time_windows'][:,:,0].gather(1, torch.zeros((*self.batch_size, 1),
dtype=torch.int64, device=self.device)).squeeze(-1)
new_data['tw_low'] = new_data['time_windows'][:,:,0]
new_data['tw_high'] = new_data['time_windows'][:,:,1]
else:
new_data['end_time'] = torch.full((*self.batch_size, 1), float('inf'))
new_data['start_time'] = torch.zeros((*self.batch_size, 1), dtype=torch.int64, device=self.device)
new_data['tw_low'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.float32, device=self.device)
new_data['tw_high'] = torch.full((*self.batch_size, num_nodes), float('inf'))
new_data['is_depot'] = torch.zeros((*self.batch_size, num_nodes), dtype=torch.bool, device=self.device)
new_data['is_depot'][:, self.depot_idx] = True
new_data['initial_load'] = torch.full((*self.batch_size, self.num_agents), initial_load, dtype=torch.float32)
new_instance['data'] = new_data
new_instance['num_depots'] = num_depots
return new_instance
[docs]
def sample_name_from_set(self, seed:int=None)-> str:
"""
Sample one instance from instance set.
Args:
seed(int): Random number generator seed. Defaults to None.
Returns:
str: Instance sample name.
"""
if seed is not None:
self._set_seed(seed)
assert len(self.set_of_instances)>0, f"set_of_instances has to have at least one instance!"
return list(self.set_of_instances)[torch.randint(0, len(self.set_of_instances), (1,)).item()]
[docs]
def sample_instance(self,
sample_type: str = 'random',
instance_name:str=None,
num_depots: int = None,
num_agents: int = None,
num_nodes: int = None,
min_coords: float = None,
max_coords: float = None,
capacity: int = None,
service_time: float = None,
min_demands: int = None,
max_demands: int = None,
min_backhaul: int = None,
max_backhaul: int = None,
max_time: float = None,
backhaul_ratio: float = None,
backhaul_class: int = None,
sample_backhaul_class: bool = False,
max_distance_limit: float = None,
speed: float = None,
initial_load: float = None,
subsample: bool = True,
variant_preset=None,
use_combinations: bool = False,
force_visit: bool = True,
batch_size: Optional[torch.Size] = None,
seed: int = None,
n_augment: Optional[int] = None,
device: Optional[str] = None)-> Dict:
"""
Sample one instance from instance space.
Args:
sample_type(str): Type of instance to sample. It can be "random" or "augment". Defaults to "random".
instance_name(str): Instance file path. Defaults to None.
num_depots(int): Total number of depots. Defaults to None.
num_agents(int): Total number of agents. Defaults to None.
num_nodes(int): Total number of nodes. Defaults to None.
min_coords(float): Minimum number of coords. Defaults to None.
max_coords(float): Maximum number of coords. Defaults to None.
capacity(int): Vehicles' capacity. Defaults to None.
service_time(float): Service time. Defaults to None.
min_demands(int): Minimum number of demands. Defaults to None.
max_demands(int): Maximum number of demands. Defaults to None.
min_backhaul(int): Minimum number of backhauls. Defaults to None.
max_backhaul(int): Maximum number of backhauls. Defaults to None.
max_time(float): Maximum route time. Defaults to None.
backhaul_ratio(float): Ratio of backhaul demands. Defaults to None.
backhaul_class(int): Class of backhaul problem. If 1, it's unmixed, if 2, it's mixed. Defaults to None.
sample_backhaul_class(bool): If backhaul class is sampled across batches. Defaults to False.
max_distance_limit(float): Route distance limits. Defaults to None.
speed(float): Vehicles' speed. Defaults to None.
initial_load(float): Vehicles' initial load. Defaults to None.
subsample(bool): If problem variants are to be sampled. Defaults to True.
variant_preset(str): Variant preset to be sampled. Defaults to None.
use_combinations(bool): It considers combinations for which sampling mask the instance is defined. Defaults to False.
force_visit(bool): It forces the agent to visit all feasible nodes before going back to depot. Defaults to True.
batch_size(torch.Size, optional): Batch size. Defaults to None.
seed(int): Random number generator seed. Defaults to None.
n_augment(int, optional): Number of augmentations. Defaults to None.
device(str, optional): Type of processing. It can be "cpu" or "gpu". Defaults to "cpu".
Returns:
Dict: Instance data.
"""
if seed is not None:
self._set_seed(seed)
if instance_name==None:
instance_name = self.sample_name_from_set(seed=seed)
else:
instance_name = instance_name
if num_depots is None:
self.num_depots = 1
else:
self.num_depots = num_depots
if num_agents is None:
self.num_agents = 2
else:
self.num_agents = num_agents
if num_nodes is None:
self.num_nodes = 15
else:
self.num_nodes = num_nodes
if min_coords is None:
self.min_coords = 0.0
else:
self.min_coords = min_coords
if max_coords is None:
self.max_coords = 1.0
else:
self.max_coords = max_coords
if capacity is None:
self.capacity = 50.
else:
self.capacity = capacity
if service_time is None:
self.service_time = 0.2
else:
self.service_time = service_time
if min_demands is None:
self.min_demands = 1
else:
self.min_demands = min_demands
if max_demands is None:
self.max_demands = 10
else:
self.max_demands = max_demands
if min_backhaul is None:
self.min_backhaul = 1
else:
self.min_backhaul = min_backhaul
if max_backhaul is None:
self.max_backhaul = 10
else:
self.max_backhaul = max_backhaul
if max_time is None:
self.max_time = 4.6
else:
self.max_time = max_time
if backhaul_ratio is None:
self.backhaul_ratio = 0.2
else:
self.backhaul_ratio = backhaul_ratio
if backhaul_class is None:
self.backhaul_class = 1
else:
self.backhaul_class = backhaul_class
if sample_backhaul_class is None:
self.sample_backhaul_class = False
else:
self.sample_backhaul_class = sample_backhaul_class
if max_distance_limit is None:
self.max_distance_limit = 2.8
else:
self.max_distance_limit = max_distance_limit
if speed is None:
self.speed = 1.0
else:
self.speed = speed
if initial_load is None:
self.initial_load = self.capacity
else:
self.initial_load = initial_load
if sample_type=='random':
instance = self.random_sample_instance( instance_name=instance_name,
num_depots = self.num_depots,
num_agents = self.num_agents,
num_nodes = self.num_nodes,
min_coords = self.min_coords,
max_coords = self.max_coords,
capacity = self.capacity,
service_time = self.service_time,
min_demands = self.min_demands,
max_demands = self.max_demands,
min_backhaul = self.min_backhaul,
max_backhaul = self.max_backhaul,
max_time = self.max_time,
backhaul_ratio = self.backhaul_ratio,
backhaul_class = self.backhaul_class,
sample_backhaul_class = self.sample_backhaul_class,
max_distance_limit = self.max_distance_limit,
speed = self.speed,
initial_load = self.initial_load,
subsample = subsample,
variant_preset = variant_preset,
use_combinations = use_combinations,
force_visit = force_visit,
batch_size = batch_size,
seed = seed,
device = device)
else:
instance = self.get_instance(instance_name, num_agents=num_agents)
return instance