Skip to main content

Build Your Simulation

Building your own simulation involves three steps.

  • The first step is to create a map that describes the area intended for simulation.
  • The second step is to generate trip data describing movements between elements in the map created in the first step.
  • The third step is to conduct the simulation based on the map and travel data generated in the first two steps.

Build Map

In this step, we fetch roadnet and aois data within the area intended for simulation from OpenStreetMap and process these data into GeoJSON format, finally we process this roadnet GeoJSON file and AOI GeoJSON file into our own Protobuf Map format. .

Fetch from OpenStreetMap

from mosstool.map.osm import RoadNet, Building
rn = RoadNet(
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
max_latitude=39.92,
min_latitude=39.78,
max_longitude=116.32,
min_longitude=116.40,
)
roadnet = rn.create_road_net("cache/topo.geojson")
building = Building(
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
max_latitude=39.92,
min_latitude=39.78,
max_longitude=116.32,
min_longitude=116.40,
)
aois = building.create_building("cache/aois.geojson")
  • Here max_latitude, min_latitude, max_longitude, min_longitude describe the bounding box of the area from which data is to be obtained from OSM, and all are in WGS84 coordinates.
  • proj_str is a PROJ.4 projection string, used to transform longitude and latitude coordinates into planar xy coordinates. Generally, if there are no special requirements, the proj_str can be directly set to a Transverse Mercator projection centered on the map's center, which is f"+proj=tmerc +lat_0={(max_latitude+min_latitude)/2} +lon_0={(max_longitude+min_longitude)/2}".

It's worth noting that the features in output GeoJSON file of create_road_net() and create_building() remains in WGS84 coordinates. The provision of proj_str is because using planar xy coordinates is more precise than latitude and longitude coordinates when handling the internal topological relationships of the road network.

Generate the Map format output

from mosstool.map.builder import Builder
builder = Builder(
net=roadnet,
aois=aois,
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
)
m = builder.build("example")
  • roadnet is the GeoJSON file we generated in previous step.
  • aois is the GeoJSON file we generated in previous step.
  • proj_str plays the same role as describes in previous step, which transforms longitude and latitude coordinates into planar xy coordinates.
from mosstool.util.format_converter import dict2pb
pb = dict2pb(m, Map())
with open("data/temp/map.pb", "wb") as f:
f.write(pb.SerializeToString())

Here we save the generate Map into cache path.

Build Trip

In this step, we generate random persons with specific movements between lanes within the map.

Generate persons with only origin and destination positions

Load the required data

from mosstool.trip.generator import (PositionMode, RandomGenerator,
default_vehicle_template_generator)
from mosstool.trip.route import RoutingClient, pre_route
from mosstool.type import Map, Persons, TripMode, Person
from mosstool.util.format_converter import pb2json
with open("data/temp/map.pb", "rb") as f:
m = Map()
m.ParseFromString(f.read())
  • Here m is the map we generated in Step Build Map.
  • template_func, as its name suggests, provides basic attributes for the generated persons apart from their schedules. In mosstool, a default vehicle template function is provided as default_vehicle_template_generator. There are also default bus template function default_bus_template_generator, template function GaussianTemplateGenerator to set attributes with Gaussian distribution, template function UniformTemplateGenerator to set attributes with Uniform distribution and template function CalibratedTemplateGenerator for generating attributes according to the probabilities we have calibrated.

Initialize the RandomGenerator

RandomGenerator is a tool to generate random persons on specific map.

rg = RandomGenerator(
m,
[PositionMode.LANE, PositionMode.LANE],
TripMode.TRIP_MODE_DRIVE_ONLY,
template_func=default_vehicle_template_generator,
)
  • m is the map we just loaded in previous step.
  • position_modes is a list of PositionMode enums, this indicates the number of movements for the generated person; in the example, the movements of generated person is lane0-to-lane1. If position_modes is set as [PositionMode.LANE, PositionMode.LANE, PositionMode.LANE], then the generated movement is lane0-to-lane1-to-lane2, and so on.
  • trip_mode indicates the mode of transportation for the person's movement. In this example, it is set to TripMode.TRIP_MODE_DRIVE_ONLY, which means the person's mode of transportation is driving.
  • template_func is what we just loaded in previous step.

Generate the persons

persons = rg.uniform(
num=100,
first_departure_time_range=(8 * 3600, 9 * 3600),
schedule_interval_range=(5 * 60, 10 * 60),
start_id=0,
)
print(persons)
  • num specifies the number of persons to be generated.
  • first_departure_time_range is a list of PositionMode enums, this indicates the number of movements for the generated person; in the example, the movements of generated person is lane0-to-lane1. If position_modes is set as [PositionMode.LANE, PositionMode.LANE, PositionMode.LANE], then the generated movement is lane0-to-lane1-to-lane2, and so on.
  • trip_mode indicates the mode of transportation for the person's movement. In this example, it is set to TripMode.TRIP_MODE_DRIVE_ONLY, which means the person's mode of transportation is driving.

Fill in the route of the persons' all schedules

This step requires extra packages released on Releases · routing. Activate the routing service before running codes below with ./routing -map data/temp/map.pb.

client = RoutingClient("http://localhost:52101")
ok_persons = []
for p in persons:
p = await pre_route(client, p)
if len(p.schedules) > 0 and len(p.schedules[0].trips) > 0:
ok_persons.append(p)
print(ok_persons)
print("final length: ", len(ok_persons))
pb = Persons(persons=ok_persons)
with open("data/temp/persons.pb", "wb") as f:
f.write(pb.SerializeToString())
  • For local running routing service, the default listening host is 52101, so in this example the input argument of RoutingClient is set to "http://localhost:52101". To change this gRPC listening address, add extra input arg -listen "localhost:<HOST_NUM>" when activating routing service.
  • Due to the use of the await keyword in the request navigation, the entire function needs to be an asynchronous function (declared with async)
  • if len(p.schedules) > 0 and len(p.schedules[0].trips) > 0 keeps person woh has at least one trip with valid route.

Run Simulation

Create MOSS engine

from moss import Engine, TlPolicy, Verbosity
MAP_PATH = "./data/temp/map.pb"
TRIP_PATH = "./data/temp/persons.pb"
eng = Engine(
map_file=MAP_PATH,
agent_file=TRIP_PATH,
start_step=8 * 3600,
verbose_level=Verbosity.ALL,
device=0,
)
  • Here map_file and agent_file are what we generated in Step Build Map and Build Trip.
  • start_step determines the time (in seconds) of the simulation begins. 8 * 3600 means the simulation begins at 8 a.m.
  • verbose_level specifies the level of output logs; here, 'Verbosity.ALL' will output all logs.
  • device specifies which CUDA device to run simulation on.

Set traffic-light policy

eng.set_tl_duration_batch(range(eng.junction_count), 30)
eng.set_tl_policy_batch(
range(eng.junction_count), TlPolicy.FIXED_TIME
)

In the example, the simplest traffic-light control policy was chosen, with a fixed phase duration of 30 seconds

Simulate specified steps

for _ in range(600):
eng.next_step()

Here, next_step is used to simulate step by step for 600 steps. You can also specify next_step(n) to simulate a certain number of steps at once.