Build Your Simulation
Building your own simulation involves three steps.
- The first step is to create a map that describes the area intended for simulation.
- The second step is to generate trip data describing movements between elements in the map created in the first step.
- The third step is to conduct the simulation based on the map and travel data generated in the first two steps.
Build Map
In this step, we fetch roadnet
and aois
data within the area intended for simulation from OpenStreetMap and process these data into GeoJSON format, finally we process this roadnet GeoJSON file and AOI GeoJSON file into our own Protobuf Map format.
.
Fetch from OpenStreetMap
from mosstool.map.osm import RoadNet, Building
rn = RoadNet(
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
max_latitude=39.92,
min_latitude=39.78,
max_longitude=116.32,
min_longitude=116.40,
)
roadnet = rn.create_road_net("cache/topo.geojson")
building = Building(
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
max_latitude=39.92,
min_latitude=39.78,
max_longitude=116.32,
min_longitude=116.40,
)
aois = building.create_building("cache/aois.geojson")
- Here
max_latitude
,min_latitude
,max_longitude
,min_longitude
describe the bounding box of the area from which data is to be obtained from OSM, and all are in WGS84 coordinates. proj_str
is a PROJ.4 projection string, used to transform longitude and latitude coordinates into planar xy coordinates. Generally, if there are no special requirements, theproj_str
can be directly set to a Transverse Mercator projection centered on the map's center, which isf"+proj=tmerc +lat_0={(max_latitude+min_latitude)/2} +lon_0={(max_longitude+min_longitude)/2}"
.
It's worth noting that the features
in output GeoJSON file of create_road_net()
and create_building()
remains in WGS84 coordinates. The provision of proj_str
is because using planar xy coordinates is more precise than latitude and longitude coordinates when handling the internal topological relationships of the road network.
Generate the Map
format output
from mosstool.map.builder import Builder
builder = Builder(
net=roadnet,
aois=aois,
proj_str="+proj=tmerc +lat_0=22.54095 +lon_0=113.90899",
)
m = builder.build("example")
roadnet
is the GeoJSON file we generated in previous step.aois
is the GeoJSON file we generated in previous step.proj_str
plays the same role as describes in previous step, which transforms longitude and latitude coordinates into planar xy coordinates.
pb = dict2pb(m, Map())
with open("data/temp/map.pb", "wb") as f:
f.write(pb.SerializeToString())
Here we save the generate Map into cache path.
Build Trip
In this step, we generate random persons with specific movements between lanes
within the map.
Generate persons with only origin and destination positions
Load the required data
from mosstool.trip.generator import (PositionMode, RandomGenerator,
default_vehicle_template_generator)
from mosstool.trip.route import RoutingClient, pre_route
from mosstool.type import Map, Persons, TripMode, Person
from mosstool.util.format_converter import pb2json
with open("data/temp/map.pb", "rb") as f:
m = Map()
m.ParseFromString(f.read())
- Here
m
is the map we generated in StepBuild Map
. template_func
, as its name suggests, provides basic attributes for the generated persons apart from their schedules. Inmosstool
, a default vehicle template function is provided asdefault_vehicle_template_generator
. There are also default bus template functiondefault_bus_template_generator
, template functionGaussianTemplateGenerator
to set attributes with Gaussian distribution, template functionUniformTemplateGenerator
to set attributes with Uniform distribution and template functionCalibratedTemplateGenerator
for generating attributes according to the probabilities we have calibrated.
Initialize the RandomGenerator
RandomGenerator
is a tool to generate random persons on specific map.
rg = RandomGenerator(
m,
[PositionMode.LANE, PositionMode.LANE],
TripMode.TRIP_MODE_DRIVE_ONLY,
template_func=default_vehicle_template_generator,
)
m
is the map we just loaded in previous step.position_modes
is a list ofPositionMode
enums, this indicates the number of movements for the generated person; in the example, the movements of generated person is lane0-to-lane1. Ifposition_modes
is set as[PositionMode.LANE, PositionMode.LANE, PositionMode.LANE]
, then the generated movement is lane0-to-lane1-to-lane2, and so on.trip_mode
indicates the mode of transportation for the person's movement. In this example, it is set toTripMode.TRIP_MODE_DRIVE_ONLY
, which means the person's mode of transportation is driving.template_func
is what we just loaded in previous step.
Generate the persons
persons = rg.uniform(
num=100,
first_departure_time_range=(8 * 3600, 9 * 3600),
schedule_interval_range=(5 * 60, 10 * 60),
start_id=0,
)
print(persons)
num
specifies the number of persons to be generated.first_departure_time_range
is a list ofPositionMode
enums, this indicates the number of movements for the generated person; in the example, the movements of generated person is lane0-to-lane1. Ifposition_modes
is set as[PositionMode.LANE, PositionMode.LANE, PositionMode.LANE]
, then the generated movement is lane0-to-lane1-to-lane2, and so on.trip_mode
indicates the mode of transportation for the person's movement. In this example, it is set toTripMode.TRIP_MODE_DRIVE_ONLY
, which means the person's mode of transportation is driving.
Fill in the route of the persons' all schedules
This step requires extra packages released on Releases · routing.
Activate the routing
service before running codes below with ./routing -map data/temp/map.pb
.
client = RoutingClient("http://localhost:52101")
ok_persons = []
for p in persons:
p = await pre_route(client, p)
if len(p.schedules) > 0 and len(p.schedules[0].trips) > 0:
ok_persons.append(p)
print(ok_persons)
print("final length: ", len(ok_persons))
pb = Persons(persons=ok_persons)
with open("data/temp/persons.pb", "wb") as f:
f.write(pb.SerializeToString())
- For local running routing service, the default listening host is
52101
, so in this example the input argument ofRoutingClient
is set to "http://localhost:52101". To change this gRPC listening address, add extra input arg-listen "localhost:<HOST_NUM>"
when activatingrouting
service. - Due to the use of the
await
keyword in the request navigation, the entire function needs to be an asynchronous function (declared with async) if len(p.schedules) > 0 and len(p.schedules[0].trips) > 0
keepsperson
woh has at least one trip with valid route.
Run Simulation
Create MOSS engine
from moss import Engine, TlPolicy, Verbosity
MAP_PATH = "./data/temp/map.pb"
TRIP_PATH = "./data/temp/persons.pb"
eng = Engine(
map_file=MAP_PATH,
agent_file=TRIP_PATH,
start_step=8 * 3600,
verbose_level=Verbosity.ALL,
device=0,
)
- Here
map_file
andagent_file
are what we generated in StepBuild Map
andBuild Trip
. start_step
determines the time (in seconds) of the simulation begins.8 * 3600
means the simulation begins at 8 a.m.verbose_level
specifies the level of output logs; here, 'Verbosity.ALL' will output all logs.device
specifies whichCUDA
device to run simulation on.
Set traffic-light policy
eng.set_tl_duration_batch(range(eng.junction_count), 30)
eng.set_tl_policy_batch(
range(eng.junction_count), TlPolicy.FIXED_TIME
)
In the example, the simplest traffic-light control policy was chosen, with a fixed phase duration of 30 seconds
Simulate specified steps
for _ in range(600):
eng.next_step()
Here, next_step
is used to simulate step by step for 600 steps. You can also specify next_step(n)
to simulate a certain number of steps at once.