# Source code for sardana.macroserver.msparameter

#!/usr/bin/env python

##############################################################################
##
## This file is part of Sardana
##
## http://www.sardana-controls.org/
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
## Sardana is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Sardana is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the definition of the macroserver parameters for
macros"""

# Names exported by ``from sardana.macroserver.msparameter import *``; every
# one of them is defined further down in this module.
__all__ = ["WrongParam", "MissingParam", "UnknownParamObj", "WrongParamType",
           "TypeNames", "Type", "ParamType", "ParamRepeat", "ElementParamType",
           "ElementParamInterface", "AttrParamType", "AbstractParamTypes",
           "ParamDecoder"]

# Markup format used by the docstrings of this module.
__docformat__ = 'restructuredtext'

from taurus.core.util.containers import CaselessDict

from sardana import ElementType, INTERFACES_EXPANDED
from sardana.macroserver.msbase import MSBaseObject
from sardana.macroserver.msexception import MacroServerException, \
    UnknownMacro, UnknownMacroLibrary


class WrongParam(MacroServerException):
    """Macro server exception whose ``type`` label is 'Wrong parameter'.

    The positional arguments are handed over untouched to
    :class:`MacroServerException`.
    """

    def __init__(self, *args):
        super(WrongParam, self).__init__(*args)
        # label is set after the base initialisation, as the last step
        self.type = 'Wrong parameter'


class MissingParam(WrongParam):
    """:class:`WrongParam` variant whose ``type`` label is
    'Missing parameter'."""

    def __init__(self, *args):
        super(MissingParam, self).__init__(*args)
        # replace the label set by WrongParam
        self.type = 'Missing parameter'


class UnknownParamObj(WrongParam):
    """:class:`WrongParam` variant whose ``type`` label is
    'Unknown parameter'."""

    def __init__(self, *args):
        super(UnknownParamObj, self).__init__(*args)
        # replace the label set by WrongParam
        self.type = 'Unknown parameter'


class WrongParamType(WrongParam):
    """:class:`WrongParam` variant whose ``type`` label is
    'Unknown parameter type'."""

    def __init__(self, *args):
        super(WrongParamType, self).__init__(*args)
        # replace the label set by WrongParam
        self.type = 'Unknown parameter type'


class TypeNames:
    """Class that holds the list of registered macro parameter types.

    Every registered name is kept in an internal dictionary and is also set
    as an instance attribute whose value is the name itself, so a registered
    type can be referred to as ``<instance>.<name>``.
    """

    def __init__(self):
        self._type_names = {}
        self._pending_type_names = {}

    def addType(self, name):
        """Register a new macro parameter type.

        :param name: name of the parameter type to register
        """
        setattr(self, name, name)
        self._type_names[name] = name
        # the name is no longer pending (no-op if it never was)
        self._pending_type_names.pop(name, None)

    def removeType(self, name):
        """Remove a macro parameter type.

        :param name: name of the parameter type to remove
        :raises AttributeError: if this instance has no attribute called
                                *name*
        """
        delattr(self, name)
        # A missing dictionary key raises KeyError (not ValueError), so use
        # pop() with a default to tolerate an entry that is already gone.
        self._type_names.pop(name, None)

    def __str__(self):
        # list() keeps the "['a', 'b']" rendering on every Python version
        return str(list(self._type_names))

#    def __getattr__(self, name):
#        if name not in self._pending_type_names:
#            self._pending_type_names[name] = name
#        return self._pending_type_names[name]


# Module-level TypeNames instance. It is intended to provide access to types
# to the Macros in a "Type.Motor" fashion: every name registered through
# Type.addType() becomes an attribute of this object whose value is the type
# name itself.
Type = TypeNames()


class ParamType(MSBaseObject):
    """Base class of the macro parameter types.

    The default implementation builds a parameter value by calling
    :attr:`type_class` on the string representation of the value.
    """

    #: value of the *pool* argument that selects all the pools
    All = 'All'

    # Capabilities
    ItemList = 'ItemList'
    ItemListEvents = 'ItemListEvents'

    #: capabilities of the parameter type (see :meth:`hasCapability`)
    capabilities = []

    #: callable applied by :meth:`getObj` to the string representation
    type_class = str

    def __init__(self, macro_server, name):
        """Initialise the base object.

        :param macro_server: macro server the parameter type belongs to
        :param name: parameter type name (also used as full name)
        """
        MSBaseObject.__init__(self, name=name, full_name=name,
                              macro_server=macro_server,
                              elem_type=ElementType.ParameterType)

    def getName(self):
        """Return the name of the parameter type."""
        return self.name

    def getObj(self, str_repr):
        """Return ``type_class(str_repr)``.

        :param str_repr: string representation of the parameter value
        """
        return self.type_class(str_repr)

    @classmethod
    def hasCapability(cls, cap):
        """Tell whether *cap* is listed in :attr:`capabilities`."""
        return cap in cls.capabilities

    def serialize(self, *args, **kwargs):
        """Return the result of :meth:`MSBaseObject.serialize` with its
        'composed' item set to False."""
        kwargs = MSBaseObject.serialize(self, *args, **kwargs)
        kwargs['composed'] = False
        return kwargs


class ParamRepeat(object):
    """Parameter definitions bundled together with repetition options.

    The options are 'min' (default 1) and 'max' (default None); any keyword
    argument overrides or extends them. Presumably they bound the number of
    repetitions accepted by the decoder -- to be confirmed against the code
    that turns a ParamRepeat into a parameter definition.
    """

    # opts: min, max
    def __init__(self, *param_def, **opts):
        self.param_def = param_def
        self.opts = {'min': 1, 'max': None}
        self.opts.update(opts)
        # list form: the parameter definitions followed by the options dict
        self._obj = list(param_def)
        self._obj.append(self.opts)

    def items(self):
        """Return the items of the options dictionary."""
        return self.opts.items()

    def __getattr__(self, name):
        """Give attribute-style access to the options (e.g. ``rep.min``).

        :raises AttributeError: if *name* is not an option
        """
        # Read ``opts`` through __dict__: a lookup made before ``opts``
        # exists (e.g. while the object is being copied or unpickled) would
        # otherwise re-enter __getattr__ endlessly. A missing name must
        # surface as AttributeError so that hasattr()/getattr() with a
        # default behave as expected.
        try:
            return self.__dict__['opts'][name]
        except KeyError:
            raise AttributeError(name)

    def obj(self):
        """Return the list made of the parameter definitions followed by the
        options dictionary."""
        return self._obj


class ElementParamType(ParamType):
    """Parameter type whose values are elements looked up by name in the
    pools and, failing that, among the macros and macro libraries of the
    macro server."""

    capabilities = ParamType.ItemList, ParamType.ItemListEvents

    def __init__(self, macro_server, name):
        ParamType.__init__(self, macro_server, name)

    def _get_pools(self, pool):
        """Return the pools to look into: all of them when *pool* is
        ``ParamType.All``, otherwise a 1-tuple with the requested pool."""
        macro_server = self.macro_server
        if pool == ParamType.All:
            return macro_server.get_pools()
        return (macro_server.get_pool(pool),)

    def _get_macro_server_obj(self, name):
        """Return the macro or, failing that, the macro library called
        *name*.

        :raises UnknownParamObj: if the macro server knows neither
        """
        macro_server = self.macro_server
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            pass
        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            pass
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj('%s with name %s does not exist' %
                              (self._name, name))

    def accepts(self, elem):
        """Tell whether the type of *elem* equals the name of this
        parameter type."""
        return elem.getType() == self._name

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called *name*.

        :param name: element name
        :param pool: pool to look into (default: all the pools)
        :param cache: not used by this implementation
        :raises UnknownParamObj: if no pool element, macro or macro library
                                 is found
        """
        for a_pool in self._get_pools(pool):
            elem_info = a_pool.getObj(name, elem_type=self._name)
            if elem_info is not None and self.accepts(elem_info):
                return elem_info
        # not a pool object, maybe it is a macro server object (a macro or a
        # macro library)
        return self._get_macro_server_obj(name)

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return a :class:`CaselessDict` (name -> element) with the
        accepted pool elements and the accepted macros.

        :param pool: pool to look into (default: all the pools)
        :param cache: not used by this implementation
        """
        objs = CaselessDict()
        for a_pool in self._get_pools(pool):
            for elem_info in a_pool.getElements():
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        # NOTE(review): the original code ran this very loop twice over
        # get_macros(); the first pass named its items "macro_lib", so it
        # presumably was meant to iterate the macro libraries -- confirm
        # against the MacroServer API. A single pass yields the same result.
        for macro_name, macro in self.macro_server.get_macros().items():
            if self.accepts(macro):
                objs[macro_name] = macro
        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the keys of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return obj_dict.keys()

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the values of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return obj_dict.values()

    def serialize(self, *args, **kwargs):
        """Return the result of :meth:`ParamType.serialize` with its
        'composed' item set to True."""
        kwargs = ParamType.serialize(self, *args, **kwargs)
        kwargs['composed'] = True
        return kwargs


class ElementParamInterface(ElementParamType):
    """Element parameter type that matches elements by interface: an
    element is accepted when the name of this parameter type is among the
    interfaces registered in ``INTERFACES_EXPANDED`` for the element type.

    ``getObjListStr`` and ``getObjList`` are inherited; they rely on the
    :meth:`getObjDict` defined here.
    """

    def __init__(self, macro_server, name):
        ElementParamType.__init__(self, macro_server, name)
        # entry layout: (bases, doc); only the bases are kept
        bases, _ = INTERFACES_EXPANDED.get(name)
        self._interfaces = bases

    def accepts(self, elem):
        """Tell whether *elem* implements the interface named after this
        parameter type.

        Falls back on the plain type comparison of
        :meth:`ElementParamType.accepts` when no interfaces are available
        for the element type.
        """
        info = INTERFACES_EXPANDED.get(elem.getType())
        # ``info`` is None for a type without entry: check it before
        # subscripting so the fallback below is reachable in that case too
        elem_interfaces = None if info is None else info[0]
        if elem_interfaces is None:
            return ElementParamType.accepts(self, elem)
        return self._name in elem_interfaces

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called *name* that has this interface.

        :param name: element name
        :param pool: pool to look into (default: all the pools)
        :param cache: not used by this implementation
        :raises UnknownParamObj: if no pool element, macro or macro library
                                 is found
        """
        for a_pool in self._get_pools(pool):
            elem_info = a_pool.getElementWithInterface(name, self._name)
            if elem_info is not None and self.accepts(elem_info):
                return elem_info
        # not a pool object, maybe it is a macro server object (a macro or a
        # macro library)
        return self._get_macro_server_obj(name)

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return the elements that have this interface.

        When the macro server reports the interface as one of its own, the
        result of ``get_elements_with_interface`` is returned as is;
        otherwise a :class:`CaselessDict` (name -> element) is built from
        the pools.

        :param pool: pool to look into (default: all the pools)
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        if macro_server.is_macroserver_interface(self._name):
            return macro_server.get_elements_with_interface(self._name)

        objs = CaselessDict()
        for a_pool in self._get_pools(pool):
            elements = a_pool.getElementsWithInterface(self._name)
            for elem_info in elements.values():
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        return objs


class AttrParamType(ParamType):
    """Parameter type that adds nothing to :class:`ParamType`."""
    pass


AbstractParamTypes = (ParamType, ElementParamType, ElementParamInterface,
                      AttrParamType)


class ParamDecoder:
    """Decode the parameter strings of a macro into parameter objects.

    Decoding happens on construction. Attributes that are not found on the
    decoder are looked up on the decoded parameter list.
    """

    def __init__(self, door, macro_meta, param_str_list):
        """
        :param door: object whose ``type_manager`` resolves the type names
        :param macro_meta: macro meta information (type, parameter
                           definition)
        :param param_str_list: sequence of strings; the first item is
                               skipped (presumably the macro name -- to be
                               confirmed against the caller)
        """
        self.door = door
        self.macro_meta = macro_meta
        self.param_str_list = param_str_list
        self.param_list = None
        self.decode()

    @property
    def type_manager(self):
        """Type manager of the door."""
        return self.door.type_manager

    def decode(self):
        """Decode the parameter strings and store the result in
        ``param_list``.

        A macro function whose ``param_def`` is None gets its parameter
        strings undecoded. For a macro that is neither a macro class nor a
        macro function ``param_list`` is left untouched.

        :return: the parameter list
        """
        macro_meta = self.macro_meta
        macro_type = macro_meta.get_type()
        pars_str = self.param_str_list[1:]
        pars_def = macro_meta.get_parameter()
        if macro_type == ElementType.MacroClass:
            _, self.param_list = self.decodeNormal(pars_str, pars_def)
        elif macro_type == ElementType.MacroFunction:
            if macro_meta.function.param_def is None:
                self.param_list = self.param_str_list[1:]
            else:
                _, self.param_list = self.decodeNormal(pars_str, pars_def)
        return self.param_list

    def decodeNormal(self, str_list, def_list):
        """Decode *str_list* according to the parameter definitions.

        :param str_list: parameter strings
        :param def_list: sequence of parameter definitions, each one a
                         mapping with the 'name', 'type' and 'default_value'
                         keys (plus 'min' and 'max' when 'type' is a list,
                         i.e. a repeat parameter)
        :return: tuple (number of strings consumed, list of decoded objects)
        :raises MissingParam: if a non-repeat parameter without default
                              value has no string left
        :raises WrongParamType: if the conversion raises ValueError
        :raises WrongParam: if the object is unknown or cannot be created,
                            or a repeat parameter lacks required values
        """
        str_len = len(str_list)
        obj_list = []
        str_idx = 0
        for par_def in def_list:
            name = par_def['name']
            type_class = par_def['type']
            def_val = par_def['default_value']
            is_repeat = isinstance(type_class, list)
            if str_idx == str_len:
                # no string left for this parameter: use its default value
                if def_val is None:
                    if not is_repeat:
                        raise MissingParam("'%s' not specified" % name)
                    min_rep = par_def['min']
                    if min_rep > 0:
                        msg = "'%s' demands at least %d values" % \
                              (name, min_rep)
                        raise WrongParam(msg)
                new_obj_list = []
                if def_val is not None:
                    new_obj_list.append(def_val)
            else:
                if is_repeat:
                    data = self.decodeRepeat(str_list[str_idx:], par_def)
                    dec_token, new_obj_list = data
                else:
                    type_manager = self.type_manager
                    type_name = type_class
                    # NOTE(review): the returned class is not used; the call
                    # is kept so that whatever it does for ``type_name``
                    # (e.g. raising for an unknown type) still happens.
                    type_manager.getTypeClass(type_name)
                    par_type = type_manager.getTypeObj(type_name)
                    par_str = str_list[str_idx]
                    try:
                        val = par_type.getObj(par_str)
                    # str(e) instead of the deprecated (and, on Python 3,
                    # missing) ``e.message`` attribute
                    except ValueError as e:
                        raise WrongParamType(str(e))
                    except UnknownParamObj as e:
                        raise WrongParam(str(e))
                    if val is None:
                        msg = 'Could not create %s parameter "%s" for "%s"' \
                              % (par_type.getName(), name, par_str)
                        raise WrongParam(msg)
                    dec_token = 1
                    new_obj_list = [val]
                str_idx += dec_token
            obj_list += new_obj_list
        return str_idx, obj_list

    def decodeRepeat(self, str_list, par_def):
        """Decode the repetitions of a repeat parameter.

        The strings are decoded group after group with :meth:`decodeNormal`
        until they are exhausted or 'max' repetitions have been found. A
        repetition made of a single object is stored as that object, the
        other ones as lists.

        :param str_list: parameter strings, starting at the first repetition
        :param par_def: repeat parameter definition ('name', 'type', 'min'
                        and 'max' keys; 'type' holds the definitions of the
                        repeated parameters)
        :return: tuple (number of strings consumed, list of repetitions)
        :raises RuntimeError: if fewer than 'min' repetitions are found
        """
        name = par_def['name']
        param_def = par_def['type']
        min_rep = par_def['min']
        max_rep = par_def['max']

        dec_token = 0
        obj_list = []
        rep_nr = 0
        while dec_token < len(str_list):
            if max_rep is not None and rep_nr == max_rep:
                break
            new_token, new_obj_list = self.decodeNormal(str_list[dec_token:],
                                                        param_def)
            dec_token += new_token
            if len(new_obj_list) == 1:
                new_obj_list = new_obj_list[0]
            obj_list.append(new_obj_list)
            rep_nr += 1
        if rep_nr < min_rep:
            msg = 'Found %d repetitions of param %s, min is %d' % \
                  (rep_nr, name, min_rep)
            raise RuntimeError(msg)
        return dec_token, obj_list

    def getParamList(self):
        """Return the decoded parameter list."""
        return self.param_list

    def __getattr__(self, name):
        """Delegate the unknown attributes to the decoded parameter list."""
        # ``param_list`` itself must never be resolved through this method:
        # if it is not set yet (e.g. object being copied or unpickled) the
        # delegation below would recurse endlessly.
        if name == 'param_list':
            raise AttributeError(name)
        return getattr(self.param_list, name)