import numpy as np
import numba as nb
import pandas as pd
from typing import List
import random
from evnrg.common.status import Status
from .eligibility import (
EligibilityRules,
ECode
)
from .eligibility import stop_eligibility
from evnrg.common.powertrain import Powertrain, PType
from .bracket import Bracket
from evnrg.common.plug import DCPlug
__all__ = [
'Vehicle'
]
@nb.njit
def next_stop(distance_a: np.array, begin: int):
i = begin
length = distance_a.shape[0]
d = distance_a[begin]
if d > 0:
while i < length and d > 0:
d = distance_a[i]
i += 1
return ECode(
begin_index=begin,
end_index=i,
code=Status.DRIVING
)
@nb.njit
def next_trip(distance_a: np.array, begin: int):
i = begin
length = distance_a.shape[0]
while i < length and distance_a[i] == 0.:
i += 1
return i
@nb.njit
def next_charging_opportunity(distance_array: np.array,
begin: int,
vidx: int, rules: EligibilityRules):
"""Finds the next Bracket that's a viable charging opportunity.
This is a JITted function.
Args:
current_bracket (Bracket): The `Bracket` to start searching from.
distance_array (numpy.array): A 1-D Numpy distance array.
vidx (int): The vehicle column index to use for mask lookup
rules (EligibilityRules): The `EligibilityRules` object to use
"""
if not (distance_array.ndim == 1):
raise ValueError('Array of zero length supplied.')
i = begin
alen = distance_array.shape[0]
code = ECode(alen, alen, Status.HOME_ELIGIBLE)
if distance_array[begin] == 0. and distance_array[begin + 1] > 0.:
while i < alen:
if distance_array[i] > 0.:
i += 1
else:
code = stop_eligibility(distance_array, i, vidx, rules)
acceptable_codes = (
Status.HOME_ELIGIBLE,
Status.AWAY_ELIGIBLE
)
if code.code in acceptable_codes:
break
else:
i = code.end_index
return code
@nb.njit
def distance_between_brackets(distance_array: np.array, b_start: Bracket,
b_end: Bracket):
"""Returns the distance between two brackets
Args:
distance_array (numpy.array): A 1-D distance array
b_start (Bracket): The `Bracket` to start from
b_end (Bracket): The `Bracket` to end at
Returns:
A `float` of the total distance traveled in the bracket.
"""
return distance_array[b_start.end:b_end.begin].sum()
@nb.njit
def is_last_bracket(distance_array: np.array, bkt: Bracket):
"""Returns if the bracket is the last bracket in thedistance array
Args:
distance_array (numpy.array): The 1-D `numpy.array` to test against
bkt (Bracket): The `Bracket` to test
Returns:
`bool`
"""
return bkt.end >= distance_array.shape[0]
@nb.njit
def rewrite_deferred_trips(distance_array: np.array,
deferred_array: np.array,
begin: int,
target: int,
begin_ev_range: float,
max_ev_range: float,
range_added_per_interval: float = 0.,
soc_buffer: float = 0.):
"""Searches for next eligible stop for BEVs, and defers trips as necessary.
If a BEV's range can't achieve the distance, the trips are deferred until
the distance is less than the range available. Additionally, This allows
vehicles in a queue to stay in it, or vehicles to continue charging.
Note:
This function modifies the distance and deferred arrays in-place.
Args:
distance_array (numpy.array): The distance array to use
deferred_array (numpy.array): The array that will be used to record
deferred distance
begin (int): The index to begin the operation at
vidx (int): Vehicle column index (fleet_id)
begin_status (int): The `Status` at the beginning index.
begin_ev_range (float): Current range remaining in vehicle's battery
max_ev_range (float): Maximum range the vehicle's battery can have
rules (EligibilityRules): the `EligibilityRules` to use
range_added_per_interval (:obj: `float`, optional): Allows range to be
added if the current state is `Status.CONNECTED`.
Returns:
A revised `Status`.
"""
# Only worry about doing this if:
# 1: The vehicle is stopped
# and 2: The ehicle is a BEV. PHEVs and ICEVs can leave any time.
# and 3: The vehicle is about to leave, i.e.: the next interval
# is a distance > 0
max_len = distance_array.shape[0]
if begin < max_len - 1:
ev_range = begin_ev_range
socmult = 1.0 - soc_buffer
required_dist = distance_array[begin:target].sum()
i = begin
while i < target and (ev_range * socmult) < required_dist:
# is the index advanced to a trip?
if distance_array[i] > 0:
j = i
# Find the end of the trip
while distance_array[j] > 0:
deferred_array[j] = distance_array[j]
distance_array[j] = 0.0
j += 1
# Defer the trip in this window
required_dist = distance_array[begin:target].sum()
# Add range if we're connected
#range_added = 0.
#if range_added_per_interval > 0.:
# range_added = (j - i) * range_added_per_interval
# ev_range = min(range_added + ev_range, max_ev_range)
# Move up the index
i = j
else:
# Iterate through a non-eligible stop
i += 1
return distance_array, deferred_array
@nb.njit
def drive_to_next_stop(begin: int, distance_a: np.array, fuel_a: np.array,
battery_a: np.array, battery_start: float,
ev_efficiency: float, fuel_efficiency: float):
"""Efficiently drives a vehicle through a drive bracket and stores energy data.
Args:
distance_a (numpy.array): 1-D Numpy distance array
fuel_a (numpy.array); 1-D Numpy array to record fuel use
battery_a (numpy.array): 1-D Numpy array that holds battery state
battery_start (float): The starting battery energy for the vehicle
ev_efficiency (float): The efficiency of electric operation in km/kWh.
For ICEVs, this should be zero.
fuel_efficiency (float): The efficiency of combustion operation
in km/L. For BEVs, this should be zero.
Returns:
An `int` indicating the number of intevals traveled.
Raises:
ValueError: if the EV distance and Ice distance do not add up to the
interval's distance (meaning the EV could not make it).
"""
error_ = .001
# Double check we're driving
i = begin
if distance_a[i] > 0.:
max_len = distance_a.shape[0]
while (i < max_len) and (distance_a[i] > 0.):
distance = distance_a[i]
e_dist = 0.
fuel_dist = 0.
# Check if we have a pev
if ev_efficiency > 0.:
# Do this to make sure we don't try to access index of -1
battery_state = battery_start
if i > 0:
battery_state = battery_a[i - 1]
# Only continue if we actually have battery power left
if battery_state > 0.:
# Process battery
battery_required = distance / ev_efficiency
battery_used = min(battery_state, battery_required)
battery_a[i] = battery_state - battery_used
e_dist = round(battery_used * ev_efficiency, 4)
if fuel_efficiency > 0.:
fuel_dist = distance - e_dist
fuel_a[i] = fuel_dist / fuel_efficiency
i += 1
return i
[docs]class Vehicle(object):
"""The primary driving logic object.
The `Vehicle` object keeps track of all data and operations specific
to an individual vehicle. Many of the operations are sped up through
external JITed functions.
Attributes:
fleet_id (int): The column index of the vehicle for use when
referencing 2-D array or positions in a `Fleet` object.
powertrain (Powertrain): The `'Powertrain` used for this vehicle's
energy calculations.
max-soc (float): The maximum state of charge (0.0 to 1.0) this
vehicle's battery may reach. This will be handy for modeling
battery degradation. For now, it defaults to 1.
evse_power (float): The effective power level in kW that this vehicle
is connected to. If the vehicle is not connected to EVSE, this
value is 0.
distance_a (numpy.array): A 1-D array of type `numpy.float32` that
contains the interval distance data. Unless deferring and
rewriting trips, this array should generally be treated as
read-only.
fuel_burned_a (numpy.array): A 1-D array of type `numpy.float32` that
holds fuel consumption data.
deferred_a (numpy.array): A 1-D array of type `numpy.float32` that
holds deferred distance. When BEVs have to defer trips, the
distances are copied to this array before the window in
`distance_a` is set to zero. This offers useful data about trip
completion by BEVs.
battery_a (numpy.array): A 1-D array of type 'numpy.float32` that
reflects the battery energy of the vehicle over time.
idx (int): The internal index for the `Vehicle` object.
begin_energy (float): The energy the vehicle starts with. This value
is referenced in battery operations when `idx` is 0.
status (int): indicates the current status of the vehicle. The value
is held by an `int`, but is generally set by referencing the
`Status` object (which is an `enum.IntEnum`). Do not try to
evaluate this value on its own. Instead use `Status`
(e.g. ``vehicle.status == Status.CONNECTED``
Args:
fleet_index (int): The vehicle column index from a `Fleet` object.
ptrain (Powertrain): The `Powertrain` object that will be used with
this vehicle.
distance (numpy.array): A 1-D `numpy.array` of type `numpy.float32`
that holds the interval distance data for this vehicle.
rules (EligibilityRules): The `EligibilityRules` that will be used
to evaluate the initial charging eligibility if `start_soc` is
less than 1.0.
start_soc (float): The initial state of charge of the vehicle. Defaults
to 1.0 (100%).
"""
__slots__ = (
'fleet_id',
'powertrain',
'max_soc',
'evse_power',
'distance_a',
'fuel_burned_a',
'deferred_a',
'battery_a',
'idx',
# 'bracket',
'begin_energy',
'status',
'soc_buffer'
)
def __init__(self, fleet_index: int,
ptrain: Powertrain, distance: np.array,
rules: EligibilityRules, start_soc: float = 1.,
soc_buffer: float = 0):
self.begin_energy = ptrain.energy_at_soc(start_soc)
self.fleet_id = fleet_index
self.powertrain = ptrain
self.battery_a = np.zeros(distance.shape, dtype=np.float32)
self.max_soc = 1 if ptrain.pev else 0
self.evse_power = 0.
self.distance_a = distance
self.fuel_burned_a = np.zeros(len(distance), dtype=np.float32)
self.soc_buffer = float(soc_buffer)
self.deferred_a = np.zeros(len(distance), dtype=np.float32)
self.idx = 0
self.status = Status.DRIVING
if distance[0] == 0.:
if start_soc < 1.:
self.status = stop_eligibility(0, distance, fleet_index, rules)
else:
self.status = Status.STOPPED
# self.bracket = Bracket(0, fleet_index, distance_array, rules)
@property
def battery_state(self):
"""Provides the "incoming" battery state (from the previous index).
Returns:
A `float` representing the battery energy.
"""
if self.idx > 0:
return self.battery_a[self.idx - 1]
return self.begin_energy
@property
def intervals(self):
"""The length of the distance array.
Returns:
A length of type `int`.
Raises:
`AssertionError` if the length is zero.
"""
d = self.distance_a.shape[0]
assert d > 0, \
'Cannot use a zero-length drive profile.'
return d
@property
def is_driving(self):
"""Returns a `bool` indicating if the vehicle is driving."""
return self.distance_a[self.idx] > 0
@property
def is_stopped(self):
"""Returns the opposite of `is_driving`."""
return not self.is_driving
@property
def is_new_stop(self):
out = False
if self.idx > 0:
out = self.is_stopped and self.distance_a[self.idx - 1] > 0
return out
@property
def will_drive_next(self):
"""Indicates whether the next interval will start a trip."""
return self.is_stopped and self.distance_a[self.idx + 1] > 0
@property
def elapsed_distance(self):
"""Returns the elapsed distance for this simulation"""
return np.sum(self.distance_a[:self.idx])
@property
def total_distance(self):
"""Returns the total distance for the simuilation.
Note:
This value will change if trips are deferred!
"""
return np.sum(self.distance_a)
@property
def progress(self):
"""Returns how far through its distance array the vehicle is.
The value is expressed as a `float` percentage wher 1.0 is 100%.
"""
return (self.idx + 1) / self.intervals
@property
def distance(self):
return self.distance_a[self.idx]
@property
def soc(self):
"""Returns the battery state of charge as a percentage."""
out = 0
if self.powertrain.batt_cap > 0:
out = self.battery_a[self.idx - 1] / self.powertrain.batt_cap
return out
@property
def ev_range(self):
"""Returns the range (in km) the battery has remaining."""
return self.battery_a[self.idx - 1] * self.powertrain.ev_eff
@property
def max_ev_range(self):
"""The electric range the vehicle would have on a fully-charged
battery.
"""
return self.powertrain.batt_cap * self.max_soc * self.powertrain.ev_eff
@property
def evse_connected(self) -> bool:
"""Indicates if the attribute `evse_power` is greater than zero.
A value of `True` indicates that an EVSE is connected to the vehicle.
"""
return self.evse_power > 0.
[docs] def index_sync(self, index: int):
"""Reports back how many intervals ahead or behind the vehicle is.
Args:
index (int): The index to check against
Returns:
An `int` indicating the differnce between `index` and the
vehicle's index. A negative number indicates the vehicle is
ahead. A positive number indicates it is behind. Zero
indicates the indexes are in sync.
"""
return index - self.idx
[docs] def increment_index(self, index: int):
"""Advances the vehicle's index, but only if ithe vehicle is behind.
Args:
index (int): The index to attempt to advance to.
Returns:
An `int` indicating the number intervals behind or forward
the vehicle's internal index is.
"""
if index > self.idx:
self.idx += 1
return index - self.idx
[docs] def set_battery(self, val: float, previous: bool = True):
"""Sets the battery energy.
Args:
val (float): The value to set the battery to.
Nomalizes to be between 0 and the battery's max.
previous (:obj: `bool`, optional): Whether to set the
previous index's value. Defaults to `True`.
If the current index is 0, then sets the beginning
value for the vehicle.
"""
true_val = max(0, min(self.powertrain.batt_cap, val))
i = self.idx - 1 if previous else self.idx
if i >= 0:
self.battery_a[i] = true_val
else:
self.begin_energy = true_val
[docs] def delta_battery(self, delta: float):
"""Sets the current index's battery state.
The battery will be normalized to zero or the battery max.
Args:
delta (float): The change in the battery.
Returns:
A `float` representing the current battery energy.
"""
val = self.battery_state + delta
true_val = max(0., min(self.powertrain.batt_cap, val))
self.battery_a[self.idx] = true_val
return true_val
[docs] def evse_compatible(self, dcfc: bool, plugs=None):
if dcfc:
if self.powertrain.dc_capable:
if self.powertrain.dc_plug in plugs:
return True
return False
return True
[docs] def connect_evse(self, power, dcfc, plugs=None):
"""Attempts to connect EVSE to the vehicle.
Checks to see if the connection is compatible.
Args:
power (float): The input power from the EVSE
dcfc (bool): Whether the input is DC.
plugs (:obj: iterable, optional): An iterable
containing the DC connectors available at
this EVSE. Defaults to None.
This is only needed for DC connections. AC
connections are assumed to be J1772.
Returns:
`False` if connection fails, or a `float` representing
the effective power level of the conenction (in kW).
"""
if dcfc and self.powertrain.dc_power > 0.:
# Check for DCFC plug compatibility
if self.powertrain.dc_plug in plugs:
self.evse_power = min(self.powertrain.dc_power, power)
self.status = Status.CONNECTED
return self.evse_power
elif not dcfc and self.powertrain.ac_power > 0.:
self.evse_power = min(self.powertrain.ac_power, power)
self.status = Status.CONNECTED
return self.evse_power
return False
[docs] def disconnect_evse(self):
"""Disconnects the EVSE and sets the power level to zero."""
self.status = Status.DISCONNECTED
self.evse_power = 0.
[docs] def charge_battery(self, energy: float):
"""Charges the battery.
Essentially a wrapper for `Vehicle.delta_battery()`,
esxcept that this function checks to ensure there
is EVSE connected.
Args:
energy (float): The energy to add
Returns:
A `float` representing net energy added.
This adjusts for the fact the battery may
be nearly full.
"""
if self.powertrain.pev and self.evse_connected:
new_state = self.delta_battery(energy)
return self.battery_state - new_state
return False
[docs] def attempt_defer_trips(self, rules: EligibilityRules,
min_per_interval: float):
if self.idx + 1 < self.distance_a.shape[0]:
if (self.powertrain.bev and self.will_drive_next):
acceptable_codes = (
Status.HOME_ELIGIBLE
)
max_len = self.distance_a.shape[0]
target_index = next_trip(self.distance_a, self.idx)
el_stop = None
while el_stop is None and target_index < max_len:
stop = next_stop(self.distance_a, target_index)
code = stop_eligibility(
self.distance_a,
stop.end_index,
self.fleet_id,
rules,
use_mask=False
)
if code.begin_index == code.end_index:
target_index = max_len
elif code.code == Status.HOME_ELIGIBLE:
el_stop = code
else:
target_index = code.end_index
a, b = rewrite_deferred_trips(
self.distance_a,
self.deferred_a,
self.idx,
target_index,
self.ev_range,
self.powertrain.energy_at_soc(0.9),
self.evse_power * (min_per_interval/60.0),
self.soc_buffer
)
self.distance_a = a
self.deferred_a = b
# Recheck eligibility
if self.powertrain.pev and self.status == Status.INELIGIBLE:
new_code = stop_eligibility(
self.distance_a,
self.idx,
self.fleet_id,
rules,
use_mask=False
)
self.status = new_code.code
[docs] def advance_index(self, rules: EligibilityRules, min_per_interval: float):
max_len = self.distance_a.shape[0]
if self.idx < max_len:
driving = self.distance_a[self.idx] > 0
drive_next = False
if self.idx < max_len - 1:
drive_next = self.distance_a[self.idx + 1] > 0
# Check for the end of a drive bracket
if driving:
code: ECode
code = next_stop(self.distance_a, self.idx)
try:
stop_index = drive_to_next_stop(
self.idx,
self.distance_a[:],
self.fuel_burned_a[:],
self.battery_a[:],
self.battery_state,
self.powertrain.ev_eff,
self.powertrain.ice_eff
)
except AssertionError:
pass
self.idx = stop_index
code = stop_eligibility(
self.distance_a,
self.idx,
self.fleet_id,
rules
)
self.status = code.code
elif drive_next:
self.attempt_defer_trips(rules, min_per_interval)
self.idx += 1
if self.idx < max_len:
self.battery_a[self.idx] = self.battery_a[self.idx - 1]
else:
self.idx += 1
if self.idx < max_len:
self.battery_a[self.idx] = self.battery_a[self.idx - 1]