Skip to main content

Python API

moss

moss.map

Map Objects

class Map()

parse_standard_junction

def parse_standard_junction()

Parse all the junctions in the map as standard junctions, i.e. junctions with 4 in roads and 4 out roads.

moss.engine

LaneChange Objects

class LaneChange(Enum)

NONE: Vehicles will not change lanes based on road conditions.

SUMO: Vehicles will attempt to switch to a faster lane, as in SUMO.

MOBIL: Vehicles will take the action that maximizes the change in acceleration of self and affected vehicles, as in MOBIL.

Engine Objects

class Engine()

Moss Engine

NOTE: Multiple Engines cannot be created on different devices. For that purpose, use moss.parallel.ParallelEngine instead.

vehicle_count

@property
def vehicle_count() -> int

The number of vehicles in the agent file

lane_count

@property
def lane_count() -> int

The number of lanes

road_count

@property
def road_count() -> int

The number of roads

junction_count

@property
def junction_count() -> int

The number of junctions

get_map

def get_map()

Get the Map object

get_agents

def get_agents()

Get the Agents object

get_current_time

def get_current_time() -> float

Get the current time

get_running_vehicle_count

def get_running_vehicle_count() -> int

Get the total number of running vehicles

get_lane_statuses

def get_lane_statuses() -> NDArray[np.int8]

Get the traffic light status of each lane, 0-green / 1-yellow / 2-red / 3-restriction

get_lane_geoms

def get_lane_geoms() -> List[NDArray[np.float32]]

Get the geometry of each lane, [[x1,y1], ..., [xn,yn]]

get_lane_lengths

def get_lane_lengths() -> NDArray[np.int8]

Get the length of each lane

get_lane_vehicles

def get_lane_vehicles() -> Union[Dict[str, List[str]], List[List[int]]]

Get the list of vehicles of each lane

get_lane_vehicle_counts

def get_lane_vehicle_counts() -> NDArray[np.int32]

Get the number of vehicles of each lane

get_lane_waiting_vehicle_counts

def get_lane_waiting_vehicle_counts(
speed_threshold: float = 0.1) -> Union[Dict[str, int], List[int]]

Get the number of vehicles of each lane with speed lower than speed_threshold

get_lane_waiting_at_end_vehicle_counts

def get_lane_waiting_at_end_vehicle_counts(
speed_threshold: float = 0.1,
distance_to_end: float = 100) -> Union[Dict[str, int], List[int]]

Get the number of vehicles of each lane with speed lower than speed_threshold and distance to end lower than distance_to_end

get_lane_ids

def get_lane_ids() -> NDArray[np.int32]

Get the ids of the lanes

get_junction_ids

def get_junction_ids() -> NDArray[np.int32]

Get the ids of the junctions

get_junction_lanes

def get_junction_lanes() -> List[List[int]]

Get the index of the lanes inside each junction

get_junction_inout_lanes

def get_junction_inout_lanes() -> Tuple[List[List[int]], List[List[int]]]

Get the index of the in and out lanes of each junction

get_junction_phase_lanes

def get_junction_phase_lanes() -> List[List[Tuple[List[int], List[int]]]]

Get the index of the in and out lanes of each phase of each junction

get_junction_phase_ids

def get_junction_phase_ids() -> NDArray[np.int32]

Get the phase id of each junction, -1 if it has no traffic lights

get_junction_phase_counts

def get_junction_phase_counts() -> NDArray[np.int32]

Get the number of available phases of each junction

get_junction_dynamic_roads

def get_junction_dynamic_roads() -> List[List[int]]

Get the ids of the dynamic roads connected to each junction

get_road_lane_plans

def get_road_lane_plans(road_index: int) -> List[List[slice]]

Get the dynamic lane plan of the road road_index, represented as list of lane groups:

[
[slice(lane_start, lane_end), ...]
]

get_vehicle_lanes

def get_vehicle_lanes() -> Union[Dict[str, int], NDArray[np.int32]]

Get the lane index of each vehicle, -1 for not running on the road

get_vehicle_speeds

def get_vehicle_speeds() -> Union[Dict[str, float], NDArray[np.float32]]

Get the speed of each vehicle, -1 for not running on the road

get_vehicle_statuses

def get_vehicle_statuses() -> NDArray[np.int8]

Get the status of each vehicle, 1-driving / 2-finished / 0-otherwise

get_vehicle_raw_statuses

def get_vehicle_raw_statuses() -> NDArray[np.int8]

Get the raw status of each vehicle, only for debugging.

get_vehicle_traveling_or_departure_times

def get_vehicle_traveling_or_departure_times() -> NDArray[np.float32]
Get the traveling or departure time of each vehicle:
  • For finished vehicles, it is the traveling time.
  • For driving vehicles, it is the negative departure time.

get_finished_vehicle_count

def get_finished_vehicle_count() -> int

Get the number of the finished vehicles

get_finished_vehicle_average_traveling_time

def get_finished_vehicle_average_traveling_time() -> float

Get the average traveling time of the finished vehicles

get_running_vehicle_average_traveling_time

def get_running_vehicle_average_traveling_time() -> float

Get the average traveling time of the running vehicles

get_departed_vehicle_average_traveling_time

def get_departed_vehicle_average_traveling_time() -> float

Get the average traveling time of the departed vehicles (running+finished)

get_vehicle_distances

def get_vehicle_distances() -> Union[Dict[str, float], NDArray[np.float32]]

Get the traveled distance of each vehicle on the current lane

get_vehicle_total_distances

def get_vehicle_total_distances(
) -> Union[Dict[str, float], NDArray[np.float32]]

Get the total traveled distance of each vehicle

get_vehicle_positions

def get_vehicle_positions() -> NDArray[np.float64]

Get the (x,y,dir) of each running vehicle

get_vehicle_positions_all

def get_vehicle_positions_all() -> NDArray[np.float64]

Get the (x,y,dir) of all vehicles

get_vehicle_id_positions

def get_vehicle_id_positions() -> NDArray[np.float64]

Get the (id,x,y,dir) of each vehicle

get_road_lane_plan_index

def get_road_lane_plan_index(road_index: int) -> int

Get the lane plan index of road road_index

get_road_vehicle_counts

def get_road_vehicle_counts() -> NDArray[np.int32]

Get the number of vehicles of each road

get_road_waiting_vehicle_counts

def get_road_waiting_vehicle_counts(
speed_threshold: float = 0.1) -> Union[Dict[str, int], List[int]]

Get the number of vehicles with speed lower than speed_threshold of each road

set_vehicle_enable

def set_vehicle_enable(vehicle_index: int, enable: bool)

Enable or disable vehicle vehicle_index

set_vehicle_enable_batch

def set_vehicle_enable_batch(vehicle_indices: List[int],
enable: Union[bool, List[bool]])

Enable or disable vehicles in vehicle_indices

set_tl_policy

def set_tl_policy(junction_index: int, policy: TlPolicy)

Set the traffic light policy of junction junction_index to policy

set_tl_policy_batch

def set_tl_policy_batch(junction_indices: List[int], policy: TlPolicy)

Set the traffic light policy of all junctions in junction_indices to policy

set_tl_duration

def set_tl_duration(junction_index: int, duration: int)

Set the traffic light switch duration of junction junction_index to duration

NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE.

NOTE: Set duration to 0 to use the predefined duration in the map_file

set_tl_duration_batch

def set_tl_duration_batch(junction_indices: List[int], duration: int)

Set the traffic light switch duration of all junctions in junction_indices to duration

NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE

NOTE: Set duration to 0 to use the predefined duration in the map_file

set_tl_phase

def set_tl_phase(junction_index: Union[str, int], phase_index: int)

Set the phase of junction junction_index to phase_index

set_tl_phase_batch

def set_tl_phase_batch(junction_indices: List[int], phase_indices: List[int])

Set the phase of each junction in junction_indices to the corresponding phase in phase_indices

set_road_lane_plan

def set_road_lane_plan(road_index: int, plan_index: int)

Set the lane plan of road road_index to plan_index

set_road_lane_plan_batch

def set_road_lane_plan_batch(road_indices: List[int], plan_indices: List[int])

Set the lane plan of each road in road_indices to the corresponding plan index in plan_indices

set_lane_restriction

def set_lane_restriction(lane_index: int, flag: bool)

Set the restriction state of lane lane_index

set_lane_restriction_batch

def set_lane_restriction_batch(lane_indices: List[int], flags: List[bool])

Set the restriction state of each lane in lane_indices to the corresponding flag in flags

set_lane_max_speed

def set_lane_max_speed(lane_index: int, max_speed: float)

Set the max_speed of lane lane_index

set_lane_max_speed_batch

def set_lane_max_speed_batch(lane_indices: List[int],
max_speeds: Union[float, List[float]])

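Set the max_speed of each lane in lane_indices; max_speeds is either a single value applied to all the lanes or a list with one value per lane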

set_vehicle_route

def set_vehicle_route(vehicle_index: int,
route: List[int],
end_lane_id: int = -1,
end_s: float = -10)

Set the route of vehicle vehicle_index to route and the end to (end_lane_id,end_s)

  • If end_lane_id is -1, then the vehicle will stop at the rightmost drivable lane of the last road

  • end_s will be clipped to be within the length of end lane

  • If end_s<0, it will be treated as measured from the end of the lane

debug_vehicle_info

def debug_vehicle_info() -> NDArray[np.float64]

Get debug info

debug_vehicle_full_info

def debug_vehicle_full_info(vehicle_id: int) -> Tuple

Get full debug info for vehicle vehicle_id

debug_lane_info

def debug_lane_info()

Get debug info

next_step

def next_step(n=1)

Move forward n steps

make_checkpoint

def make_checkpoint() -> int

Make a checkpoint of the current state of the simulator and return the checkpoint id

restore_checkpoint

def restore_checkpoint(checkpoint_id)

Restore the state of the simulator to a previous checkpoint

moss.convert

convert_from_mongo

def convert_from_mongo(map_col: Collection,
agent_col: Collection,
out_map: str,
out_agent: str,
use_tqdm=False,
ignore_unknown_fields=False)

Convert MongoDB collections into binary files

moss.router

Router Objects

class Router()

get_route

def get_route(start_road: int, end_road: int) -> List[int]

Get the route from start_road to end_road in the form of a list of road ids

get_aoi_route

def get_aoi_route(start_aoi: int, end_aoi: int, check=False) -> List[int]

Get the route from start_aoi to end_aoi in the form of a list of road ids

moss.geom

frange

def frange(a, b, s)

range for float numbers. [a, b) with step s

Point Objects

class Point()

from_angle

@staticmethod
def from_angle(angle)

Create point on unit circle by angle

__mul__

def __mul__(x)

Scalar multiplication

__matmul__

def __matmul__(o)

Dot product

__truediv__

def __truediv__(x)

Scalar division

perp

@property
def perp()

Perpendicular point (-y, x)

solve_intersection

def solve_intersection(a: Point, p: Point, b: Point, q: Point, single=True)

Solve for a + t p = b + s q. Return t if single else (t, s)

angle_normalize

def angle_normalize(angle)

Return angle in [-π, π)

shoelace

def shoelace(arr)

Use the shoelace formula to calculate polygon area

arr is a list of Points, the vertices of the polygon

Polyline Objects

class Polyline()

offset

def offset(d)

Offset polyline to the left by d

ltrim_

def ltrim_(l)

Trim polyline from the left by l

rtrim_

def rtrim_(l)

Trim polyline from the right by l

connect

@staticmethod
def connect(A: Point, a: Point, B: Point, b: Point, allow_overlap=False)

Return a polyline that connects point A and B with directions a and b respectively

overlap

def overlap(other)

Get overlaps with another polyline.

Return the distances of the overlaps along both polylines.

get_xy_from_s

def get_xy_from_s(s, clip=False)

Get Point(x, y) given distance traveled

extend_

def extend_(x)

Extend the polyline by abs(x)

If x>=0, extend at the end; otherwise, extend at the beginning

moss.mongo_map

moss.export

Recorder Objects

class Recorder()

Recorder is for local visualization and writes to a local file

DBRecorder Objects

class DBRecorder()

DBRecorder is for web visualization and writes to Postgres Database

moss.go_router

Router Objects

class Router()

__init__

def __init__(map_file: str,
routing_bin: str,
addr: str = 'localhost:52101',
use_existing_router=False)

  • map_file: path to the map file
  • routing_bin: path to the routing binary
  • addr: the address to be used for the TCP socket

get_route

def get_route(start_lane: int, end_lane: int) -> List[int]

Get the route from start_lane to end_lane in the form of a list of road ids

get_route_async

async def get_route_async(start_end_lanes: List[Tuple[int, int]],
callback=None,
n_workers=8)

Get the route for each (start_lane, end_lane) pair in start_end_lanes, each in the form of a list of road ids

get_aoi_route

def get_aoi_route(start_aoi: int, end_aoi: int) -> List[int]

Get the route from start_aoi to end_aoi in the form of a list of road ids

get_aoi_route_async

async def get_aoi_route_async(start_end_aois: List[Tuple[int, int]],
callback=None,
n_workers=os.cpu_count())

Get the route for each (start_aoi, end_aoi) pair in start_end_aois, each in the form of a list of road ids

moss.parallel

ParallelEngine Objects

class ParallelEngine(Engine)

Moss Engine for parallel simulation

ParallelManager Objects

class ParallelManager(Engine)

Helper class for managing multiple ParallelEngines.

Method calls on this object will be broadcast to all managed engines

execute

def execute(func: Callable[[int, Engine], Any]) -> List[Any]

Run func(idx, eng) in parallel and return the results as a list

moss.agent