"""
VIKI: more than a GUI for ROS, https://github.com/UT-RAM/viki
version: 0.2 - Alice
copyright: Robin Hoogervorst, Alex Kamphuis, Cees Trouwborst, 2016
licensed under the MIT License
"""
"""Describes the different objects used as abstraction of the AeroWorks configuration and modules."""
import helpers
[docs]class Interface:
"""An *interface* represents an in- or output of a module. This in- and outputs can be connected to other module's in- and outputs."""
def __init__(self, interface_type,
name, message_type, required, link=None, namespace="base"):
#: Represents the name of the interface. E.g.: "cmd_vel"
self.name = name
#: Represents the type of the interface. E.g.: "ros_topic"
self.type = interface_type
#: Namespace used in the ros package for this topic, "base", "global" or "private"; according to wiki.ros.org/Names
self.namespace = namespace
#: Represents the message type the interface can handle. E.g.: "twist" or "standard_msgs/Empty"
self.message_type = helpers.lookupMessageType(message_type)
#: Defines if the interface is required to be connected. Execution will halt if this requirement is not met.
self.required = required
#: Contains the reference to the executable within the module that actually provides the in- or output. E.g.: "test_node/test_topic"
self.link = link
[docs]class Internal_Interface:
"""An *internal_interface* represents a connection between an in- and output **within** a module. At this point, implicitly only implemented for *ros_topic* type interfaces.
.. note:: In the future, this might be a specific case of :class:`Interface`.
"""
def __init__(self, publisher, listener):
#: Refers to the topic on which messages are be published. E.g. "node_name/command_giver"
self.publisher = publisher
#: Refers to the topic where the node is listening for messages. This topic is remapped to listen to the publisher. E.g. "node2_name/i_listen_here"
self.listener = listener
[docs]class Module:
"""A *module* represents a separately distributable piece of software, that is able to work within the AeroWorks infrastructure.
"""
def __init__(self, type, id):
#: A list of :class:`Interface` describing all inputs of the module.
self.inputs = []
#: A list of :class:`Interface` describing all outputs of the module.
self.outputs = []
#: A list of :class:`Executable` describing all executables (ROS nodes) in the module.
self.executables = []
#: Represents the ID of the module, as defined in the ``viki.xml``. This is used for identifying and selecting the module throughout the process, so this value should be unique.
self.id = id
#: Represents the type of the module, as defined in the ``viki.xml``. This categorization is added for the comfort of the user. Examples are "userinput", "sensor", "controller" and "vehicle".
self.type = type
#: Contains a dictionary with all module metadata, as defined in ``viki.xml``.
self.meta = {}
#: A list of :class:`Internal_Interface` describing all internal connections.
self.config = []
#: Contains the path to the module config
self.path = ''
#: List with names of dependent ROS packages
self.package_dependencies = []
#: List with missing packages, initially empty
self.missing_packages = []
[docs] def addPackageDependency(self, package_name, type="apt-get", src=""):
self.package_dependencies.append({
'name': package_name,
'type': type,
'src': src
})
[docs] def addMissingPackage(self, package_name):
self.missing_packages.add(package_name)
[docs] def addOutput(self, interface):
"""Adds an output to the list of module outputs (:attr:`outputs`).
:param interface: The interface to add
:type interface: :class:`Interface`
"""
self.outputs.append(interface)
[docs] def addExecutable(self, executable):
"""Adds an executable to the list of module executables (:attr:`executables`). Those executables are ROS nodes.
:param executable: The executable to add.
:type executable: :class:`Executable`
"""
self.executables.append(executable)
[docs] def addIntConnect(self, internal_interface):
"""Adds an internal connection to the list of module internal connections (:attr:`config`).
:param internal_interface: The connection to add
:type internal_interface: :class:`Internal_Interface`
"""
self.config.append(internal_interface)
[docs] def setPath(self, path):
"""Sets the path for the current module.
:param path: The path where to find the module
:type path: string"""
self.path = path
[docs] def getExecutable(self, exec_id):
""" Returns the executable with the id, within this module
:param exec_id: the string id of the executable, as defined in the xml file
:return: Executable
"""
for executable in self.executables:
if executable.id == exec_id:
return executable
return None
[docs]class Executable:
"""An *executable* represents a ROS node that can be launched through a ROS launch file, generated by the AeroWorks infrastructure."""
def __init__(self, id, pkg, executable):
#: A list of :class:`Interface` describing all inputs of the executable.
self.inputs = []
#: A list of :class:`Interface` describing all outputs of the executable.
self.outputs = []
#: A list of :class:`Parameter` describing parameters to be set for the executable.
self.params = []
#: A string with default arguments
self.args = ""
#: The ID of the executable, as given in the ``viki.xml``.
self.id = id
#: The ROS package that contains the executable. E.g.: "joy"
self.pkg = pkg
#: The ROS node executable to launch. E.g. "joy_node".
self.executable = executable
[docs] def addOutput(self, interface):
"""Adds an output to the list of executable outputs (:attr:`outputs`).
:param interface: The interface to add
:type interface: :class:`Interface`
"""
self.outputs.append(interface)
[docs] def addParameter(self, parameter):
"""Adds a parameter to the list of executable parameters (:attr:`params`).
:param parameter: The parameter to add
:type parameter: :class:`Parameter`
"""
self.params.append(parameter)
[docs] def setArguments(self, argument):
self.args = argument
[docs] def getInterface(self, name):
for interface in (self.inputs + self.outputs):
if (interface.name == name):
return interface
return None
[docs]class Parameter:
"""A *Parameter* represents a param in the rosparam server
:param name: The name of the parameter
:param default: A default value, when no specific value is given
:param type: Type of parameter
:param value: Value of the parameter
"""
def __init__(self, name, type, default=None, value=None):
self.name = name
self.default = default
self.type = type
self.value = value
[docs]class Configuration:
"""A *Configuration* is a list of settings,
usually from a configuration file, used to setup an experiment.
it contains:
:param id: unique id for this configuration
:param modules_to_add: list of modules needed in this *configuration*
:param connections_to_add: list of connections needed
:param namespaces: list of :class:Namespace in this *configuration*
"""
def __init__(self, id):
self.id = id
self.modules_to_add = []
self.connections_to_add = []
self.machines_to_add = []
self.namespaces = []
[docs]class Namespace:
"""A namespace is in itself basically a :class:Configuration ,
which can in turn contain multiple namespaces.
:param id: unique id for this configuration
:param modules_to_add: list of modules needed in this *configuration*
:param connections_to_add: list of connections needed
:param namespaces: list of *namespace*s
"""
def __init__(self, id):
self.id = id
self.modules_to_add = []
self.connections_to_add = []
self.namespaces = []
[docs]class Module_to_add:
"""An object representing a module that is desired to be in a configuration.
It is a temporary container for all information to get the actual :class:Module classes,
and contains an *implementation* where the :class:Module can be placed.
:param role: role of this module eg. userinput, vehicle, etc.
:param id: unique id of the module
:param type: type of module
:param supress_warning: suppress warnings given in/by this module (not yet implemented)
:param parameters_to_add: list of parameters that are to be added to class:Executable in this module
:param args: list of command line arguments that are used to start this modules nodes
"""
def __init__(self, role, type, id, supress_warning=False):
self.role = role
self.id = id
self.type = type
self.supress_warning = supress_warning
self.parameters_to_add = []
self.args = []
self.prefixes = []
self.machine_selections = []
# here we put Module classes used for implementation
self.implementation = None
[docs] def add_cmdline_arg(self, arg):
"""Add a command line argument object *arg* to the list of arguments needed to run this module
:param arg: The command line argument object to add
"""
self.args.append(arg)
[docs] def add_prefix(self, pf):
"""Add a launch-prefix object *pf* to the list of prefixes desired to run
:param pf: prefix object
"""
self.prefixes.append(pf)
[docs] def add_machine_selection(self, selection):
"""Add a machine selection object *selection* to the list of machine selections desired to run
:param selection: machine selection object
"""
self.machine_selections.append(selection)
[docs]class Connection_to_add:
"""Representation of a connection desired to add to a configuration
:param publisher: topic of the publisher to be connected
:param subscriber: topic of the subscriber to be connected
"""
def __init__(self, publisher, listener):
self.publisher = publisher
self.listener = listener
[docs]class Parameter_to_add:
"""A parameter to add to a module (and finally to an executable)
:param name: name of the parameter
:param value: value of the parameter
"""
def __init__(self, name, value):
self.name = name
self.value = value
[docs]class Cmdline_argument:
"""An argument to use when starting a specific rosnode.
Used for instance when starting rviz with a configuration file.
:param executable_id: id of the executable this argument is used for
:param argument: string of all arguments to use for this executable
"""
def __init__(self, executable_id, argument):
self.execid = executable_id
self.argument = argument
[docs]class Launch_prefix:
"""A launch prefix that will be run before the actual ROSnode.
Can be very useful to for instance start a node in debug mode.
:param executable_id: id of the executable this prefix is used for
:param prefix: string of the prefix
"""
def __init__(self, executable_id, prefix):
self.execid = executable_id
self.prefix = prefix
[docs]class Selected_machine:
"""A machine selection defines at what machine the ros_node will run.
:param executable_id: id of the executable this selection is used for
:param machine_name: name of the machine
"""
def __init__(self, executable_id, machine_name):
self.execid = executable_id
self.machine_name = machine_name
[docs]class Machine:
def __init__(self, name, hostname, username, password):
self.name = name
self.hostname = hostname
self.username = username
self.password = password