import enum
import random
import math
import copy
from typing import List
import numpy as np
from .evse import EVSE
from ..simulation.vehicle import Vehicle
from .status import Status
__all__ = [
'QueueMode',
'Bank'
]
[docs]class QueueMode(enum.IntEnum):
DEFAULT = enum.auto()
STACK = enum.auto() # Last-in, first out
RANDOM = enum.auto()
SOC = enum.auto()
TIME = enum.auto()
RSOC = enum.auto()
RTIME = enum.auto()
[docs] @classmethod
def lookup(cls, s: str):
return {
'default': QueueMode.DEFAULT,
'stack': QueueMode.STACK,
'random': QueueMode.RANDOM,
'soc': QueueMode.SOC,
'time': QueueMode.TIME,
'rsoc': QueueMode.RSOC,
'rtime': QueueMode.RTIME
}.get(s, QueueMode.DEFAULT)
ALLOWED_QUEUE_MODES = {
QueueMode.DEFAULT,
QueueMode.STACK,
QueueMode.RANDOM,
QueueMode.SOC,
QueueMode.TIME,
QueueMode.RSOC,
QueueMode.RTIME
}
[docs]class Bank(object):
__slots__ = (
'max_power',
'capacity',
'queue_probability',
'dynamic_size',
'queue_mode',
'queue',
'available_evse',
'occupied_evse',
'demand_profile',
'occupancy_profile'
)
def __init__(self, max_power: float = 0., capacity: float = 0., queue_probability: float = 1.,
dynamic_size: bool = False, queue_mode: QueueMode = QueueMode.DEFAULT,
evse: list = [], demand_profile: np.array = np.empty(0),
occupancy_profile: np.array = np.empty(0)):
self.max_power = max_power
self.capacity = capacity
self.queue_probability = queue_probability
self.dynamic_size = dynamic_size
self.queue_mode = queue_mode
self.queue = []
self.available_evse = evse
self.occupied_evse = []
self.demand_profile = demand_profile
self.occupancy_profile = occupancy_profile
@property
def num_available(self):
return len(self.available_evse)
@property
def occupancy(self) -> float:
return float(len(self.occupied_evse) / self.size)
@property
def num_occupied(self):
return len(self.occupied_evse)
@property
def size(self):
return self.num_available + self.num_occupied
@property
def num_operating(self) -> int:
x = 0
for evse in self.occupied_evse:
if evse.is_connected:
x += 1
return x
@property
def total_demand(self) -> float:
x = 0.0
for evse in self.occupied_evse:
if evse.is_connected:
x += evse.demand
return x
@property
def pct_capacity(self):
return self.total_demand / self.capacity
[docs] def record_demand(self, idx: int):
self.demand_profile[idx] = self.total_demand
[docs] def record_occupancy(self, idx: int):
self.occupancy_profile[idx] = self.num_occupied
[docs] def add_evse_with_rules(self, fleet_size: int,
chargers: List[EVSE], rules: dict):
for evse, rule in zip(chargers, rules):
min_num = rule.get('minimum', 1)
max_num = rule.get('maximum', 0)
ratio = rule.get('evse_per_vehicle', 0)
num_to_add = max(min_num, fleet_size*ratio)
if max_num > 0:
num_to_add = min(num_to_add, max_num)
num_to_add = math.ceil(num_to_add)
for i in range(num_to_add):
self.available_evse.append(copy.deepcopy(evse))
if self.dynamic_size:
break
if self.dynamic_size:
break
[docs] def charge_connected(self, minutes_per_interval: float, idx: int):
demand = 0.
occ = 0
for evse in self.occupied_evse:
if evse.vehicle.idx <= idx:
demand += evse.charge_vehicle(minutes_per_interval)
occ += 1
self.demand_profile[idx] = demand
self.occupancy_profile[idx] = occ / self.size
[docs] def set_profile_length(self, size: int):
self.demand_profile = np.zeros(size, dtype=np.float32)
self.occupancy_profile = np.zeros(size, dtype=np.uint8)
[docs] def add_evse(self, evse: EVSE):
self.available_evse.append(evse)
self.size = min(255, self.size + 1)
self.capacity += evse.max_power
[docs] def remove_evse(self, evse: EVSE):
out = True
try:
self.capacity = max(0, self.capacity - evse.max_power)
self.available_evse.remove(evse)
self.size = max(0, self.size - 1)
except Exception:
out = False
return out
[docs] def enqueue_vehicle(self, vic: Vehicle):
if vic.powertrain.pev:
vic.status = Status.IN_QUEUE
self.queue.append(vic)
[docs] def enqueue_vehicle_prob(self, vic: Vehicle):
out = False
if vic.powertrain.pev:
if random.random() <= self.queue_probability:
self.enqueue_vehicle(vic)
out = True
return out
[docs] def dequeue_vehicle(self, vic: Vehicle):
out = False
try:
self.queue.remove(vic)
out = True
except Exception:
out = False
return out
[docs] def dequeue_vehicle_index(self, fleet_index: int):
out = None
for v in self.queue:
if v.fleet_index == fleet_index:
if self.dequeue_vehicle(v):
out = v
return out
[docs] def next_available_evse(self):
out = None
if self.available_evse:
if not self.available_evse[0].is_connected:
out = self.available_evse[0]
return out
[docs] def dequeue_early_departures(self, idx: int):
departing = []
for v in self.queue:
if v.idx <= idx:
if v.is_driving:
if self.dequeue_vehicle(v):
departing.append(v)
return departing
[docs] def pop_next_vehicle(self):
winner = None
if self.queue:
# Default queue mode
winner = self.queue[0]
# Stack
if self.queue_mode == QueueMode.STACK:
winner = self.queue[-1]
# Random
elif self.queue_mode == QueueMode.RANDOM:
winner = self.queue[random.randrange(len(self.queue))]
# SoC (lowest first)
elif self.queue_mode == QueueMode.SOC:
for v in self.queue[1:]:
if v.soc < winner.soc:
winner = v
# SoC (Highest first)
elif self.queue_mode == QueueMode.RSOC:
for v in self.queue[1:]:
if v.soc > winner.soc:
winner = v
# Time to charge (assume 6kW power)
elif self.queue_mode == QueueMode.TIME:
for v in self.queue[1:]:
v_time = v.minutes_to_charged(6, 1)
winner_time = winner.minutes_to_charged(6, 1)
if v_time > winner_time:
winner = v
# Lowest time to charge
elif self.queue_mode == QueueMode.RTIME:
for v in self.queue[1:]:
v_time = v.minutes_to_charged(6, 1)
winner_time = winner.minutes_to_charged(6, 1)
if v_time < winner_time:
winner = v
if not self.dequeue_vehicle(winner):
winner = None
return winner
[docs] def process_queue(self):
"""Checks each EVSE in the bank.
If the EVSE is not connected to a vehicle, dequeue one and connect it.
"""
num_connected = 0
# Only do work if there are available EVSE
if self.num_available > 0:
# If we're dynamically sized, conect every Vehicle
# that is compatible with the type of charger.
# If a Vehicle is ineligible (incompatible or the SoC is too high),
# remove it from the queue
if self.dynamic_size:
while self.queue:
vehicle = self.pop_next_vehicle()
evse: EVSE
evse = self.available_evse[0]
if vehicle.evse_compatible(evse.model.dc, evse.model.dc_plugs):
evse = copy.deepcopy(self.available_evse[0])
if evse.connect_vehicle(vehicle):
self.occupied_evse.append(evse)
num_connected += 1
# Otherwise, process a queue normally
else:
ineligible_vehicles = []
# As long as there is EVSE available and there are still
# vehicles in the queue
while self.num_available > 0 and self.queue:
vehicle = self.pop_next_vehicle()
# Check for a valid vehicle
if vehicle is not None:
connected = False
for evse in self.available_evse:
# Check if the EVSE can accept connections
# and the EVSE is compatible with the Vehicle and
# its needs
if evse.connect_vehicle(vehicle):
num_connected += 1 # Update our count
# Move the EVSE to the list of occupied
self.available_evse.remove(evse)
self.occupied_evse.append(evse)
# Remember that we connected this one
connected = True
# Break out of the loop;
# No need to look at other EVSE
break
# If we failed to connect, the vehicle is ineligible
# for any of the available EVSE.
if not connected:
ineligible_vehicles.append(vehicle)
# Now that we're done popping Vehicles,
# requeue the ineligible ones so they can wait for EVSE that
# will meet their needs
# First, check if we're using a stack, so we can re-stack the
# queue
if self.queue_mode == QueueMode.STACK:
ineligible_vehicles = ineligible_vehicles[::-1]
for v in ineligible_vehicles:
self.queue.append(v)
# Return the number of vehicles that got to connect
return num_connected
[docs] def disconnect_completed_vehicles(self, idx: int):
"""
Disconnects any vehicles that are effectively done charging.
"""
num_disconnected = 0
for evse in self.occupied_evse:
if evse.is_connected and not evse.v2g_mode:
vehicle = evse.vehicle
if vehicle.idx <= idx:
if evse.charge_completed:
if evse.disconnect_vehicle() is not None:
num_disconnected += 1
self.occupied_evse.remove(evse)
if not self.dynamic_size:
self.available_evse.append(evse)
return num_disconnected
[docs] def disconnect_departing_vehicles(self, idx: int):
num_disconnected = 0
for evse in self.occupied_evse:
if evse.is_connected:
vehicle = evse.vehicle
if vehicle.idx <= idx:
if evse.vehicle.is_driving:
v = evse.vehicle
if evse.disconnect_vehicle() is not None:
v.status = Status.DRIVING
num_disconnected += 1
self.occupied_evse.remove(evse)
if not self.dynamic_size:
self.available_evse.append(evse)
return num_disconnected