"""
VIKI: more than a GUI for ROS, https://github.com/UT-RAM/viki
version: 0.2 - Alice
copyright: Robin Hoogervorst, Alex Kamphuis, Cees Trouwborst, 2016
licensed under the MIT License
"""
import xml.etree.cElementTree as ET
import xml.dom.minidom
import random
import pprint
def write(configuration, filename="aeroworks.launch"):
    """Serialise a configuration into a roslaunch XML file.

    A ``<launch>`` root element is created, filled by ``recursiveWrite``
    from *configuration*, pretty-printed and written to *filename*.

    :param configuration: top-level configuration object, passed unchanged
        to ``recursiveWrite``.
    :param filename: path of the launch file to create; an existing file is
        overwritten.
    """
    # Create root element <launch>
    root = ET.Element("launch")
    recursiveWrite(configuration, root, root)
    # The context manager guarantees the handle is flushed and closed, even
    # when rendering or writing raises.
    with open(filename, 'w') as f:
        f.write(prettify(root))
def prettify(elem):
    """Return a pretty-printed XML string for the Element.

    The element is serialised with ElementTree, re-parsed with
    ``xml.dom.minidom`` and rendered again with one tab per nesting level.

    :param elem: the ElementTree element to render.
    :returns: the indented XML document, including the XML declaration.
    """
    rough_string = ET.tostring(elem, 'utf-8')
    reparsed = xml.dom.minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent="\t")
def getName(obj):
    """Return the ``name`` attribute of *obj*.

    :param obj: any object exposing a ``name`` attribute.
    :returns: ``obj.name``.
    :raises AttributeError: if *obj* has no ``name`` attribute.
    """
    return obj.name
def recursiveWrite(configPart, configElem, rootElem, path=''):
    """Populate the launch XML tree from one level of the configuration.

    For the given level this emits, in order:

    * a ``topic_tools`` relay node for every entry in
      ``configPart.connections_to_add`` whose two resolved topics differ,
    * a ``<machine>`` element for every entry in
      ``configPart.machines_to_add``,
    * for every module: relay nodes for its internal connections and a
      ``<group>``/``<node>`` pair per executable,

    and then recurses into every namespace in ``configPart.namespaces``.

    Side effect: parameters that get attached to an executable are removed
    from ``mod.parameters_to_add``; whatever remains is reported on stdout.

    :param configPart: configuration (or namespace) object for this level.
    :param configElem: XML element that receives the executable groups.
    :param rootElem: XML root element; relay nodes, machines and namespace
        groups are attached here.
    :param path: namespace path of this level, prefixed to topic names.
    """
    # Connections between modules at the current level.
    for con in configPart.connections_to_add:
        from_attr = lookup(configPart, con.listener, path)
        to_attr = lookup(configPart, con.publisher, path)
        # Compare by value, not identity: two separately built strings are
        # practically never the same object, so an identity test would also
        # relay a topic onto itself.
        if from_attr != to_attr:
            _addRelay(rootElem, to_attr, from_attr)

    for machine in configPart.machines_to_add:
        machineElement = ET.SubElement(rootElem, 'machine')
        machineElement.set('name', machine.name)
        machineElement.set('address', machine.hostname)
        machineElement.set('user', machine.username)
        machineElement.set('password', machine.password)
        machineElement.set('env-loader', '~/.viki_env')

    for mod in configPart.modules_to_add:
        # Connections declared inside the module implementation.
        for ic in mod.implementation.config:
            from_attr = path + '/' + lookupInternal(ic.listener, mod)
            to_attr = path + '/' + lookupInternal(ic.publisher, mod)
            _addRelay(rootElem, to_attr, from_attr)

        # Shared by all executables of this module: a parameter name that
        # was written once is not written again as a default.
        added_params = []
        for executable in mod.implementation.executables:
            _writeExecutable(configElem, mod, executable, added_params)

        # At this point, we end up with some parameters that are not "connected". Echo those.
        for paramSearch in mod.parameters_to_add:
            print('Parameter "' + paramSearch.name + '", valued "' + paramSearch.value + '", could not be connected.')

    # Only the attribute access is guarded: an AttributeError raised while
    # processing a child namespace is a real error and must not be reported
    # as "no namespaces".
    try:
        namespaces = configPart.namespaces
    except AttributeError:
        print('No namespaces in configPart')
        return
    for ns in namespaces:
        # NOTE(review): the group is attached to rootElem rather than
        # configElem, so namespaces nested more than one level deep are not
        # nested in the XML, while `path` keeps growing - confirm intent.
        ns_elem = ET.SubElement(rootElem, "group", ns=ns.id)
        recursiveWrite(ns, ns_elem, rootElem, path + '/' + ns.id)


def _addRelay(rootElem, source, target):
    """Append a topic_tools relay node forwarding *source* to *target*."""
    relayElement = ET.SubElement(rootElem, 'node')
    relayElement.set('name', '$(anon remap_{})'.format(str(random.random())))
    relayElement.set('pkg', 'topic_tools')
    relayElement.set('type', 'relay')
    relayElement.set('args', source + ' ' + target)
    relayElement.set('ns', 'remaps')


def _writeExecutable(configElem, mod, executable, added_params):
    """Append the group/node for one executable, with params and attributes."""
    node_name = mod.id + '_' + executable.id
    nodens = ET.SubElement(configElem, "group", ns=node_name)
    node = ET.SubElement(nodens, "node", pkg=executable.pkg, name=node_name, type=executable.executable)

    # Check if one of the parameters that are to be set are present in this executable
    # Todo/problem: params are not defined at executable level, but at module level.
    # Iterate over a copy because matched parameters are removed from the list.
    for paramSearch in mod.parameters_to_add[:]:
        paramFound = False
        for paramList in executable.params:
            if paramSearch.name == paramList.name:
                ET.SubElement(node, "param", name=paramSearch.name, value=paramSearch.value)
                mod.parameters_to_add.remove(paramSearch)
                added_params.append(paramList.name)
                paramFound = True
                break
        if not paramFound:
            print("Error during parameter config: could not find {}".format(paramSearch.name))
    print("Added parameters: {}".format(added_params))

    # Fill in defaults for the parameters that were not set explicitly.
    for paramList in executable.params:
        if paramList.name not in added_params:
            added_params.append(paramList.name)
            ET.SubElement(node, "param", name=paramList.name, value=paramList.default, type=paramList.type)

    # find any command line arguments that belong to this executable
    for argSearch in mod.args:
        if argSearch.execid == executable.id:
            node.attrib['args'] = argSearch.argument

    # find any prefixes for this executable
    for prefixSearch in mod.prefixes:
        if prefixSearch.execid == executable.id:
            node.attrib['launch-prefix'] = prefixSearch.prefix

    # find any machine selections
    for selectionSearch in mod.machine_selections:
        print(selectionSearch.execid)
        if selectionSearch.execid == executable.id:
            node.attrib['machine'] = selectionSearch.machine_name
def lookup(configPart, string, path):
    """Resolve a connect-statement address to a topic path.

    *string* has the form ``[<namespace>/...]<module id>/<interface name>``.
    Leading parts that name a namespace are followed down the configuration
    tree; the module and interface that remain are then translated, through
    the interface's ``link``, into the running name of the executable
    (``<module id>_<executable id>``) followed by the linked topic.

    If no module or interface matches, only the namespace prefix collected
    so far is returned.

    :param configPart: configuration level at which resolution starts.
    :param string: the connect-statement address.
    :param path: namespace path of *configPart*, prefixed to the result.
    :returns: ``path + '/' +`` the resolved topic.
    :raises Exception: if fewer than two address parts are given, or fewer
        than two remain once the namespaces have been consumed.
    :raises AttributeError: if the linked executable or interface cannot be
        found (a warning is printed first).
    """
    # Debug output of the lookup inputs.
    pprint.pprint(configPart.namespaces)
    pprint.pprint(configPart.modules_to_add)
    pprint.pprint(string)
    pprint.pprint(path)

    # Split string in parts (basically the address)
    parts = string.split('/')
    # If the number of parts is smaller than two, something is wrong. More is possible (but not recommended)
    if len(parts) < 2:
        raise Exception("Incorrect connect-statement")

    connectionString = ''
    # Keep checking if the first part of the connection string matches a
    # namespace. If so: descend into it, add it to the final connection
    # string and pop it off.
    while parts:
        matched = None
        for ns in configPart.namespaces:
            if ns.id == parts[0]:
                matched = ns
                # Stop at the first hit so the remaining siblings are never
                # compared against the *next* address part.
                break
        if matched is None:
            break
        configPart = matched
        connectionString += parts.pop(0) + '/'

    # A module id and an interface name must be left after the namespaces.
    if len(parts) < 2:
        raise Exception("Incorrect connect-statement")

    # Correct namespace is found, now find node name,
    # basically replacing <executable_id> with <running namespace of executable>
    for mod in configPart.modules_to_add:
        if mod.id != parts[0]:
            continue
        interfaces = mod.implementation.outputs + mod.implementation.inputs
        for con in interfaces:
            if con.name != parts[1]:
                continue
            linkparts = con.link.split('/')
            exec_id = linkparts[0]
            interface_name = "/".join(linkparts[1:])

            executable = mod.implementation.getExecutable(exec_id)
            if executable is None:
                print("WARNING: No executable with id {} found".format(exec_id))
            executable_interface = executable.getInterface(interface_name)
            if executable_interface is None:
                print("WARNING: No interface with name {} found. Make sure you specified the in- and outputs correctly".format(interface_name))

            node_prefix = mod.id + '_' + exec_id + '/'
            # Fallback to extend namespace if the ROS topic is defined private
            if executable_interface.namespace == "private":
                connectionString += node_prefix
            connectionString += node_prefix + interface_name
            break
        # Only the first module with a matching id is considered.
        break

    return path + '/' + connectionString
def lookupInternal(string, mod):
    """Resolve a module-internal address to a topic relative to the module.

    *string* has the form ``<executable id>/<interface name>``. The
    executable id is replaced by the running name of the executable
    (``<module id>_<executable id>``); for an interface whose namespace is
    ``"private"`` that name is included twice.

    :param string: the internal connect-statement address.
    :param mod: the module whose implementation owns the executable.
    :returns: the resolved topic, without a leading slash.
    :raises Exception: if *string* has fewer than two ``/``-separated parts.
    """
    parts = string.split('/')
    if len(parts) < 2:
        raise Exception("Incorrect connect-statement")

    exec_id = parts[0]
    topic_parts = parts[1:]
    executable = mod.implementation.getExecutable(exec_id)
    executable_interface = executable.getInterface("/".join(topic_parts))

    node_name = '{}_{}'.format(mod.id, exec_id)
    prefix = node_name
    if executable_interface.namespace == "private":
        # Private topics get the node name as an extra path component.
        prefix = node_name + '/' + node_name
    return "/".join([prefix] + topic_parts)