This commit is contained in:
Ivan
2022-04-05 11:42:28 +03:00
commit 6dc0eb0fcf
5565 changed files with 1200500 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
cmake_minimum_required(VERSION 2.8.3)
project(test_rosservice)
find_package(catkin REQUIRED COMPONENTS genmsg rosunit rostest std_msgs)
if(CATKIN_ENABLE_TESTING)
add_service_files(DIRECTORY srv
FILES
HeaderEcho.srv
NOINSTALL
)
generate_messages(DEPENDENCIES std_msgs)
endif()
catkin_package()
if(CATKIN_ENABLE_TESTING)
catkin_add_nosetests(test)
add_rostest(test/rosservice.test)
#add_rostest(test/rosservice_unit.test)
endif()

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
NAME = 'header_echo_server'
from test_rosservice.srv import *
import rospy
def header_echo(req):
print("Returning %s"%(req))
return req.header
def header_echo_server():
rospy.init_node(NAME)
s = rospy.Service('header_echo', HeaderEcho, header_echo)
# spin() keeps Python from exiting until node is shutdown
rospy.spin()
if __name__ == "__main__":
header_echo_server()

View File

@@ -0,0 +1,22 @@
<package>
<name>test_rosservice</name>
<version>1.12.14</version>
<description>
Tests for the rosservice tool.
</description>
<maintainer email="dthomas@osrfoundation.org">Dirk Thomas</maintainer>
<license>BSD</license>
<url>http://ros.org/wiki/rosservice</url>
<author>Ken Conley</author>
<buildtool_depend version_gte="0.5.68">catkin</buildtool_depend>
<build_depend>genmsg</build_depend>
<build_depend>rostest</build_depend>
<build_depend>std_msgs</build_depend>
<test_depend>rospy</test_depend>
<test_depend>rosunit</test_depend>
<test_depend>test_rosmaster</test_depend>
</package>

View File

@@ -0,0 +1,3 @@
Header header
---
Header header

View File

@@ -0,0 +1,369 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import unittest
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
import time
import rosunit
from subprocess import Popen, PIPE, check_call, call
from contextlib import contextmanager
@contextmanager
def fakestdout():
realstdout = sys.stdout
fakestdout = StringIO()
sys.stdout = fakestdout
yield fakestdout
sys.stdout = realstdout
def todict(s):
d = {}
for l in s.split('\n'):
key, p, val = l.partition(':')
if p:
d[key] = val.strip()
return d
class TestRosservice(unittest.TestCase):
def setUp(self):
pass
def test_get_service_headers(self):
import rosservice
orig_uri = os.environ['ROS_MASTER_URI']
os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
try:
c = 'rosservice'
# test error conditions, integration tests cover success cases
try:
rosservice.get_service_headers('/add_two_ints', 'fake://localhost:1234')
self.fail("should have raised")
except rosservice.ROSServiceException: pass
try:
rosservice.get_service_headers('/add_two_ints', 'rosrpc://fake_host:1234')
self.fail("should have raised IO exc")
except rosservice.ROSServiceIOException: pass
finally:
os.environ['ROS_MASTER_URI'] = orig_uri
def test_get_service_type(self):
import rosservice
self.assertEquals('test_rosmaster/AddTwoInts', rosservice.get_service_type('/add_two_ints'))
self.assertEquals(None, rosservice.get_service_type('/fake_add_two_ints'))
def test_offline(self):
import rosservice
orig_uri = os.environ['ROS_MASTER_URI']
os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
try:
c = 'rosservice'
try:
rosservice.get_service_type('/add_two_ints')
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException:
pass
try:
rosservice._rosservice_cmd_list([c, 'list'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
try:
rosservice._rosservice_cmd_info([c, 'info', '/add_two_ints'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
try:
rosservice._rosservice_cmd_type([c, 'type', '/add_two_ints'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
try:
rosservice._rosservice_cmd_uri([c, 'uri', '/add_two_ints'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
try:
rosservice._rosservice_cmd_find([c, 'find', 'test_ros/AddTwoInts'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
try:
rosservice._rosservice_cmd_call([c, 'call', '/add_two_ints', '1', '2'])
self.fail("should have raised ROSServiceIOException")
except rosservice.ROSServiceIOException: pass
finally:
os.environ['ROS_MASTER_URI'] = orig_uri
def test_cmd_type(self):
import rosservice
cmd = 'rosservice'
s = '/add_two_ints'
try:
rosservice.rosservicemain([cmd, 'type', '/fake_service'])
self.fail("should have triggered error exit")
except SystemExit:
pass
for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'type', s])
v = b.getvalue().strip()
self.assertEquals('test_rosmaster/AddTwoInts', v)
def test_cmd_uri(self):
import rosservice
cmd = 'rosservice'
with fakestdout() as b:
try:
rosservice.rosservicemain([cmd, 'uri', '/fake_service'])
self.fail("should have triggered error exit")
except SystemExit:
pass
for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'uri', s])
v = b.getvalue().strip()
self.assert_(v.startswith('rosrpc://'), v)
def test_cmd_node(self):
import rosservice
cmd = 'rosservice'
for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'node', s])
v = b.getvalue().strip()
if 'foo' in s:
self.assertEquals('/foo/a2iserver', v)
else:
self.assertEquals('/a2iserver', v)
try:
rosservice.rosservicemain([cmd, 'node', '/fake_two_ints'])
self.fail("should have exited with error")
except SystemExit: pass
def test_full_usage(self):
import rosservice
try:
rosservice._fullusage()
self.fail("should have caused system exit")
except SystemExit: pass
def test_cmd_info(self):
import rosservice
cmd = 'rosservice'
try:
rosservice.rosservicemain([cmd, 'info'])
self.fail("should have exited with error")
except SystemExit: pass
try:
rosservice.rosservicemain([cmd, 'info', '/fake_service'])
self.fail("should have exited with error")
except SystemExit: pass
try:
rosservice.rosservicemain([cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])
self.fail("should have exited with error")
except SystemExit: pass
for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'info', s])
d = todict(b.getvalue())
if 'foo' in s:
self.assertEquals('/foo/a2iserver', d['Node'])
else:
self.assertEquals('/a2iserver', d['Node'], repr(d['Node']))
self.assertEquals('test_rosmaster/AddTwoInts', d['Type'])
self.assertEquals('a b', d['Args'])
self.assert_('URI' in d)
def test_cmd_find(self):
import rosservice
cmd = 'rosservice'
try:
rosservice.rosservicemain([cmd, 'find'])
self.fail("arg parsing should have failed")
except SystemExit: pass
try:
rosservice.rosservicemain([cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])
self.fail("arg parsing should have failed")
except SystemExit: pass
v = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
d = set([x for x in b.getvalue().split('\n') if x.strip()])
self.assertEquals(v, d)
with fakestdout() as b:
rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
self.assertEquals('', b.getvalue().strip())
def test_get_service_class_by_name(self):
import rosservice
try:
rosservice.get_service_class_by_name('fake')
self.fail("should have raised")
except rosservice.ROSServiceException as e:
self.assertEquals("Service [fake] is not available.", str(e))
def test_cmd_call(self):
import rosservice
cmd = 'rosservice'
try:
rosservice.rosservicemain([cmd, 'call'])
self.fail("arg parsing should have failed")
except SystemExit: pass
try:
rosservice.rosservicemain([cmd, 'call', 'add_two_ints', '1', '2', '3'])
self.fail("should have failed with too many args")
except SystemExit: pass
def setUp(self):
# wait for all services to come up
import rosservice
services = ['/add_two_ints',
'/foo/add_two_ints',
'/bar/add_two_ints',
]
import time
timeout_t = time.time() + 10.
while time.time() < timeout_t:
with fakestdout() as b:
rosservice._rosservice_cmd_list(['rosservice', 'list'])
v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
if not (set(services) - set(v) ):
return
self.fail("timeout")
def test_cmd_list(self):
import rosservice
cmd = 'rosservice'
s = '/add_two_ints'
# test main entry
rosservice.rosservicemain([cmd, 'list'])
# test directly
services = ['/add_two_ints',
'/foo/add_two_ints',
'/bar/add_two_ints',
'/header_echo',
]
services_nodes = ['/add_two_ints /a2iserver',
'/foo/add_two_ints /foo/a2iserver',
'/bar/add_two_ints /bar/a2iserver',
'/header_echo /headerserver',
]
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list'])
v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
v = [x for x in v if not x.startswith('/rosout/')]
v = [x for x in v if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
self.assertEquals(set(services), set(v))
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', '-n'])
v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
v = [x for x in v if not x.startswith('/rosout/')]
v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
self.assertEquals(set(services_nodes), set(v))
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', '--nodes'])
v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
v = [x for x in v if not x.startswith('/rosout/')]
v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
self.assertEquals(set(services_nodes), set(v))
# test with multiple service names
try:
rosservice._rosservice_cmd_list([cmd, 'list', s, s])
self.fail("should have caused parser error")
except SystemExit:
pass
# test with resolved service names
for s in services:
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', s])
self.assertEquals(s, b.getvalue().strip())
# test with relative service names
s = 'add_two_ints'
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', s])
self.assertEquals('/add_two_ints', b.getvalue().strip())
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
with fakestdout() as b:
rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
# test with namespaces
s = '/foo'
rosservice._rosservice_cmd_list([cmd, 'list', s])
rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
s = 'foo'
rosservice._rosservice_cmd_list([cmd, 'list', s])
rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
NAME = 'test_rosservice'
if __name__ == '__main__':
rosunit.unitrun('test_rosservice', NAME, TestRosservice, sys.argv, coverage_packages=['rosservice'])

View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import time
import unittest
import rospy
import rostest
import test_rosmaster.srv
from subprocess import Popen, PIPE, check_call, call
from threading import Thread
class TestTask(Thread):
def __init__(self, task):
Thread.__init__(self)
self.task = task
self.success = False
self.done = False
self.value = None
def run(self):
try:
print("STARTING TASK")
self.value = self.task()
print("TASK HAS COMPLETED")
self.success = True
except:
import traceback
traceback.print_exc()
self.done = True
class TestRosserviceOnline(unittest.TestCase):
def setUp(self):
pass
def test_rosservice(self):
# wait for network to initialize
services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints']
for s in services:
rospy.wait_for_service(s)
cmd = 'rosservice'
names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints']
# list
# - hard to exact match as we are still adding builtin services to nodes (e.g. set_logger_level)
output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
output = output.decode()
l = set(output.split())
for s in services:
self.assert_(s in l)
for name in names:
# args
output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0]
self.assertEquals(b'a b', output.strip())
# type
output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0]
self.assertEquals(b'test_rosmaster/AddTwoInts', output.strip())
# find
output = Popen([cmd, 'find', 'test_rosmaster/AddTwoInts'], stdout=PIPE).communicate()[0]
values = [v.strip() for v in output.decode().split('\n') if v.strip()]
self.assertEquals(set(values), set(services))
# uri
output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0]
# - no exact answer
self.assert_(output.decode().startswith('rosrpc://'), output)
# call
output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0]
self.assertEquals(b'sum: 3', output.strip())
output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0]
self.assertEquals(b'sum: 3', output.strip())
name = 'header_echo'
# test with a Header so we can validate keyword args
import yaml
import time
t = time.time()
# test with empty headers
for v in ['{}', '{header: {}}', '{header: {seq: 0}}']:
output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
output = output.strip()
self.assert_(output, output)
val = yaml.load(output)['header']
self.assertEquals('', val['frame_id'])
self.assert_(val['seq'] >= 0)
self.assertEquals(0, val['stamp']['secs'])
self.assertEquals(0, val['stamp']['nsecs'])
# test with auto headers
for v in ['{header: auto}', '{header: {stamp: now}}']:
output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
val = yaml.load(output.strip())['header']
self.assertEquals('', val['frame_id'])
self.assert_(val['seq'] >= 0)
self.assert_(val['stamp']['secs'] >= int(t))
# verify that it respects ROS_NS
# - the uris should be different as the names should resolve differently
env = os.environ.copy()
env['ROS_NAMESPACE'] = 'foo'
uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0]
uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
self.assert_(uri2.decode().startswith('rosrpc://'))
self.assertNotEquals(uri1, uri2)
# test_call_wait
def task1():
output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0]
self.assertEquals(b'sum: 3', output.strip())
timeout_t = time.time() + 5.
t1 = TestTask(task1)
t1.start()
rospy.init_node('test_call_wait')
rospy.Service("wait_two_ints", test_rosmaster.srv.AddTwoInts, lambda x: x.a + x.b)
while not t1.done and time.time() < timeout_t:
time.sleep(0.5)
self.assert_(t1.success)
PKG = 'test_rosservice'
NAME = 'test_rosservice_command_line_online'
if __name__ == '__main__':
rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)

View File

@@ -0,0 +1,11 @@
<launch>
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
<node name="headerserver" pkg="test_rosservice" type="header_echo_server.py" />
<group ns="foo">
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
</group>
<group ns="bar">
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
</group>
<test test-name="rosservice_command_line_online" pkg="test_rosservice" type="check_rosservice_command_line_online.py" />
</launch>

View File

@@ -0,0 +1,11 @@
<launch>
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
<node name="headerserver" pkg="test_rosservice" type="header_echo_server.py" />
<group ns="foo">
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
</group>
<group ns="bar">
<node name="a2iserver" pkg="test_rosmaster" type="add_two_ints_server" />
</group>
<test test-name="rosservice_unit" pkg="test_rosservice" type="check_rosservice.py" />
</launch>

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import unittest
import time
import rostest
from subprocess import Popen, PIPE, check_call, call
class TestRosserviceOffline(unittest.TestCase):
def setUp(self):
pass
## test that the usage command works
def test_cmd_help(self):
cmd = 'rosservice'
sub = ['args', 'info', 'list', 'call', 'type', 'uri', 'find']
output = Popen([cmd], stdout=PIPE).communicate()[0]
output = output.decode()
self.assert_('Commands' in output)
output = Popen([cmd, '-h'], stdout=PIPE).communicate()[0]
output = output.decode()
self.assert_('Commands' in output)
# make sure all the commands are in the usage
for c in sub:
self.assert_("%s %s"%(cmd, c) in output, output)
for c in sub:
output = Popen([cmd, c, '-h'], stdout=PIPE).communicate()
self.assert_("Usage:" in output[0].decode(), output)
# make sure usage refers to the command
self.assert_("%s %s" % (cmd, c) in output[0].decode(), output)
def test_offline(self):
cmd = 'rosservice'
# point at a different 'master'
env = os.environ.copy()
env['ROS_MASTER_URI'] = 'http://localhost:11312'
kwds = { 'env': env, 'stdout': PIPE, 'stderr': PIPE}
msg = "ERROR: Unable to communicate with master!\n"
output = Popen([cmd, 'list'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
output = Popen([cmd, 'type', 'add_two_ints'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
output = Popen([cmd, 'uri', 'add_two_ints'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
output = Popen([cmd, 'call', 'add_two_ints', '1', '2'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
# - wait should still fail if master is offline
output = Popen([cmd, 'call', '--wait', 'add_two_ints', '1', '2'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
output = Popen([cmd, 'find', 'test_ros/AddTwoInts'], **kwds).communicate()
self.assert_(output[1].decode().endswith(msg))
if __name__ == '__main__':
rostest.unitrun('test_rosservice', NAME, TestRosserviceOffline, sys.argv, coverage_packages=[])