Source code for cimpyorm.Model.Elements.Property

#   Copyright (c) 2018 - 2020 Institute for High Voltage Technology and Institute for High Voltage Equipment and Grids, Digitalization and Power Economics
#   RWTH Aachen University
#   Contact: Thomas Offergeld (t.offergeld@iaew.rwth-aachen.de)
#  #
#   This module is part of CIMPyORM.
#  #
#   CIMPyORM is licensed under the BSD-3-Clause license.
#   For further information see LICENSE in the project's root directory.
#

from collections import OrderedDict
from typing import Union
from string import ascii_letters, digits

from sqlalchemy import Column, String, ForeignKey, Boolean, Float, Integer, Table, ForeignKeyConstraint
from sqlalchemy.orm import relationship

from cimpyorm.auxiliary import get_logger, shorten_namespace, XPath
from cimpyorm.Model import auxiliary as aux
from cimpyorm.Model.Elements.Base import ElementMixin, se_ref
from cimpyorm.Model.Elements.Class import CIMClass
from cimpyorm.Model.Elements.Enum import CIMEnumValue, CIMEnum
from cimpyorm.Model.Elements.Datatype import CIMDT

log = get_logger(__name__)


[docs]class CIMProp(ElementMixin, aux.Base): """ Class representing a CIM Model property """ # pylint: disable=too-many-instance-attributes __tablename__ = "CIMProp" XPathMap = None prop_name = Column(String(50)) # Map the property to its class cls_name = Column(String(80), primary_key=True) cls_namespace = Column(String(30), primary_key=True) #: The class this property belongs to. cls = relationship(CIMClass, foreign_keys=[cls_name, cls_namespace], backref="props") datatype_name = Column(String(50)) datatype_namespace = Column(String(50)) #: This property's datatype. datatype = relationship(CIMDT, foreign_keys=[datatype_name, datatype_namespace], backref="usedby") inverse_property_name = Column(String(80)) inverse_property_namespace = Column(String(30)) inverse_class_name = Column(String(80)) inverse_class_namespace = Column(String(30)) #: The inverse property associated with this property (None if the property is a Primitive or #: an EnumValue) inverse = relationship("CIMProp", foreign_keys=[inverse_property_name, inverse_property_namespace, inverse_class_name, inverse_class_namespace], uselist=False) used = Column(Boolean) multiplicity = Column(String(10)) many_remote = Column(Boolean) optional = Column(Boolean) type_ = Column(String(80)) type = "Generic" __table_args__ = (ForeignKeyConstraint((cls_namespace, cls_name), (CIMClass.namespace_name, CIMClass.name)), ForeignKeyConstraint((datatype_namespace, datatype_name), (CIMDT.namespace_name, CIMDT.name)), ForeignKeyConstraint(("inverse_class_name", "inverse_class_namespace", "inverse_property_namespace", "inverse_property_name"), ("CIMProp.cls_name", "CIMProp.cls_namespace", "CIMProp.namespace_name", "CIMProp.name")), ) __mapper_args__ = { "polymorphic_identity": __tablename__, "polymorphic_on": type_ } def __init__(self, schema_elements=None): """ Class constructor :param schema_elements: the (merged) xml node element containing the property's description """ super().__init__(schema_elements) self._inverseProperty = None self.cls_namespace, self.cls_name = self._get_domain() _, self.range_name = self._get_range() self.range_namespace = self.namespace_name self.prop_name = self.name.split(".")[-1] self.datatype_namespace, self.datatype_name = self._get_datatype() self._set_inverse() self.used = self._get_used() self.multiplicity = self._multiplicity self.many_remote = self._many_remote self.optional = self._optional self.key = None self.var_key = None self.xpath = None self.association_table = None @classmethod def _generateXPathMap(cls): super()._generateXPathMap() Map = { "label": XPath(r"rdfs:label/text()", namespaces=cls.nsmap), "association": XPath(r"cims:AssociationUsed/text()", namespaces=cls.nsmap), "inverseRoleName": XPath(r"cims:inverseRoleName/@rdf:resource", namespaces=cls.nsmap), "datatype": XPath(r"cims:dataType/@rdf:resource", namespaces=cls.nsmap), "multiplicity": XPath(r"cims:multiplicity/@rdf:resource", namespaces=cls.nsmap), "type": XPath(r"rdf:type/@rdf:resource", namespaces=cls.nsmap), "domain": XPath(r"rdfs:domain/@rdf:resource", namespaces=cls.nsmap), "range": XPath(r"rdfs:range/@rdf:resource", namespaces=cls.nsmap) } if not cls.XPathMap: cls.XPathMap = Map else: cls.XPathMap = {**cls.XPathMap, **Map} @property def u_key(self): r = (se_ref(self.name, self.namespace_name), se_ref(self.cls_name, self.cls_namespace)) return r def _set_inverse(self): inverse_ns, inverse_name = self._get_inverse() if not inverse_name: return self.inverse_class_name, self.inverse_property_name = inverse_name.split(".") self.inverse_property_namespace = self.inverse_class_namespace = inverse_ns def _get_used(self): """ Determine whether the property needs to be added to the SQLAlchemy declarative class (i.e. it is not an inverseProperty of an existing mapper or it maps to a value, not a reference). :return: True if property should be represented in the SQLAlchemy declarative model. """ return bool(self._get_association()) or self.inverse_property_name is None # @property # def _get_namespace(self) -> Union[str, None]: # return self._extract_namespace(self.name) def _get_datatype(self): dt = self._get_property("datatype") if not dt: return None, None ns, dt = self._extract_namespace(dt) return ns, dt @property def _multiplicity(self): mp = self._get_property("multiplicity") return mp.split("M:")[-1] if not isinstance(mp, list) \ else mp[0].split("M:")[-1] # pylint: disable=unsubscriptable-object def _get_association(self) -> Union[bool, None]: association = self._get_property("association") if not association: return None elif isinstance(association, list): if len(set(association)) == 1: return association[0] == "Yes" # pylint: disable=E1136 elif not set(association): return None else: raise ValueError(f"Ambiguous association used parameter for property {self.name}.") else: return association == "Yes" def _get_inverse(self): inverse = self._get_property("inverseRoleName") if not inverse: return None, None ns, inverse = self._extract_namespace(inverse) return ns, inverse def _get_namespace(self) -> Union[str, None]: stereotyped_namespace = self._get_property("stereotype_text") if stereotyped_namespace and stereotyped_namespace == "Entsoe": # Fixme: This is hardcoded as the "Entsoe" stereotype determines the namespace for # some properties. However, the same attribute is sometimes used to denote the CIM # Package (e.g. ShortCircuit) which should not be misinterpreted as a namespace. return "entsoe" else: # Determine from name return self._extract_namespace(self.schema_elements.name)[0] def _get_range(self): range = self._get_property("range") if not range: return None, None ns, range = self._extract_namespace(range) return ns, range def _get_domain(self): domain = self._get_property("domain") ns, domain = self._extract_namespace(domain) return ns, domain @property def mapped_datatype(self): # pylint: disable=inconsistent-return-statements if self.datatype: dt = self.datatype if dt.base_datatype is not None: return dt.base_datatype else: return dt.name else: return None @property def _many_remote(self): if isinstance(self._multiplicity, list): return any([mp[-1] in ["2", "n"] for mp in self._multiplicity]) # pylint: disable=not-an-iterable else: return self._multiplicity[-1] in ["2", "n"] @property def _optional(self): if isinstance(self._multiplicity, list): return any([mp.startswith("0") for mp in self._multiplicity]) # pylint: disable=not-an-iterable else: return self._multiplicity.startswith("0") @property def full_name(self): return self.namespace.short + "_" + self.name def generate(self, nsmap): attrs = OrderedDict() dt = self.mapped_datatype if self.used: if isinstance(self.range, CIMEnum): var, query_base = self.name_query() attrs[f"{var}_name"] = Column(String(120), ForeignKey(CIMEnumValue.name), name=f"{var}_name") attrs[f"{var}_namespace"] = Column(String(120), ForeignKey(CIMEnumValue.namespace_name), name=f"{var}_namespace") attrs[f"{var}_enum_name"] = Column(String(120), ForeignKey(CIMEnumValue.enum_name), name=f"{var}_enum_name") attrs[f"{var}_enum_namespace"] = Column(String(120), ForeignKey(CIMEnumValue.enum_namespace), name=f"{var}_enum_namespace") attrs[var] = relationship(CIMEnumValue, foreign_keys=(attrs[f"{var}_name"], attrs[f"{var}_namespace"], attrs[f"{var}_enum_name"], attrs[f"{var}_enum_namespace"])) attrs["__table_args__"] = (ForeignKeyConstraint( (attrs[f"{var}_name"], attrs[f"{var}_namespace"], attrs[f"{var}_enum_name"], attrs[f"{var}_enum_namespace"]), (CIMEnumValue.name, CIMEnumValue.namespace_name, CIMEnumValue.enum_name, CIMEnumValue.enum_namespace) ),) self.key = f"{var}" self.xpath = XPath(query_base + "/@rdf:resource", namespaces=nsmap) elif self.range: self.generate_relationship(nsmap) elif not self.range: var, query_base = self.name_query() log.debug(f"Generating property for {var} on {self.name}") self.key = var self.xpath = XPath(query_base + "/text()", namespaces=nsmap) if dt: if dt == "String": attrs[var] = Column(String(50), name=f"{var}") elif dt in ("Float", "Decimal"): attrs[var] = Column(Float, name=f"{var}") elif dt == "Integer": attrs[var] = Column(Integer, name=f"{var}") elif dt == "Boolean": attrs[var] = Column(Boolean, name=f"{var}") else: attrs[var] = Column(String(30), name=f"{var}") else: # Fallback to parsing as String(50) attrs[var] = Column(String(50), name=f"{var}") for attr, attr_value in attrs.items(): setattr(self.cls.class_, attr, attr_value) def set_var_key(self): end = "" if isinstance(self.range, CIMEnum): end = "_name" elif self.range: end = "_id" self.var_key = self.namespace.short + "_" + self.name if self.namespace.short != "cim" else self.name + end def name_query(self): try: var = self.namespace.short + "_" + self.name if self.namespace.short != "cim" else self.name except AttributeError: if self.namespace is None: raise KeyError(f"Undefined namespace: {self.namespace_name}") for _str in (self.namespace.short, self.cls.name, self.name): # Make sure there are no funky characters in the XPath query. if any((_char not in ascii_letters+digits+"_" for _char in _str)): raise ValueError("Malformed XPath-Query.") query_base = f"{self.namespace.short}:{self.cls.name}.{self.name}" return var, query_base def generate_relationship(self, nsmap=None): var, query_base = self.name_query() attrs = {} Map = {} log.debug(f"Generating relationship for {var} on {self.name}") if self.many_remote: if self.inverse: br = self.inverse.name if self.namespace.short == "cim" else \ self.namespace.short + "_" + self.inverse.name tbl = self.generate_association_table() self.association_table = tbl attrs[var] = relationship(self.range.full_name, secondary=tbl, backref=br) else: tbl = self.generate_association_table() attrs[var] = relationship(self.range.full_name, secondary=tbl) else: attrs[f"{var}_id"] = Column(String(50), ForeignKey(f"{self.range.full_name}.id"), name=f"{var}_id") if self.inverse: br = self.inverse.name if self.namespace.short == "cim" else \ self.namespace.short + "_" + self.inverse.name attrs[var] = relationship(self.range.full_name, foreign_keys=attrs[f"{var}_id"], backref=br) else: attrs[var] = relationship(self.range.full_name, foreign_keys=attrs[f"{var}_id"]) self.key = f"{var}_id" self.xpath = XPath(query_base + "/@rdf:resource", namespaces=nsmap) class_ = self.cls.class_ for attr, attr_value in attrs.items(): setattr(class_, attr, attr_value) return Map def generate_association_table(self): association_table = Table(f".asn_{self.cls.full_name}_{self.range.full_name}", aux.Base.metadata, Column(f"{self.range.full_name}_id", String(50), ForeignKey(f"{self.range.full_name}.id")), Column(f"{self.cls.full_name}_id", String(50), ForeignKey(f"{self.cls.full_name}.id"))) return association_table
class CIMProp_AlphaNumeric(CIMProp): __tablename__ = "CIMProp_AlphaNumeric" cls_name = Column(String(80), primary_key=True) cls_namespace = Column(String(30), primary_key=True) range = None type = "Alphanumeric" __table_args__ = (ForeignKeyConstraint(("cls_name", "cls_namespace", "namespace_name", "name"), ("CIMProp.cls_name", "CIMProp.cls_namespace", "CIMProp.namespace_name", "CIMProp.name")), ForeignKeyConstraint(("cls_namespace", "cls_name"), (CIMClass.namespace_name, CIMClass.name)), ) __mapper_args__ = { "polymorphic_identity": __tablename__ } def __init__(self, schema_elements=None): """ Class constructor :param schema_elements: the (merged) xml node element containing the property's description """ super().__init__(schema_elements) class CIMProp_Reference(CIMProp): __tablename__ = "CIMProp_Reference" cls_name = Column(String(80), primary_key=True) cls_namespace = Column(String(30), primary_key=True) range_name = Column(String(80)) range_namespace = Column(String(30)) range = relationship(CIMClass, foreign_keys=[range_name, range_namespace], backref="range_elements") type = "Reference" __table_args__ = (ForeignKeyConstraint(("cls_name","cls_namespace", "namespace_name", "name"), ("CIMProp.cls_name", "CIMProp.cls_namespace", "CIMProp.namespace_name", "CIMProp.name")), ForeignKeyConstraint(("cls_namespace", "cls_name"), (CIMClass.namespace_name, CIMClass.name)), ForeignKeyConstraint((range_namespace, range_name), (CIMClass.namespace_name, CIMClass.name)), ) __mapper_args__ = { "polymorphic_identity": __tablename__ } def __init__(self, schema_elements=None): """ Class constructor :param schema_elements: the (merged) xml node element containing the property's description """ super().__init__(schema_elements) class CIMProp_Enumeration(CIMProp): __tablename__ = "CIMProp_Enumeration" cls_name = Column(String(80), primary_key=True) cls_namespace = Column(String(30), primary_key=True) range_name = Column(String(80)) range_namespace = Column(String(30)) range = relationship(CIMEnum, foreign_keys=[range_name, range_namespace], backref="range_elements") type = "Enumeration" __table_args__ = (ForeignKeyConstraint(("cls_name","cls_namespace", "namespace_name", "name"), ("CIMProp.cls_name", "CIMProp.cls_namespace", "CIMProp.namespace_name", "CIMProp.name")), ForeignKeyConstraint(("cls_namespace", "cls_name"), (CIMClass.namespace_name, CIMClass.name)), ForeignKeyConstraint((range_namespace, range_name), (CIMEnum.namespace_name, CIMEnum.name)), ) __mapper_args__ = { "polymorphic_identity": __tablename__ } def __init__(self, schema_elements=None): """ Class constructor :param schema_elements: the (merged) xml node element containing the property's description """ super().__init__(schema_elements) def insert(self, argmap, value): argmap[f"{self.key}_name"] = value.split(".")[-1] argmap[f"{self.key}_namespace"] = self.namespace.short argmap[f"{self.key}_enum_name"] = \ shorten_namespace(value, self.nsmap).split("_")[-1].split(".")[0] argmap[f"{self.key}_enum_namespace"] = self.namespace.short