Python API
moss
moss.map
Map Objects
class Map()
parse_standard_junction
def parse_standard_junction()
Parse all the junctions in the map as standard junctions, i.e. junctions with 4 in-roads and 4 out-roads.
moss.engine
LaneChange Objects
class LaneChange(Enum)
NONE
: Vehicles will not change lanes in response to road conditions.
SUMO
: Vehicles will attempt to switch to a faster lane, as in SUMO.
MOBIL
: Vehicles will take the action that maximizes the change in acceleration of self and affected vehicles, as in MOBIL.
Engine Objects
class Engine()
Moss Engine
NOTE: Cannot create multiple Engines on different devices. For that purpose, use moss.parallel.ParallelEngine instead.
vehicle_count
@property
def vehicle_count() -> int
The number of vehicles in the agent file
lane_count
@property
def lane_count() -> int
The number of lanes
road_count
@property
def road_count() -> int
The number of roads
junction_count
@property
def junction_count() -> int
The number of junctions
get_map
def get_map()
Get the Map object
get_agents
def get_agents()
Get the Agents object
get_current_time
def get_current_time() -> float
Get the current time
get_running_vehicle_count
def get_running_vehicle_count() -> int
Get the total number of running vehicles
get_lane_statuses
def get_lane_statuses() -> NDArray[np.int8]
Get the traffic light status of each lane: 0-green / 1-yellow / 2-red / 3-restriction
get_lane_geoms
def get_lane_geoms() -> List[NDArray[np.float32]]
Get the geometry of each lane, [[x1,y1], ..., [xn,yn]]
get_lane_lengths
def get_lane_lengths() -> NDArray[np.int8]
Get the length of each lane
get_lane_vehicles
def get_lane_vehicles() -> Union[Dict[str, List[str]], List[List[int]]]
Get the list of vehicles of each lane
get_lane_vehicle_counts
def get_lane_vehicle_counts() -> NDArray[np.int32]
Get the number of vehicles of each lane
get_lane_waiting_vehicle_counts
def get_lane_waiting_vehicle_counts(
speed_threshold: float = 0.1) -> Union[Dict[str, int], List[int]]
Get the number of vehicles of each lane with speed lower than speed_threshold
get_lane_waiting_at_end_vehicle_counts
def get_lane_waiting_at_end_vehicle_counts(
speed_threshold: float = 0.1,
distance_to_end: float = 100) -> Union[Dict[str, int], List[int]]
Get the number of vehicles of each lane with speed lower than speed_threshold and distance to end lower than distance_to_end
get_lane_ids
def get_lane_ids() -> NDArray[np.int32]
Get the ids of the lanes
get_junction_ids
def get_junction_ids() -> NDArray[np.int32]
Get the ids of the junctions
get_junction_lanes
def get_junction_lanes() -> List[List[int]]
Get the index of the lanes inside each junction
get_junction_inout_lanes
def get_junction_inout_lanes() -> Tuple[List[List[int]], List[List[int]]]
Get the index of the in and out lanes of each junction
get_junction_phase_lanes
def get_junction_phase_lanes() -> List[List[Tuple[List[int], List[int]]]]
Get the index of the in and out lanes of each phase of each junction
get_junction_phase_ids
def get_junction_phase_ids() -> NDArray[np.int32]
Get the phase id of each junction, -1 if it has no traffic lights
get_junction_phase_counts
def get_junction_phase_counts() -> NDArray[np.int32]
Get the number of available phases of each junction
get_junction_dynamic_roads
def get_junction_dynamic_roads() -> List[List[int]]
Get the ids of the dynamic roads connected to each junction
get_road_lane_plans
def get_road_lane_plans(road_index: int) -> List[List[slice]]
Get the dynamic lane plan of the road road_index, represented as a list of lane groups:
[
    [slice(lane_start, lane_end), ...]
]
get_vehicle_lanes
def get_vehicle_lanes() -> Union[Dict[str, int], NDArray[np.int32]]
Get the lane index of each vehicle, -1 for not running on the road
get_vehicle_speeds
def get_vehicle_speeds() -> Union[Dict[str, float], NDArray[np.float32]]
Get the speed of each vehicle, -1 for not running on the road
get_vehicle_statuses
def get_vehicle_statuses() -> NDArray[np.int8]
Get the status of each vehicle: 1-driving / 2-finished / 0-otherwise
get_vehicle_raw_statuses
def get_vehicle_raw_statuses() -> NDArray[np.int8]
Get the raw status of each vehicle, only for debugging.
get_vehicle_traveling_or_departure_times
def get_vehicle_traveling_or_departure_times() -> NDArray[np.float32]
- For finished vehicles, it is the traveling time.
- For driving vehicles, it is the negative departure time.
get_finished_vehicle_count
def get_finished_vehicle_count() -> int
Get the number of the finished vehicles
get_finished_vehicle_average_traveling_time
def get_finished_vehicle_average_traveling_time() -> float
Get the average traveling time of the finished vehicles
get_running_vehicle_average_traveling_time
def get_running_vehicle_average_traveling_time() -> float
Get the average traveling time of the running vehicles
get_departed_vehicle_average_traveling_time
def get_departed_vehicle_average_traveling_time() -> float
Get the average traveling time of the departed vehicles (running+finished)
get_vehicle_distances
def get_vehicle_distances() -> Union[Dict[str, float], NDArray[np.float32]]
Get the traveled distance of each vehicle on the current lane
get_vehicle_total_distances
def get_vehicle_total_distances(
) -> Union[Dict[str, float], NDArray[np.float32]]
Get the total traveled distance of each vehicle
get_vehicle_positions
def get_vehicle_positions() -> NDArray[np.float64]
Get the (x,y,dir) of each running vehicle
get_vehicle_positions_all
def get_vehicle_positions_all() -> NDArray[np.float64]
Get the (x,y,dir) of all vehicles
get_vehicle_id_positions
def get_vehicle_id_positions() -> NDArray[np.float64]
Get the (id,x,y,dir) of each vehicle
get_road_lane_plan_index
def get_road_lane_plan_index(road_index: int) -> int
Get the lane plan index of road road_index
get_road_vehicle_counts
def get_road_vehicle_counts() -> NDArray[np.int32]
Get the number of vehicles of each road
get_road_waiting_vehicle_counts
def get_road_waiting_vehicle_counts(
speed_threshold: float = 0.1) -> Union[Dict[str, int], List[int]]
Get the number of vehicles with speed lower than speed_threshold of each road
set_vehicle_enable
def set_vehicle_enable(vehicle_index: int, enable: bool)
Enable or disable vehicle vehicle_index
set_vehicle_enable_batch
def set_vehicle_enable_batch(vehicle_indices: List[int],
enable: Union[bool, List[bool]])
Enable or disable vehicles in vehicle_indices
set_tl_policy
def set_tl_policy(junction_index: int, policy: TlPolicy)
Set the traffic light policy of junction junction_index to policy
set_tl_policy_batch
def set_tl_policy_batch(junction_indices: List[int], policy: TlPolicy)
Set the traffic light policy of all junctions in junction_indices to policy
set_tl_duration
def set_tl_duration(junction_index: int, duration: int)
Set the traffic light switch duration of junction junction_index to duration
NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE.
NOTE: Set duration to 0 to use the predefined duration in the map_file
set_tl_duration_batch
def set_tl_duration_batch(junction_indices: List[int], duration: int)
Set the traffic light switch duration of all junctions in junction_indices to duration
NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE.
NOTE: Set duration to 0 to use the predefined duration in the map_file
set_tl_phase
def set_tl_phase(junction_index: Union[str, int], phase_index: int)
Set the phase of junction junction_index to phase_index
set_tl_phase_batch
def set_tl_phase_batch(junction_indices: List[int], phase_indices: List[int])
Set the phases of the junctions in junction_indices to phase_indices in batch
set_road_lane_plan
def set_road_lane_plan(road_index: int, plan_index: int)
Set the lane plan of road road_index
set_road_lane_plan_batch
def set_road_lane_plan_batch(road_indices: List[int], plan_indices: List[int])
Set the lane plans of the roads in road_indices to plan_indices in batch
set_lane_restriction
def set_lane_restriction(lane_index: int, flag: bool)
Set the restriction state of lane lane_index
set_lane_restriction_batch
def set_lane_restriction_batch(lane_indices: List[int], flags: List[bool])
Set the restriction states of the lanes in lane_indices in batch
set_lane_max_speed
def set_lane_max_speed(lane_index: int, max_speed: float)
Set the max_speed of lane lane_index
set_lane_max_speed_batch
def set_lane_max_speed_batch(lane_indices: List[int],
max_speeds: Union[float, List[float]])
Set the max_speed of the lanes in lane_indices in batch
set_vehicle_route
def set_vehicle_route(vehicle_index: int,
route: List[int],
end_lane_id: int = -1,
end_s: float = -10)
Set the route of vehicle vehicle_index to route and the end to (end_lane_id, end_s)
- If end_lane_id is -1, then the vehicle will stop at the rightmost drivable lane of the last road
- end_s will be clipped to be within the length of the end lane
- If end_s<0, it will be treated as measured from the end of the lane
debug_vehicle_info
def debug_vehicle_info() -> NDArray[np.float64]
Get debug info
debug_vehicle_full_info
def debug_vehicle_full_info(vehicle_id: int) -> Tuple
Get full debug info for vehicle vehicle_id
debug_lane_info
def debug_lane_info()
Get debug info
next_step
def next_step(n=1)
Move forward n steps
make_checkpoint
def make_checkpoint() -> int
Make a checkpoint of the current state of the simulator and return the checkpoint id
restore_checkpoint
def restore_checkpoint(checkpoint_id)
Restore the state of the simulator to a previous checkpoint
moss.convert
convert_from_mongo
def convert_from_mongo(map_col: Collection,
agent_col: Collection,
out_map: str,
out_agent: str,
use_tqdm=False,
ignore_unknown_fields=False)
Convert MongoDB collections into binary files
moss.router
Router Objects
class Router()
get_route
def get_route(start_road: int, end_road: int) -> List[int]
Get the route from start_road to end_road in the form of a list of road ids
get_aoi_route
def get_aoi_route(start_aoi: int, end_aoi: int, check=False) -> List[int]
Get the route from start_aoi to end_aoi in the form of a list of road ids
moss.geom
frange
def frange(a, b, s)
Range for float numbers over [a, b) with step s
Point Objects
class Point()
from_angle
@staticmethod
def from_angle(angle)
Create point on unit circle by angle
__mul__
def __mul__(x)
Scalar product
__matmul__
def __matmul__(o)
Dot product
__truediv__
def __truediv__(x)
Scalar division
perp
@property
def perp()
Perpendicular point (-y, x)
solve_intersection
def solve_intersection(a: Point, p: Point, b: Point, q: Point, single=True)
Solve for a + t p = b + s q. Return t if single else (t, s)
angle_normalize
def angle_normalize(angle)
Return angle in [-π, π)
shoelace
def shoelace(arr)
Use the shoelace formula to calculate polygon area.
arr is a list of Points, the vertices of the polygon
Polyline Objects
class Polyline()
offset
def offset(d)
Offset polyline to the left by d
ltrim_
def ltrim_(l)
Trim polyline from the left by l
rtrim_
def rtrim_(l)
Trim polyline from the right by l
connect
@staticmethod
def connect(A: Point, a: Point, B: Point, b: Point, allow_overlap=False)
Return a polyline that connects point A and B with directions a and b respectively
overlap
def overlap(other)
Get overlaps with another polyline.
Return the distances of the overlaps along both polylines.
get_xy_from_s
def get_xy_from_s(s, clip=False)
Get Point(x, y) given distance traveled
extend_
def extend_(x)
Extend polyline by abs(x).
If x>=0 then extend on the end, otherwise on the beginning
moss.mongo_map
moss.export
Recorder Objects
class Recorder()
Recorder is for local visualization and writes to a local file
DBRecorder Objects
class DBRecorder()
DBRecorder is for web visualization and writes to Postgres Database
moss.go_router
Router Objects
class Router()
__init__
def __init__(map_file: str,
routing_bin: str,
addr: str = 'localhost:52101',
use_existing_router=False)
map_file: path to the map file
routing_bin: path to the routing binary
addr: the address to be used for the TCP socket
get_route
def get_route(start_lane: int, end_lane: int) -> List[int]
Get the route from start_lane to end_lane in the form of a list of road ids
get_route_async
async def get_route_async(start_end_lanes: List[Tuple[int, int]],
callback=None,
n_workers=8)
Get the route from start_lane to end_lane in the form of a list of road ids
get_aoi_route
def get_aoi_route(start_aoi: int, end_aoi: int) -> List[int]
Get the route from start_aoi to end_aoi in the form of a list of road ids
get_aoi_route_async
async def get_aoi_route_async(start_end_aois: List[Tuple[int, int]],
callback=None,
n_workers=os.cpu_count())
Get the route from start_aoi to end_aoi in the form of a list of road ids
moss.parallel
ParallelEngine Objects
class ParallelEngine(Engine)
Moss Engine for parallel simulation
ParallelManager Objects
class ParallelManager(Engine)
Helper class for managing multiple ParallelEngines.
Method calls on this object will be broadcast to all managed engines
execute
def execute(func: Callable[[int, Engine], Any]) -> List[Any]
Run func(idx, eng) in parallel and return the results as a list