次の次くらいにやっと逆運動学がでてきます。
http://www.ros.org/wiki/pr2/Tutorials/Getting%20the%20current%20joint%20angles
ではシミュレータを立ち上げて、
$ rostopic echo joint_states
してみましょう。
うぎゃーーー。すごい大量のデータがでてきますね。
これはsensor_msgs/JointState型のトピックです。
$ rosmsg show sensor_msgs/JointState
してみると
Header header
uint32 seq
time stamp
string frame_id
string[] name
float64[] position
float64[] velocity
float64[] effort
となっています。前回見たtrajectory_msgs/JointTrajectory型とにてますね。
position, effortあたりはグリッパのときにでてきましたね。
じゃあ、いつもどおりパッケージつくりましょう。
$ roscd sandbox
$ roscreate-pkg joint_states_listener rospy sensor_msgs
$ roscd joint_states_listener
で、以下をsrv/ReturnJointStates.srvとして保存しましょう。
string[] name --- uint32[] found float64[] position float64[] velocity float64[] effort
そしてCMakeLists.txtの以下をコメントイン。
rosbuild_gensrv()
で、makeしておきましょう。
そして以下をsrc/joint_states_listener/joint_states_listener.pyとして保存しましょう。
#!/usr/bin/python
#spins off a thread to listen for joint_states messages
#and provides the same information (or subsets of) as a service
import roslib
roslib.load_manifest('joint_states_listener')
import rospy
from joint_states_listener.srv import *
from sensor_msgs.msg import JointState
import threading
#holds the latest states obtained from joint_states messages
class LatestJointStates:
def __init__(self):
rospy.init_node('joint_states_listener')
self.lock = threading.Lock()
self.name = []
self.position = []
self.velocity = []
self.effort = []
self.thread = threading.Thread(target=self.joint_states_listener)
self.thread.start()
s = rospy.Service('return_joint_states', ReturnJointStates, self.return_joint_states)
#thread function: listen for joint_states messages
def joint_states_listener(self):
rospy.Subscriber('joint_states', JointState, self.joint_states_callback)
rospy.spin()
#callback function: when a joint_states message arrives, save the values
def joint_states_callback(self, msg):
self.lock.acquire()
self.name = msg.name
self.position = msg.position
self.velocity = msg.velocity
self.effort = msg.effort
self.lock.release()
#returns (found, position, velocity, effort) for the joint joint_name
#(found is 1 if found, 0 otherwise)
def return_joint_state(self, joint_name):
#no messages yet
if self.name == []:
rospy.logerr("No robot_state messages received!\n")
return (0, 0., 0., 0.)
#return info for this joint
self.lock.acquire()
if joint_name in self.name:
index = self.name.index(joint_name)
position = self.position[index]
velocity = self.velocity[index]
effort = self.effort[index]
#unless it's not found
else:
rospy.logerr("Joint %s not found!", (joint_name,))
self.lock.release()
return (0, 0., 0., 0.)
self.lock.release()
return (1, position, velocity, effort)
#server callback: returns arrays of position, velocity, and effort
#for a list of joints specified by name
def return_joint_states(self, req):
joints_found = []
positions = []
velocities = []
efforts = []
for joint_name in req.name:
(found, position, velocity, effort) = self.return_joint_state(joint_name)
joints_found.append(found)
positions.append(position)
velocities.append(velocity)
efforts.append(effort)
return ReturnJointStatesResponse(joints_found, positions, velocities, efforts)
#run the server
if __name__ == "__main__":
latestjointstates = LatestJointStates()
print "joints_states_listener server started, waiting for queries"
rospy.spin()
当然chmod 755 joint_states_listener.pyもしておきます。
そして以下でサービスを呼び出します。
#!/usr/bin/python
#test client for joint_states_listener
import roslib
roslib.load_manifest('joint_states_listener')
import rospy
from joint_states_listener.srv import ReturnJointStates
import time
import sys
def call_return_joint_states(joint_names):
rospy.wait_for_service("return_joint_states")
try:
s = rospy.ServiceProxy("return_joint_states", ReturnJointStates)
resp = s(joint_names)
except rospy.ServiceException, e:
print "error when calling return_joint_states: %s"%e
sys.exit(1)
for (ind, joint_name) in enumerate(joint_names):
if(not resp.found[ind]):
print "joint %s not found!"%joint_name
return (resp.position, resp.velocity, resp.effort)
#pretty-print list to string
def pplist(list):
return ' '.join(['%2.3f'%x for x in list])
#print out the positions, velocities, and efforts of the right arm joints
if __name__ == "__main__":
joint_names = ["r_shoulder_pan_joint",
"r_shoulder_lift_joint",
"r_upper_arm_roll_joint",
"r_elbow_flex_joint",
"r_forearm_roll_joint",
"r_wrist_flex_joint",
"r_wrist_roll_joint"]
(position, velocity, effort) = call_return_joint_states(joint_names)
while(1):
print "position:", pplist(position)
print "velocity:", pplist(velocity)
print "effort:", pplist(effort)
time.sleep(1)
ようするに、joint_statesというトピックを購読して、必要なものだけを返しているだけですね。
今回はこれくらいにしましょう。
中身なくてすみません。
今回はこれくらいにしましょう。
中身なくてすみません。
0 件のコメント:
コメントを投稿