Source code for core.backend.writeLaunch

"""
VIKI: more than a GUI for ROS, https://github.com/UT-RAM/viki 
version: 0.2 - Alice
copyright: Robin Hoogervorst, Alex Kamphuis, Cees Trouwborst, 2016 
licensed under the MIT License
"""

import xml.etree.cElementTree as ET
import xml.dom.minidom
import random
import pprint

[docs]def write(configuration, filename="aeroworks.launch"): # Create root element <launch> root = ET.Element("launch") recursiveWrite(configuration, root, root) f = open(filename, 'w') f.write(prettify(root))
[docs]def prettify(elem): """Return a pretty-printed XML string for the Element. """ rough_string = ET.tostring(elem, 'utf-8') reparsed = xml.dom.minidom.parseString(rough_string) return reparsed.toprettyxml(indent="\t")
[docs]def getName(obj): return obj.name
[docs]def recursiveWrite(configPart, configElem, rootElem, path=''): # Loop through modules at current level for con in configPart.connections_to_add: from_attr = lookup(configPart, con.listener, path) to_attr = lookup(configPart, con.publisher, path) # NEW REMAP if from_attr is not to_attr: relayElement = ET.SubElement(rootElem, 'node') relayElement.set('name', '$(anon remap_' + str(random.random()) + ')') relayElement.set('pkg', 'topic_tools') relayElement.set('type', 'relay') relayElement.set('args', to_attr + ' ' + from_attr) relayElement.set('ns', 'remaps') # OLD REMAP # remap = ET.SubElement(rootElem, "remap") # remap.set('to', to_attr) # remap.set('from', from_attr) for machine in configPart.machines_to_add: machineElement = ET.SubElement(rootElem, 'machine') machineElement.set('name', machine.name) machineElement.set('address', machine.hostname) machineElement.set('user', machine.username) machineElement.set('password', machine.password) machineElement.set('env-loader', '~/.viki_env') # machineElement.set('ros-root', '/opt/ros/') # machineElement.set('ros-package-path', '~/catkin_ws/') # machineElement.set('default', 'false') for mod in configPart.modules_to_add: # Loop through executables for specific module for ic in mod.implementation.config: from_attr = path + '/' + lookupInternal(ic.listener, mod) to_attr = path + '/' + lookupInternal(ic.publisher, mod) # THIS IS THE NEW REMAP relayElement = ET.SubElement(rootElem, 'node') relayElement.set('name', '$(anon remap_{})'.format(str(random.random()))) relayElement.set('pkg', 'topic_tools') relayElement.set('type', 'relay') relayElement.set('args', to_attr + ' ' + from_attr) relayElement.set('ns', 'remaps') # THIS IS THE OLD REMAP # remap = ET.SubElement(rootElem, "remap") # remap.set('to', to_attr) # remap.set('from', from_attr) added_params = [] for executable in mod.implementation.executables: nodens = ET.SubElement(configElem, "group", ns=mod.id+'_'+executable.id) node = ET.SubElement(nodens, "node", pkg=executable.pkg, name=mod.id+'_'+executable.id, type=executable.executable) # Check if one of the parameters that are to be set are present in this executable # Todo/problem: params are not defined at executable level, but at module level. for paramSearch in mod.parameters_to_add[:]: paramFound = False for paramList in executable.params[:]: if paramSearch.name == paramList.name: param = ET.SubElement(node, "param", name=paramSearch.name, value=paramSearch.value) # Remove found param from both lists mod.parameters_to_add.remove(paramSearch) added_params.append(paramList.name) paramFound = True break if not paramFound: print "Error during parameter config: could not find {}".format(paramSearch.name) print "Added parameters: {}".format(added_params) # At this point, we also have the default parameters we should fill. loop again. for paramList in executable.params: if not paramList.name in added_params: added_params.append(paramList.name) param = ET.SubElement(node, "param", name=paramList.name, value=paramList.default, type=paramList.type) # find any command line arguments that belong to this executable for argSearch in mod.args: if argSearch.execid == executable.id: node.attrib['args'] = argSearch.argument # find any prefixes for this executable for prefixSearch in mod.prefixes: if prefixSearch.execid == executable.id: node.attrib['launch-prefix'] = prefixSearch.prefix # find any machine selections for selectionSearch in mod.machine_selections: print selectionSearch.execid if selectionSearch.execid == executable.id: node.attrib['machine'] = selectionSearch.machine_name # At this point, we end up with some parameters that are not "connected". Echo those. for paramSearch in mod.parameters_to_add: print 'Parameter "' + paramSearch.name + '", valued "' + paramSearch.value + '", could not be connected.' try: for ns in configPart.namespaces: ns_elem = ET.SubElement(rootElem, "group", ns=ns.id) recursiveWrite(ns, ns_elem, rootElem, path+'/'+ns.id) except AttributeError: print 'No namespaces in configPart'
[docs]def lookup(configPart, string, path): pprint.pprint(configPart.namespaces) pprint.pprint(configPart.modules_to_add) pprint.pprint(string) pprint.pprint(path) # Split string in parts (basically the address) parts = string.split('/') # If the number of parts is smaller than two, something is wrong. More is possible (but not recommended) if len(parts) < 2: raise Exception("Incorrect connect-statement") connectionString = '' # Keep checking if the first part of the connection string matches a namespace. If so: add it to the final connection string and pop it off. at_end = False while at_end is False: matchFound = False for ns in configPart.namespaces: if ns.id == parts[0]: matchFound = True configPart = ns connectionString += parts[0] + '/' parts.pop(0) if matchFound is False: at_end = True # Correct namespace is found, now find node name, # basiaclly replacing <executable_id> with <running namespace of executable> for mod in configPart.modules_to_add: if mod.id == parts[0]: interfaces = mod.implementation.outputs+mod.implementation.inputs for con in interfaces: if con.name == parts[1]: linkparts = con.link.split('/') exec_id = linkparts[0] executable = mod.implementation.getExecutable(exec_id) if executable == None: print "WARNING: No executable with id {} found".format(exec_id) executable_interface = executable.getInterface("/".join(linkparts[1:])) if executable_interface == None: print "WARNING: No interface with name {} found. Make sure you specified the in- and outputs correctly".format("/".join(linkparts[1:])) linkparts[0] = '' # Fallback to extend namespace if the ROS topic is defined private if executable_interface.namespace == "private": connectionString += mod.id + '_' + exec_id + '/' connectionString += mod.id + '_' + exec_id + '/' + "/".join(linkparts[1:]) break break return path + '/' + connectionString
[docs]def lookupInternal(string, mod): parts = string.split('/') if len(parts) < 2: raise Exception("Incorrect connect-statement") exec_id = parts[0] executable = mod.implementation.getExecutable(exec_id) executable_interface = executable.getInterface("/".join(parts[1:])) parts[0] = '' if executable_interface.namespace == "private": parts[0] = '{}_{}/'.format(mod.id, exec_id) parts[0] += mod.id + "_" + exec_id return "/".join(parts)