Exploring MAEnvs4VRP library¶
Install¶
Uncomment the following cells:
[ ]:
#!git clone https://github.com/ricgama/maenvs4vrp.git
[ ]:
# When using Colab
#%cd maenvs4vrp
#%mv maenvs4vrp/ repo_temp/
#%mv repo_temp/ ..
#%cd ..
#%cp maenvs4vrp/setup.py repo_temp/
#%rm -r maenvs4vrp
#%mv repo_temp/ maenvs4vrp/
#%cd maenvs4vrp/
#!pip install .
This notebook is designed to introduce you to the MAEnvs4VRP library through an interactive, step‑by‑step format. You’ll work through a series of concise, hands‑on coding exercises that gradually build your familiarity with the environment’s core features. By the end, you’ll have a solid foundation for leveraging MAEnvs4VRP in your research or applications.
[ ]:
import torch
import matplotlib.pyplot as plt
Basic API usage example:¶
We’ll begin by diving into the Team Orienteering Problem with Time Windows (TOPTW) environment.
[ ]:
from maenvs4vrp.environments.toptw.env import Environment
from maenvs4vrp.environments.toptw.env_agent_selector import AgentSelector
from maenvs4vrp.environments.toptw.observations import Observations
from maenvs4vrp.environments.toptw.instances_generator import InstanceGenerator
from maenvs4vrp.environments.toptw.env_agent_reward import DenseReward
[ ]:
gen = InstanceGenerator(batch_size = 8)
obs = Observations()
sel = AgentSelector()
rew = DenseReward()
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
A crucial attribute of the environment is env.td_state, which holds the entire state of the simulation and is populated the moment you initialize the environment with reset(). Let’s examine the contents of env.td_state both before and after calling reset():
[ ]:
env.td_state
[ ]:
td = env.reset(batch_size = 8, num_agents=4, num_nodes=16)
After reset() the env.td_state changes to:
[ ]:
env.td_state
This top‑level TensorDict represents a batch of 8 parallel environments (batch_size=[8]). Inside it:
- ``agents`` (TensorDict, [8, 4] or [8, 4, 16]): tracks all 4 agents per environment:
  - active_agents_mask ([8, 4]): which agents are still active
  - cum_profit, step_profit ([8, 4]): accumulated and per‑step rewards
  - cur_node, cur_step, cur_time ([8, 4]): each agent’s current location index, step count, and timestamp
  - feasible_nodes, visited_nodes ([8, 4, 16]): per‑agent masks over the 16 possible nodes
- ``cur_agent`` (TensorDict, [8, 1] or [8, 16]): details for the currently acting agent in each environment:
  - action_mask ([8, 16]): valid node choices
  - cum_profit, step_profit, cur_node, cur_step, cur_time ([8, 1])
- Global scene tensors:
  - coords ([8, 16, 2]): x, y locations of all 16 nodes
  - profits, service_time, time2depot, tw_low, tw_high ([8, 16]): per‑node reward, service duration, travel time back to the depot, and time‑window bounds
  - depot_idx, depot_loc ([8, 1], [8, 1, 2]): index and coordinates of each environment’s depot
  - nodes (TensorDict [8, 16]):
    - active_nodes_mask: which nodes remain unvisited
    - cur_profits: remaining reward at each node
- Control/status flags:
  - done, is_last_step ([8]): episode termination indicators
  - is_depot ([8, 16]): mask marking which node is the depot
- Timing & capacity:
  - start_time, end_time, max_tour_duration ([8]): global time settings
  - time2depot ([8, 16]): travel times back to the depot
- ``solution``: an (empty) TensorDict reserved for building the final route outputs.
Together, this structure holds per-agent, per-node, and global information required to step and evaluate each batch of VRP episodes.
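To make this layout concrete, the sketch below mirrors part of it with a plain nested dictionary of shapes (not a real TensorDict) and checks that every entry shares the leading batch dimension of 8. The key names are copied from the description above; the helper `leading_dims_ok` is hypothetical and exists only for this illustration.

```python
# Plain-Python mirror of part of the layout described above.
# Keys map to tensor shapes; this is an illustration, not the library's TensorDict.
BATCH, AGENTS, NODES = 8, 4, 16

state_shapes = {
    "agents": {
        "active_agents_mask": (BATCH, AGENTS),
        "cum_profit": (BATCH, AGENTS),
        "feasible_nodes": (BATCH, AGENTS, NODES),
    },
    "cur_agent": {
        "action_mask": (BATCH, NODES),
        "cur_node": (BATCH, 1),
    },
    "coords": (BATCH, NODES, 2),
    "tw_low": (BATCH, NODES),
    "done": (BATCH,),
}


def leading_dims_ok(tree, batch):
    """Return True if every shape in the nested dict starts with `batch`."""
    for value in tree.values():
        if isinstance(value, dict):
            # Recurse into nested groups such as "agents" or "cur_agent".
            if not leading_dims_ok(value, batch):
                return False
        elif value[0] != batch:
            return False
    return True


print(leading_dims_ok(state_shapes, BATCH))
```

The point of the check is that every field, however deeply nested, is indexed first by the environment in the batch.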
Also, as output we have the TensorDict td containing:
[ ]:
td
[ ]:
td["done"]
This TensorDict summarizes the current state of the environment and includes the observations of all active agents across a batch of 8 environments:
Top‑level fields (shape in brackets is per batch dimension):
- agent_step ([8, 1], int32): which step each environment’s current agent is on.
- cur_agent_idx ([8, 1], int64): index (0–3) of the agent whose turn it is.
- cur_node_idx ([8, 1], int64): the node index where that agent currently resides.
- done ([8], bool): whether each episode has terminated.
- penalty & reward ([8], float32): scalar penalty or reward resulting from the last action.
``observations`` (a nested TensorDict, batch size [8]):
- action_mask ([8, 16], bool): which of the 16 possible next nodes are valid.
- agent_obs ([8, 4], float32): per‑agent features (e.g., remaining capacity, current time).
- agents_mask ([8, 4], bool): which of the 4 agents are still active.
- global_obs ([8, 3], float32): shared global features (e.g., elapsed time, total remaining profit).
- node_static_obs ([8, 16, 7], float32): per‑node invariant features (e.g., coordinates, service times).
- node_dynamic_obs ([8, 16, 8], float32): per‑node time‑varying features (e.g., remaining profit, open/closed status).
- other_agents_obs ([8, 4, 7], float32): features of the other agents (e.g., their locations and statuses).
Now we can run an episode:
[ ]:
while not td["done"].all():
    td = env.sample_action(td)  # this is where we insert our policy
    td = env.step(td)
at the end we have
[ ]:
td["done"]
and
[ ]:
env.td_state["solution"]
[ ]:
env.td_state["solution"]["actions"]
[ ]:
env.td_state["solution"]["agents"]
Together, these two tensors give you the full episodic routes for every agent in each environment.
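As a rough illustration of how two such per-step records can be turned into routes, here is a minimal pure-Python sketch. It assumes that, for a single environment, one sequence holds the node chosen at each step and the other holds the index of the agent that acted at that step. That reading of `actions` and `agents` is an assumption, and `routes_from_solution` is a hypothetical helper, not part of the library.

```python
from collections import defaultdict


def routes_from_solution(actions, agents):
    """Group the visited nodes by the agent that performed each step."""
    routes = defaultdict(list)
    for node, agent in zip(actions, agents):
        routes[agent].append(node)
    return dict(routes)


# Hypothetical per-step records for a single environment.
actions = [3, 5, 0, 2, 0]  # node chosen at each step
agents = [0, 0, 0, 1, 1]   # agent that acted at each step

routes = routes_from_solution(actions, agents)
print(routes)
```

With batched tensors the same grouping would be applied per environment, for example after converting one row with `.tolist()`.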
Quick walkthrough¶
Let’s now go through the library’s building blocks, exploring their functionalities.
Instance generation¶
[ ]:
instance = gen.sample_instance(num_agents=2, num_nodes=10)
[ ]:
instance.keys()
It’s possible to load a set of pre-generated instances to be used as validation/test sets. For example:
[ ]:
gen.get_list_of_benchmark_instances()['servs_100_agents_5']['validation']
[ ]:
set_of_instances = set(gen.get_list_of_benchmark_instances()['servs_100_agents_5']['validation'])
[ ]:
generator = InstanceGenerator(instance_type='validation', set_of_instances=set_of_instances)
[ ]:
instance = generator.sample_instance()
Let’s check instance dict keys:
[ ]:
instance.keys()
[ ]:
instance['name']
[ ]:
from maenvs4vrp.environments.toptw.benchmark_instances_generator import BenchmarkInstanceGenerator
In order to narrow the current gap between the test beds for algorithm benchmarking used in RL and OR communities, the library allows a straightforward integration of classical OR benchmark instances. For example, we can load a set of classical benchmark instances. Let’s see what benchmark instances we have for the TOPTW:
[ ]:
BenchmarkInstanceGenerator.get_list_of_benchmark_instances()
[ ]:
generator = BenchmarkInstanceGenerator(instance_type='Solomon', set_of_instances={'c101', 'c102'})
[ ]:
instance_c101 = generator.get_instance('c101')
[ ]:
instance_c101.keys()
[ ]:
instance_c101['name']
[ ]:
instance_c101['num_agents']
[ ]:
instance_c101['num_nodes']
Observations¶
The observation features that will be available to the active agent while interacting with the environment are handled by the Observations class. The class has a default_feature_list attribute where the default configuration dictionary is defined.
[ ]:
obs.default_feature_list
Five lists of possible features also exist, detailing the features available in the class: POSSIBLE_NODES_STATIC_FEATURES, POSSIBLE_NODES_DYNAMIC_FEATURES, POSSIBLE_SELF_FEATURES, POSSIBLE_AGENTS_FEATURES, POSSIBLE_GLOBAL_FEATURES. For example:
[ ]:
obs.POSSIBLE_NODES_STATIC_FEATURES
[ ]:
obs.POSSIBLE_GLOBAL_FEATURES
While instantiating the Observations class, we can pass through a feature list dictionary specifying which features will be available for the agent:
[ ]:
import yaml
[ ]:
feature_list = yaml.safe_load("""
nodes_static:
    x_coordinate_min_max:
        feat: x_coordinate_min_max
        norm: min_max
    y_coordinate_min_max:
        feat: y_coordinate_min_max
        norm: min_max
    tw_low_mm:
        feat: tw_low
        norm: min_max
    tw_high:
        feat: tw_high
        norm: min_max
nodes_dynamic:
    - time2open_div_end_time
    - time2close_div_end_time
    - time2open_after_step_div_end_time
    - time2close_after_step_div_end_time
    - fract_time_after_step_div_end_time
agent:
    - x_coordinate_min_max
    - y_coordinate_min_max
    - frac_current_time
other_agents:
    - x_coordinate_min_max
    - y_coordinate_min_max
    - frac_current_time
    - dist2agent_div_end_time
global:
    - frac_done_agents
    - frac_colect_profits
""")
[ ]:
obs = Observations(feature_list)
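The `norm: min_max` entries in the configuration suggest that the raw feature is rescaled to the [0, 1] range. The sketch below shows the usual min-max definition, (x - min) / (max - min), on hypothetical values. Whether the library computes it exactly this way (for instance, how it treats a constant feature) is an assumption.

```python
def min_max(values):
    """Rescale a list of numbers to [0, 1] using min-max normalization."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant feature: avoid division by zero (a choice made for this sketch).
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Hypothetical opening times for four nodes.
tw_low = [0.0, 20.0, 50.0, 100.0]
print(min_max(tw_low))
```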
We can test these observations on the environment:
[ ]:
gen = InstanceGenerator(batch_size=8)
sel = AgentSelector()
rew = DenseReward()
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
[ ]:
td = env.reset(batch_size = 8, num_agents=4, num_nodes=16)
[ ]:
td_observation = env.observe()
[ ]:
td_observation
Let’s run an episode:
[ ]:
while not td["done"].all():
    td = env.sample_action(td)  # this is where we insert our policy
    td = env.step(td)
and check the collected profits:
[ ]:
env.td_state['agents']['cum_profit'].sum(-1)
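The internals of env.sample_action are not shown in this notebook. As a point of reference, a uniformly random policy restricted to feasible nodes could look like the pure-Python sketch below. The mask values and the helper name `sample_feasible_action` are hypothetical.

```python
import random


def sample_feasible_action(action_mask, rng):
    """Pick uniformly among the node indices whose mask entry is True."""
    feasible = [i for i, ok in enumerate(action_mask) if ok]
    if not feasible:
        raise ValueError("no feasible action")
    return rng.choice(feasible)


rng = random.Random(0)
mask = [True, False, False, True, True, False]  # hypothetical mask over 6 nodes
action = sample_feasible_action(mask, rng)
print(action)
```

A learned policy replaces the uniform choice with a distribution over the same feasible set.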
An environment with agents performing random actions is not very impressive. Let’s train a policy with the PPO algorithm to get smarter agents:
[ ]:
# When using Colab
#%cd maenvs4vrp/notebooks/
[ ]:
%run ../learning/ppo/train_ma_ppo.py --vrp_env toptw --num_agents 4 --num_nodes 21
Challenges¶
Ex0. Warm-up¶
Ok! Let’s now try some small hands-on coding challenges. To simplify solution verification, allowing a pen and paper check, let’s use some small toy instances.
[ ]:
from maenvs4vrp.environments.toptw.toy_instance_generator import ToyInstanceGenerator
[ ]:
gen = ToyInstanceGenerator()
obs = Observations()
sel = AgentSelector()
rew = DenseReward()
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
[ ]:
td = env.reset()
The set of service nodes and the depot’s coordinates are as follows:
[ ]:
fig = plt.figure(figsize=(3,3))
plt.plot(env.td_state['coords'][0][:,0].numpy(), env.td_state['coords'][0][:,1].numpy(), 'o')
plt.plot(env.td_state['coords'][0][0,0].numpy(), env.td_state['coords'][0][0,1].numpy(), 'o', color='red' )
And the corresponding time‑window constraints are:
[ ]:
for k, data in enumerate(zip(env.td_state['tw_low'][0].tolist(), env.td_state['tw_high'][0].tolist())):
    print(f'node {k} time window is: [{data[0]}; {data[1]}]')
All agents begin at the depot (node 0, shown as the red dot). The travel times from the depot to each service node are:
[ ]:
loc = env.td_state['coords'].gather(1, env.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
time2j = torch.pairwise_distance(loc, env.td_state["coords"], eps=0, keepdim = False)
time2j[0]
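The cell above treats Euclidean distance as travel time. For a pen-and-paper check, the same quantities can be computed in plain Python. The coordinates and time windows below are hypothetical (they are not the toy instance), and the mask only covers the time-window test; the return-to-depot check is left out.

```python
import math

# Hypothetical data; node 0 plays the role of the depot.
coords = [(0.0, 0.0), (3.0, 4.0), (6.0, 8.0)]
tw_low = [0.0, 2.0, 0.0]
tw_high = [100.0, 10.0, 8.0]


def travel_times(origin, coords):
    """Euclidean distance from `origin` to every node, used as travel time."""
    return [math.dist(origin, c) for c in coords]


def feasible_mask(cur_time, times, tw_low, tw_high):
    """A node is feasible if service can start before its window closes."""
    mask = []
    for t, lo, hi in zip(times, tw_low, tw_high):
        service_start = max(cur_time + t, lo)  # wait if arriving early
        mask.append(service_start <= hi)
    return mask


times = travel_times(coords[0], coords)
print(times)
print(feasible_mask(0.0, times, tw_low, tw_high))
```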
If the agent selects to visit node 1, what will be the collected profit?
Checking the previous distance values, time windows and the new distances, what will be the mask of the admissible nodes after this step?
(hint: check the env.td_state attribute.)
[ ]:
td['action'] = torch.tensor([[1]])
[ ]:
td = env.step(td)
[ ]:
# %load snippets/ex0.py
# your code here!!
Next, we’ll dive into the Observations module:
Ex1. Team Orienteering Problem with Time Windows - Observations¶
[ ]:
from maenvs4vrp.environments.toptw.env import Environment
from maenvs4vrp.environments.toptw.env_agent_selector import AgentSelector
from maenvs4vrp.environments.toptw.env_agent_reward import DenseReward
A critical part of training agents is their ability to retrieve useful information from the environment in order to act on it. In MAEnvs4VRP, you can tailor exactly what data your agents observe by implementing custom methods in the Observations class.
[ ]:
from maenvs4vrp.environments.toptw.observations import Observations
[ ]:
gen = ToyInstanceGenerator()
sel = AgentSelector()
rew = DenseReward()
[ ]:
obs = Observations()
The class has a default_feature_list attribute where the default configuration dictionary is defined.
[ ]:
obs.default_feature_list
Five lists of possible features also exist, detailing the features available in the class: possible_nodes_static_features, possible_nodes_dynamic_features, possible_agent_features, possible_agents_features, possible_global_features. For example:
[ ]:
obs.possible_nodes_dynamic_features
Let’s see how to add another nodes dynamic observation.
Change the code below in order to implement the nodes dynamic feature wait_time_div_end_time:
[ ]:
# %load snippets/ex1.py
class Observations(Observations):
    def __init__(self, feature_list:dict = None):
        super().__init__()
        self.default_feature_list['nodes_dynamic'].append('wait_time_div_end_time')
        self.possible_nodes_dynamic_features.append('wait_time_div_end_time')

    def get_feat_wait_time_div_end_time(self):
        """ dynamic feature
        Args:
        Returns:
            Tensor: waiting time at nodes divided by end time.
        """
        loc = self.env.td_state['coords'].gather(1, self.env.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
        ptime = self.env.td_state['cur_agent']['cur_time'].clone()
        time2j = torch.pairwise_distance(loc, self.env.td_state["coords"], eps=0, keepdim = False)
        #arrivej = !! your code here !!
        #wait = !! your code here !!
        return wait / self.env.td_state['end_time'].unsqueeze(dim=-1)
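If you want to check your reasoning by hand first, the quantity this feature describes can be worked out in plain Python. The sketch below assumes the waiting time at a node is the gap between its opening time and the arrival time, clipped at zero, and uses hypothetical numbers. It is a scalar illustration, not the tensor code the exercise asks for.

```python
def wait_time_div_end_time(cur_time, travel_time, tw_low, end_time):
    """Waiting time at each node divided by the end time (scalar sketch)."""
    arrive = [cur_time + t for t in travel_time]
    # Waiting only happens when the agent arrives before the window opens.
    wait = [max(lo - a, 0.0) for lo, a in zip(tw_low, arrive)]
    return [w / end_time for w in wait]


# Hypothetical values for three nodes.
feat = wait_time_div_end_time(
    cur_time=10.0,
    travel_time=[0.0, 5.0, 20.0],
    tw_low=[0.0, 25.0, 15.0],
    end_time=100.0,
)
print(feat)
```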
[ ]:
obs = Observations()
We can re-check the possible nodes dynamic features available:
[ ]:
obs.possible_nodes_dynamic_features
and the ones that are going to be used by the agent:
[ ]:
obs.default_feature_list['nodes_dynamic']
Ok! Now, let’s create the TOPTW environment:
[ ]:
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
Check if your answer is correct, by running a couple of environment steps.
Note: the observation feature will be on the obs.default_feature_list['nodes_dynamic'].index('wait_time_div_end_time') position of the node_dynamic_obs tensor.
[ ]:
# %load snippets/ex2.py
# check the new feature position here
[ ]:
td = env.reset()
[ ]:
#%load snippets/ex3.py
#check the nodes dynamic observations on the td
Now select a node to visit and execute one environment step (you can replace the value 3 with any other valid node index):
[ ]:
td['action'] = torch.tensor([[3]])
[ ]:
td = env.step(td)
[ ]:
# your code here
[ ]:
# your code here
Think of another potentially useful observation feature for this environment. Implement and test it.
[ ]:
# your code here
[ ]:
# your code here
Ex2. Split Delivery Vehicle Routing Problem with Time Windows (SDVRPTW)¶
The Split Delivery Vehicle Routing Problem with Time Windows (SDVRPTW) extends the classic CVRPTW by allowing each customer’s demand to be split across multiple visits from different vehicles. In other words, a single customer can receive partial deliveries from more than one truck, as long as all time‐window constraints are respected.
Below, we’ll outline the modifications needed to turn our existing CVRPTW environment into an SDVRPTW setup. But first, let’s take a quick look at the CVRPTW environment:
[ ]:
from maenvs4vrp.environments.cvrptw.toy_instance_generator import ToyInstanceGenerator
[ ]:
gen = ToyInstanceGenerator()
inst = gen.sample_instance()
The service nodes and depot locations are:
[ ]:
fig = plt.figure(figsize=(3,3))
plt.plot(inst['data']['coords'][0][:,0].numpy(), inst['data']['coords'][0][:,1].numpy(), 'o')
plt.plot(inst['data']['coords'][0][0,0].numpy(), inst['data']['coords'][0][0,1].numpy(), 'o', color='red' )
With time windows and demands:
[ ]:
for k, data in enumerate(zip(inst['data']['tw_low'][0].tolist(), inst['data']['tw_high'][0].tolist(), inst['data']['demands'][0].tolist())):
    print(f'node {k} time window is: [{data[0]}; {data[1]}], with demand {data[2]}')
[ ]:
from maenvs4vrp.environments.cvrptw.env import Environment
from maenvs4vrp.environments.cvrptw.env_agent_selector import AgentSelector
from maenvs4vrp.environments.cvrptw.env_agent_reward import DenseReward
from maenvs4vrp.environments.cvrptw.observations import Observations
[ ]:
gen = ToyInstanceGenerator()
sel = AgentSelector()
rew = DenseReward()
obs = Observations()
[ ]:
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
[ ]:
td = env.reset()
Check which agent is active and which actions are admissible for it.
Note: on the td, access the cur_agent_idx and observations/action_mask keys.
[ ]:
#%load snippets/ex4.py
# your code here
This information is also available by accessing the environment td_state attribute on the cur_agent key.
[ ]:
env.td_state['cur_agent']
[ ]:
env.td_state['cur_agent']['cur_load']
Let’s choose to serve node 2:
[ ]:
td['action'] = torch.tensor([[2]])
td = env.step(td)
[ ]:
action = torch.tensor([[2]])
What should the new cur_load and action_mask be? Check your answer.
[ ]:
# %load snippets/ex5.py
# your code here
What happens if the agent tries to serve node 1?
[ ]:
# %load snippets/ex6.py
# your code here
OK. Now, let’s see what changes are needed to the CVRPTW environment to obtain the SDVRPTW one. We only need to tweak the _update_feasibility and _update_state methods of the Environment class. Everything else remains unchanged.
``_update_feasibility``
- CVRPTW behavior: Once a customer is visited, we mark that node as infeasible for all future visits.
- SDVRPTW change: Allow a node to remain feasible until its remaining demand reaches zero. That means:
  - Track remaining demand at each node (initial demand minus delivered quantity).
  - In _update_feasibility, a node is only masked out when its remaining demand == 0.
``_update_state``
- CVRPTW behavior: When an agent visits node i, it:
  - Deducts the node’s full demand from the vehicle load.
  - Sets that node’s demand to zero.
- SDVRPTW change: On visiting node i, an agent can only deliver up to its remaining vehicle capacity (and no more than the node’s remaining demand). So you must:
  - Compute delivered = min(vehicle_capacity, node_remaining_demand)
  - Subtract delivered from both the vehicle’s load and the node’s remaining demand.
  - If a node’s remaining demand drops to zero, then subsequently _update_feasibility will mask it out.
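The split-delivery rule above can be traced by hand with a scalar sketch. The function names `deliver` and `node_is_active` and the numbers are hypothetical; the actual environment applies the same idea to batched tensors.

```python
def deliver(load, remaining_demand):
    """Deliver as much as possible; return the updated (load, remaining_demand)."""
    delivered = min(load, remaining_demand)
    return load - delivered, remaining_demand - delivered


def node_is_active(remaining_demand):
    """A node stays feasible while some of its demand is still unserved."""
    return remaining_demand > 0


# A customer with demand 10 served by two vehicles carrying 6 units each.
demand = 10
load_a, demand = deliver(6, demand)
print(load_a, demand, node_is_active(demand))

load_b, demand = deliver(6, demand)
print(load_b, demand, node_is_active(demand))
```

The first vehicle empties its load and leaves 4 units unserved, so the node stays active; the second vehicle finishes the job with 2 units to spare.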
[ ]:
from maenvs4vrp.environments.sdvrptw.toy_instance_generator import ToyInstanceGenerator
from maenvs4vrp.environments.sdvrptw.env import Environment
from maenvs4vrp.environments.sdvrptw.env_agent_selector import AgentSelector
from maenvs4vrp.environments.sdvrptw.env_agent_reward import DenseReward
from maenvs4vrp.environments.sdvrptw.observations import Observations
[ ]:
gen = ToyInstanceGenerator()
sel = AgentSelector()
rew = DenseReward()
obs = Observations()
Let’s start with the _update_feasibility method:
[ ]:
# %load snippets/ex7.py
# your code here
class Environment(Environment):
    def _update_feasibility(self):
        _mask = self.td_state['nodes']['active_nodes_mask'].clone() * self.td_state['cur_agent']['action_mask'].clone()
        # time windows constraints
        loc = self.td_state['coords'].gather(1, self.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
        ptime = self.td_state['cur_agent']['cur_time'].clone()
        time2j = torch.pairwise_distance(loc, self.td_state["coords"], eps=0, keepdim = False)
        if self.n_digits is not None:
            time2j = torch.floor(self.n_digits * time2j) / self.n_digits
        arrivej = ptime + time2j
        waitj = torch.clip(self.td_state['tw_low']-arrivej, min=0)
        service_startj = arrivej + waitj
        c1 = service_startj <= self.td_state['tw_high']
        c2 = service_startj + self.td_state['service_time'] + self.td_state['time2depot'] <= self.td_state['end_time'].unsqueeze(-1)
        # capacity constraints (if there is no load, the agent can only return to the depot)
        c3 = torch.ones_like(_mask, dtype=torch.bool, device=self.device)
        #c3[self.td_state['cur_agent']['cur_load'].le(0).squeeze(-1)] = !!your code here!!
        #c3[self.td_state['cur_agent']['cur_load'].le(0).squeeze(-1), self.td_state['depot_idx']] = !!your code here!!
        _mask = _mask * c1 * c2 * c3
        # update state
        self.td_state['cur_agent'].update({'action_mask': _mask})
        self.td_state['agents']['feasible_nodes'].scatter_(1,
            self.td_state['cur_agent_idx'][:,:,None].expand(-1,-1,self.num_nodes), _mask.unsqueeze(1))
Now the _update_state method:
[ ]:
# %load snippets/ex8.py
class Environment(Environment):
    def _update_state(self, action):
        loc = self.td_state['coords'].gather(1, self.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
        next_loc = self.td_state['coords'].gather(1, action[:,:,None].expand(-1, -1, 2))
        ptime = self.td_state['cur_agent']['cur_time'].clone()
        time2j = torch.pairwise_distance(loc, next_loc, eps=0, keepdim = False)
        if self.n_digits is not None:
            time2j = torch.floor(self.n_digits * time2j) / self.n_digits
        tw = self.td_state['tw_low'].gather(1, action)
        service_time = self.td_state['service_time'].gather(1, action)
        arrivej = ptime + time2j
        waitj = torch.clip(tw-arrivej, min=0)
        time_update = arrivej + waitj + service_time
        # update agent cur node
        self.td_state['cur_agent']['cur_node'] = action
        self.td_state['agents']['cur_node'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cur_node'])
        # update agent cur time
        self.td_state['cur_agent']['cur_time'] = time_update
        # if agent is done set agent time to end_time
        agents_done = ~self.td_state['agents']['active_agents_mask'].gather(1, self.td_state['cur_agent_idx']).clone()
        self.td_state['cur_agent']['cur_time'] = torch.where(agents_done, self.td_state['end_time'].unsqueeze(-1),
                                                             self.td_state['cur_agent']['cur_time'])
        self.td_state['agents']['cur_time'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cur_time'])
        # update agent cum traveled time
        self.td_state['cur_agent']['cur_ttime'] = time2j
        self.td_state['cur_agent']['cum_ttime'] += time2j
        self.td_state['agents']['cur_ttime'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cur_ttime'])
        self.td_state['agents']['cum_ttime'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cum_ttime'])
        # update agent load and node demands
        #cur_demands = !!your code here!!
        #current_load = !!your code here!!
        #load_transfer = !!your code here!!
        self.td_state['cur_agent']['cur_load'] -= load_transfer
        # if agent is done set agent cur_load to 0
        self.td_state['cur_agent']['cur_load'] = torch.where(agents_done, 0.,
                                                             self.td_state['cur_agent']['cur_load'])
        self.td_state['nodes']['cur_demands'].scatter_(1, action, cur_demands-load_transfer)
        # update done nodes
        self.td_state['nodes']['active_nodes_mask'] = self.td_state['nodes']['cur_demands'].gt(0)
        self.td_state['nodes']['active_nodes_mask'].scatter_(1, self.td_state['depot_idx'], True)
        self.td_state['agents']['cur_load'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cur_load'])
        # update visited nodes
        r = torch.arange(*self.td_state.batch_size, device=self.device)
        self.td_state['agents']['visited_nodes'][r, self.td_state['cur_agent_idx'].squeeze(-1), action.squeeze(-1)] = True
        # update agent step
        self.td_state['cur_agent']['cur_step'] = torch.where(~agents_done, self.td_state['cur_agent']['cur_step']+1,
                                                             self.td_state['cur_agent']['cur_step'])
        self.td_state['agents']['cur_step'].scatter_(1, self.td_state['cur_agent_idx'], self.td_state['cur_agent']['cur_step'])
        # if all done activate first agent to guarantee batch consistency during agent sampling
        self.td_state['agents']['active_agents_mask'][self.td_state['agents']['active_agents_mask'].sum(1).eq(0), 0] = True
        self._update_feasibility()
Let’s test the environment and repeat the steps we performed for the CVRPTW:
[ ]:
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
[ ]:
td = env.reset()
[ ]:
td['action'] = torch.tensor([[2]])
td = env.step(td)
[ ]:
env.td_state['cur_agent']['action_mask']
[ ]:
env.td_state['cur_agent']['cur_load']
What happens if the agent now goes to node 1?
[ ]:
# %load snippets/ex9.py
# your code here
What should the new cur_load, action_mask and nodes cur_demands be? Check your answer.
[ ]:
# %load snippets/ex10.py
# your code here
It seems to be working!
Ex3. Capacitated Vehicle Routing Problem with Soft Time Windows (CVRPSTW)¶
In this variation of the CVRPTW, time window constraints are relaxed and can be violated at a penalty cost (usually linearly proportional to the interval between the opening/closing times and the vehicle arrival). Although the penalty function can be defined in several ways, we consider the formulation studied by M. A. Figliozzi. Concretely, the time window violation cannot exceed \(P_{max}\), and consequently, for each customer, we can enlarge its time window to \([o_i - P_{max}, c_i + P_{max}] = [o^s_i , c^s_i]\) outside which the service cannot be performed. When a vehicle arrives at a customer at time \(t_i \in [o^s_i , c^s_i]\), it can have an early arrival penalty cost of \(p_e \max (o_i-t_i,0)\) and a late arrival penalty cost of \(p_l \max (t_i-c_i, 0)\).
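To see these penalty terms in numbers, here is a small scalar sketch. The function name `time_window_penalty` and the sample values are hypothetical; it returns None when the arrival falls outside the enlarged window, where service is not allowed.

```python
def time_window_penalty(t, o, c, p_max, p_e, p_l):
    """Soft time-window penalty for arriving at time t at a customer with window [o, c]."""
    if t < o - p_max or t > c + p_max:
        return None  # outside the enlarged window [o - P_max, c + P_max]
    early = p_e * max(o - t, 0.0)
    late = p_l * max(t - c, 0.0)
    return early + late


# Hypothetical customer: window [10, 20], P_max = 5, p_e = 1, p_l = 2.
for t in (8.0, 15.0, 23.0, 30.0):
    print(t, time_window_penalty(t, o=10.0, c=20.0, p_max=5.0, p_e=1.0, p_l=2.0))
```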
Furthermore, the vehicle’s maximum waiting time at any customer, \(W_{max}\), is imposed. That is, the vehicles can only arrive at each customer after \(o_i - P_{max} - W_{max}\), so that its waiting time doesn’t exceed \(W_{max}\).
The environment for this problem is almost complete. Compared to the base CVRPTW environment, early_penalty and late_penalty attributes were added to the environment, and tw_low_limit, tw_high_limit and arrive_limit attributes were added to td_state.
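Judging from the formulas above, the three limits could be derived from a customer’s window as in the sketch below. This is an assumption about how tw_low_limit, tw_high_limit and arrive_limit relate to \(o_i\), \(c_i\), \(P_{max}\) and \(W_{max}\), not a copy of the library code, and the helper names are hypothetical.

```python
def soft_window_limits(o, c, p_max, w_max):
    """Return (tw_low_limit, tw_high_limit, arrive_limit) for one customer."""
    tw_low_limit = o - p_max              # earliest allowed service start
    tw_high_limit = c + p_max             # latest allowed service start
    arrive_limit = tw_low_limit - w_max   # earliest allowed arrival
    return tw_low_limit, tw_high_limit, arrive_limit


def arrival_allowed(arrive_time, arrive_limit):
    """Arriving before arrive_limit would force a wait longer than W_max."""
    return arrive_time >= arrive_limit


limits = soft_window_limits(o=10.0, c=20.0, p_max=5.0, w_max=3.0)
print(limits)
print(arrival_allowed(1.0, limits[2]), arrival_allowed(4.0, limits[2]))
```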
[ ]:
from maenvs4vrp.environments.cvrpstw.env import Environment
from maenvs4vrp.environments.cvrpstw.env_agent_selector import AgentSelector
from maenvs4vrp.environments.cvrpstw.observations import Observations
from maenvs4vrp.environments.cvrpstw.toy_instance_generator import ToyInstanceGenerator
from maenvs4vrp.environments.cvrpstw.env_agent_reward import DenseReward
[ ]:
gen = ToyInstanceGenerator()
sel = AgentSelector()
rew = DenseReward()
obs = Observations()
Complete the _update_feasibility method in order to take into account the waiting time constraint:
[ ]:
# %load snippets/ex11.py
class Environment(Environment):
    def _update_feasibility(self):
        _mask = self.td_state['nodes']['active_nodes_mask'].clone() * self.td_state['cur_agent']['action_mask'].clone()
        # time windows constraints
        loc = self.td_state['coords'].gather(1, self.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
        ptime = self.td_state['cur_agent']['cur_time'].clone()
        time2j = torch.pairwise_distance(loc, self.td_state["coords"], eps=0, keepdim = False)
        if self.n_digits is not None:
            time2j = torch.floor(self.n_digits * time2j) / self.n_digits
        arrivej = ptime + time2j
        waitj = torch.clip(self.td_state['tw_low_limit']-arrivej, min=0)
        service_startj = arrivej + waitj
        #c0 = !! your code here !! # agents can only arrive at each customer after $o_i - P_{max} - W_{max}$
        c1 = service_startj <= self.td_state['tw_high_limit']
        c2 = service_startj + self.td_state['service_time'] + self.td_state['time2depot'] <= self.td_state['end_time'].unsqueeze(-1)
        # capacity constraints
        c3 = self.td_state['demands'] <= self.td_state['cur_agent']['cur_load']
        _mask = _mask * c0 * c1 * c2 * c3
        # update state
        self.td_state['cur_agent'].update({'action_mask': _mask})
        self.td_state['agents']['feasible_nodes'].scatter_(1,
            self.td_state['cur_agent_idx'][:,:,None].expand(-1,-1,self.num_nodes), _mask.unsqueeze(1))
Complete the DenseReward class in order to take into account the penalty for time window violations:
(hint: check td_state['cur_agent']['cur_penalty'])
[ ]:
# %load snippets/ex12.py
class DenseReward(DenseReward):
    """Reward class.
    """
    def get_reward(self, action):
        """
        """
        # your code here!!
        return reward, penalty
[ ]:
rew = DenseReward()
[ ]:
env = Environment(instance_generator_object=gen,
                  obs_builder_object=obs,
                  agent_selector_object=sel,
                  reward_evaluator=rew,
                  seed=0)
[ ]:
td = env.reset()
Let’s get some information about the environment:
[ ]:
fig = plt.figure(figsize=(3,3))
plt.plot(env.td_state['coords'][0][:,0].numpy(), env.td_state['coords'][0][:,1].numpy(), 'o')
plt.plot(env.td_state['coords'][0][0,0].numpy(), env.td_state['coords'][0][0,1].numpy(), 'o', color='red' )
[ ]:
for k, data in enumerate(zip(env.td_state['tw_low'][0].tolist(), env.td_state['tw_high'][0].tolist(), env.td_state['demands'][0].tolist())):
    print(f'node {k} time window is: [{data[0]}; {data[1]}], with demand {data[2]}')
[ ]:
for k, data in enumerate(zip(env.td_state['tw_low_limit'][0].tolist(), env.td_state['tw_high_limit'][0].tolist(), env.td_state['arrive_limit'][0].tolist())):
    print(f'node {k} time window limit is: [{data[0]:.2f}; {data[1]:.2f}], with arrive time limit {data[2]:.2f}')
For the active agent in the depot, the times (distances) to customers will be:
[ ]:
loc = env.td_state['coords'].gather(1, env.td_state['cur_agent']['cur_node'][:,:,None].expand(-1, -1, 2))
time2j = torch.pairwise_distance(loc, env.td_state["coords"], eps=0, keepdim = False)
time2j[0]
Take a few environment steps to check that our implementation is correct.
What reward and penalty values are expected?
[ ]:
# %load snippets/ex13.py
[ ]:
# your code here
[ ]:
# your code here
[ ]:
# your code here
Let’s do an episode rollout and check the reward and penalty through every step:
[ ]:
td = env.reset()
while not td["done"].all():
    td = env.sample_action(td)
    td = env.step(td)
    step = env.env_nsteps
    reward = td['reward']
    penalty = td['penalty']
    print(f'env step number:{step}, reward: {reward}, penalty: {penalty}')
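Beyond printing each step, it can help to total the two signals over the episode. The sketch below does that for a hypothetical log of (reward, penalty) pairs for one environment; in the rollout above you would first convert the tensors to floats, for example with `.item()`. The totals are kept separate because how the library combines reward and penalty is not specified here.

```python
def summarize(step_log):
    """Sum rewards and penalties separately over an episode."""
    total_reward = sum(r for r, _ in step_log)
    total_penalty = sum(p for _, p in step_log)
    return total_reward, total_penalty


# Hypothetical per-step (reward, penalty) values for one environment.
step_log = [(1.0, 0.0), (0.5, 0.2), (2.0, 0.0)]
totals = summarize(step_log)
print(totals)
```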