Tutorial 2: Reading robot data
Tutorial 2 is started similarly to Tutorial 1.
While in Tutorial 1 the frontend was used to send commands to the pressure-controlled robot, in Tutorial 2 the frontend is used to read the robot data.
handle = get_handle()
frontend = handle.frontends["robot"]
observation = frontend.latest()
“observation” is an instance of the Observation class. It has the following methods:
# at each backend iteration (i.e. control iteration of o80_mujoco), an observation is generated.
# this gives the backend iteration number at which this observation was generated.
print("iteration",observation.get_iteration())
# the time stamp at which the observation was generated.
# it is an integer representing time in nanoseconds
print("time stamp",observation.get_time_stamp())
# the pressure of the muscles as measured by sensors
print("observed pressures",observation.get_observed_pressures())
# the pressures of the muscles as requested by the commands.
# Differences with the observed pressures come from imprecision
# and/or from the time the system needs to reach the commanded pressures
print("desired pressures",observation.get_desired_pressures())
# joint positions as measured by the encoders, in radians
# joint velocities (finite differences applied to the positions), in radians per second
# these methods are available only for simulated robots:
print("cartesian position of the racket",observation.get_cartesian_position())
print("orientation of the racket",observation.get_cartesian_orientation())
# on the real robot, the encoders do not read a proper value until
# the joint has moved through a certain reference position. If ref_found
# is False for a given joint, the joint has not gone through this position yet,
# and the values of positions and velocities (as above) are meaningless
print("references found",observation.get_references_found())
# the measured frequency of the backend at the time the observation was created.
frequency = observation.get_frequency()
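Since positions and velocities are meaningless for joints whose reference has not been found, it can help to mask them before using them. The following is a minimal sketch, assuming the positions and the reference flags are available as plain sequences with one entry per joint. `mask_unreferenced` is a hypothetical helper, not part of o80_pam, and the values are made up.

```python
from typing import List, Optional, Sequence


def mask_unreferenced(
    positions: Sequence[float], references_found: Sequence[bool]
) -> List[Optional[float]]:
    """Replace by None the position of each joint whose reference was not found."""
    if len(positions) != len(references_found):
        raise ValueError("expected one reference flag per joint")
    return [
        position if found else None
        for position, found in zip(positions, references_found)
    ]


# hypothetical values for four joints: the third joint has not
# crossed its reference position yet, so its reading is discarded
masked = mask_unreferenced([0.1, -0.4, 2.7, 0.0], [True, True, False, True])
print(masked)
```

Returning None rather than a number makes any later arithmetic on an unreferenced joint fail loudly instead of silently using a meaningless value.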
You may get a history of observations:
observations = frontend.get_latest_observations(10)
for observation in observations:
    # e.g. printing the iteration number of each observation
    print(observation.get_iteration())
Only a limited number of observations are kept in memory:
# requesting the last 10 million observations. The rotating memory does not keep that many observations,
# so it returns all the observations it contains instead.
# to increase the memory size, "QUEUE_SIZE" (see o80_pam/srcpy/wrappers.cpp) must be increased
# and the code recompiled.
# note that reading and deserializing so many observations from the shared memory takes some time
observations = frontend.get_latest_observations(10000000)
print("number of observations read: {}".format(len(observations)))
print("spanning a duration of: {} seconds".format((observations[-1].get_time_stamp()-observations[0].get_time_stamp())*1e-9))
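The behavior of the rotating memory can be pictured with a fixed-size deque. This is only an analogy in plain Python, not the shared-memory implementation used by o80. The capacity value and the `get_latest` function are hypothetical and chosen for illustration.

```python
from collections import deque
from typing import Deque, List

# hypothetical capacity, far smaller than the real one, for illustration
QUEUE_SIZE = 5

# a deque with maxlen drops its oldest item when a new one is appended
memory: Deque[int] = deque(maxlen=QUEUE_SIZE)
for iteration in range(8):
    memory.append(iteration)


def get_latest(memory: Deque[int], nb_items: int) -> List[int]:
    """Return at most nb_items of the most recent items, oldest first."""
    if nb_items <= 0:
        return []
    return list(memory)[-nb_items:]


# fewer items than stored: only the most recent ones are returned
print(get_latest(memory, 2))
# more items than stored: everything still in memory is returned
print(get_latest(memory, 1000))
```

After eight appends, the three oldest items have been overwritten, so a request for 1000 items yields only the five that remain.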
It is possible to request the observation corresponding to a specific backend iteration.
latest_iteration = frontend.latest().get_iteration()
observation = frontend.read(latest_iteration-10)
print("requested observation of iteration {}, received observation of iteration {}".format(latest_iteration-10,observation.get_iteration()))
You can also make sure you read all observations generated during a time window:
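import time
# getting latest observation and its iteration number
observation = frontend.latest()
tagged_iteration = observation.get_iteration()
# waiting, so that the backend generates new observations
# (the duration of 1 second is an arbitrary choice)
time.sleep(1.0)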
# getting all new observations generated since
observations = frontend.get_observations_since(tagged_iteration)
print("tagged iteration: {}".format(tagged_iteration))
print("recovered observations from {} to {}".format(observations[0].get_iteration(),observations[-1].get_iteration()))
print("spanning a duration of: {} seconds".format((observations[-1].get_time_stamp()-observations[0].get_time_stamp())*1e-9))
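To confirm that no observation was skipped, the iteration numbers of the recovered observations can be checked for gaps. A minimal sketch, assuming the iteration numbers have been collected in a plain list (for instance with `[o.get_iteration() for o in observations]`). `missing_iterations` is a hypothetical helper and the numbers are made up.

```python
from typing import List, Sequence


def missing_iterations(iterations: Sequence[int]) -> List[int]:
    """Return the iteration numbers absent from an increasing sequence."""
    missing: List[int] = []
    for previous, current in zip(iterations, iterations[1:]):
        # consecutive observations should differ by exactly one iteration
        missing.extend(range(previous + 1, current))
    return missing


# hypothetical iteration numbers: iterations 103 and 104 were not read
print(missing_iterations([101, 102, 105, 106]))
```

An empty result means the sequence is contiguous.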
You may also force your python code to run at the frequency of the backend by waiting for new observations:
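import time
# you can wait for a new observation to be written by the backend
time_start = time.time()
# it is important to reset the reading index before calling "wait_for_next" as below
# (reset_next_index is the method the o80 frontend provides for this)
frontend.reset_next_index()
observations = []
# this loop should run at the same frequency as the backend
while time.time()-time_start < 3:
    observations.append(frontend.wait_for_next())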
# checking the frequency
duration = observations[-1].get_time_stamp() - observations[0].get_time_stamp()
frequency = (len(observations) / (duration*1e-9) )
print("collected {} observations at a frequency of {} hz".format(len(observations),frequency))
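A slightly more careful estimate works on the intervals between successive time stamps, since N observations span only N-1 periods. The following is a minimal sketch, assuming the time stamps (integers, in nanoseconds) have been collected in a plain list. `periods_seconds` and `mean_frequency` are hypothetical helpers and the time stamps are made up.

```python
from statistics import mean
from typing import List, Sequence


def periods_seconds(time_stamps_ns: Sequence[int]) -> List[float]:
    """Time in seconds between each pair of successive time stamps (nanoseconds)."""
    return [
        (current - previous) * 1e-9
        for previous, current in zip(time_stamps_ns, time_stamps_ns[1:])
    ]


def mean_frequency(time_stamps_ns: Sequence[int]) -> float:
    """Mean frequency in Hz, computed from the intervals between time stamps."""
    periods = periods_seconds(time_stamps_ns)
    if not periods:
        raise ValueError("at least two time stamps are required")
    return 1.0 / mean(periods)


# hypothetical time stamps, 2 milliseconds apart
stamps = [0, 2_000_000, 4_000_000, 6_000_000]
print(mean_frequency(stamps))
```

The list of periods can also be inspected directly (for instance its minimum and maximum) to see how regular the loop is.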
The pulse and pulse_and_wait methods, presented previously for sending commands, also return instances of Observation:
# pulse both sends the queue of commands and
# returns the latest observation
observation = frontend.pulse()
# waiting for the previous command to finish
# getting latest observation
observation1 = frontend.latest()
# pulse_and_wait returns the observation
# generated at the iteration the command was finished
observation2 = frontend.pulse_and_wait()
duration = observation2.get_time_stamp()-observation1.get_time_stamp()
duration = float(duration)*1e-9
print("the command took around {} seconds to complete".format(duration))