Source code for hwt.synthesizer.rtlLevel.rtlSignal

from copy import copy
from typing import Generator, Dict, Tuple, Set, Union, Self, List, \
    Literal, Optional

from hwt.constants import NOT_SPECIFIED
from hwt.doc_markers import internal
from hwt.hdl.const import HConst
from hwt.hdl.operatorDefs import HOperatorDef, HwtOps, CAST_OPS
from hwt.hdl.portItem import HdlPortItem
from hwt.hdl.sensitivityCtx import SensitivityCtx
from hwt.hdl.statements.assignmentContainer import HdlAssignmentContainer
from hwt.hdl.statements.statement import HdlStatement, HwtSyntaxError
from hwt.hdl.types.bitsCastUtils import fitTo_t
from hwt.hdl.types.defs import SLICE, INT
from hwt.hdl.types.hdlType import HdlType
from hwt.hdl.types.typeCast import toHVal
from hwt.hdl.variables import HdlSignalItem
from hwt.mainBases import RtlSignalBase, HwIOBase
from hwt.pyUtils.setList import SetList
from hwt.synthesizer.exceptions import TypeConversionErr
from hwt.synthesizer.rtlLevel.exceptions import SignalDriverErr, \
    SignalDriverErrType

OperatorCaheKeyType = Union[
    Tuple['OpDefinition', int, object],
    Tuple['OpDefinition', int, object, object],
    Tuple['OpDefinition', int, object, object, object],
]


[docs] class CREATE_NEXT_SIGNAL():
[docs] def __init__(self): raise AssertionError("This class should be used as a constant")
[docs] class RtlSignal(RtlSignalBase, HdlSignalItem): """ RtlSignal signal is a connection between statements and operators in circuit graph. :ivar ~._rtlEndpoints: SetList of operators and statements for which this signal is driver. :ivar ~._rtlDrivers: SetList of operators and statements which can drive this signal. If driver is statement tree only top statement is present. :ivar ~._usedOps: A dictionary of used operators which can be reused. :ivar ~._usedOpsAlias: A dictionary tuple of operator and operands to set of tuples of operator and operands, used to resolve which combination of the operator and operands resulted in to same result. :note: The _usedOps, _usedOpsAlias cache record is generated only for the left most signal in expression. :ivar ~._isUnnamedExpr: means that this signal is part of expression and should not be rendered :ivar ~._nop_val: value which is used to fill up statements when no other value is assigned, use NOT_SPECIFIED to disable :ivar ~._const: flag which tell that this signal can not have any other driver than a default value :cvar __instCntr: counter used for generating instance ids :ivar ~._instId: internally used only for intuitive sorting of statements in serialized code :ivar ~._rtlObjectOrigin: optionally an object which generated this signal :ivar ~._rtlNextSig: optional signal signal which is used as a next signal if this RtlSignal is actually a FF output. """ __instCntr = 0 __slots__ = [ "_ctx", "_rtlEndpoints", "_rtlDrivers", "_usedOps", "_usedOpsAlias", "_isUnnamedExpr", "_hdlName", "_hasGenericName", "_instId", "_nop_val", "_const", "_hwIO", "_rtlObjectOrigin", "_rtlNextSig", ]
[docs] def __init__(self, ctx: 'RtlNetlist', name: str, dtype: HdlType, def_val=None, nop_val=NOT_SPECIFIED, next_signal:Union[RtlSignalBase, Literal[NOT_SPECIFIED, CREATE_NEXT_SIGNAL]]=NOT_SPECIFIED, virtual_only=False, is_const=False): """ :param ctx: context - RtlNetlist which is this signal part of :param name: name hint for this signal, if is None name is chosen automatically :param def_val: value which is used for reset and as default value in HDL :param nop_val: value which is used to fill up statements when no other value is assigned, use NOT_SPECIFIED to disable :param is_const: flag which tell that this signal can not have any other driver than a default value """ self._instId: int = RtlSignal._nextInstId() if name is None: name = "sig_" self._hasGenericName = True else: self._hasGenericName = False assert isinstance(dtype, HdlType) super(RtlSignal, self).__init__(name, dtype, def_val, virtual_only=virtual_only) self._rtlCtx = ctx if ctx: # params do not have any context on created # and it is assigned after param is bounded to unit or interface ctx.signals.add(self) # set can not be used because hash of items are changing self._rtlEndpoints: SetList[Union[HdlStatement, HdlPortItem, "Operator"]] = SetList() self._rtlDrivers: SetList[HdlStatement, HdlPortItem, "Operator"] = SetList() self._usedOps: Dict[OperatorCaheKeyType, RtlSignal] = {} self._usedOpsAlias: Dict[OperatorCaheKeyType, Set[OperatorCaheKeyType]] = {} self._isUnnamedExpr: bool = True self._nop_val = nop_val self._const = is_const self._rtlObjectOrigin = None if nop_val is NOT_SPECIFIED: nop_val = self # construct next signal if requested if next_signal is NOT_SPECIFIED: _next_signal = None elif next_signal is CREATE_NEXT_SIGNAL: _next_signal = self.__class__(ctx, name + "_next", dtype, nop_val=nop_val) else: assert isinstance(next_signal, RtlSignalBase), next_signal assert next_signal._dtype is dtype _next_signal = next_signal if _next_signal._nop_val is NOT_SPECIFIED: _next_signal._nop_val = self self._rtlNextSig: Optional[RtlSignal] = _next_signal
[docs] @internal @classmethod def _nextInstId(cls): """ Get next instance id """ i = cls.__instCntr cls.__instCntr += 1 return i
[docs] def staticEval(self): # operator writes in self._val new value driven_by_def_val = True if self._rtlDrivers: for d in self._rtlDrivers: if isinstance(d, HdlPortItem): assert d.getInternSig() is self, (d, self) continue d.staticEval() driven_by_def_val = False if driven_by_def_val: if isinstance(self.def_val, RtlSignal): self._val = self.def_val._val.staticEval() else: # _val is invalid initialization value self._val = self.def_val.__copy__() if not isinstance(self._val, HConst): raise ValueError( "Evaluation of signal returned not supported object (%r)" % (self._val,)) return self._val
[docs] def singleDriver(self): """ Returns a first driver if signal has only one driver. """ d_cnt = len(self._rtlDrivers) if d_cnt == 0: raise SignalDriverErr([(SignalDriverErrType.MISSING_DRIVER, self), ]) elif d_cnt > 1: raise SignalDriverErr([(SignalDriverErrType.MULTIPLE_COMB_DRIVERS, self), ]) return self._rtlDrivers[0]
[docs] @internal def _walk_sensitivity(self, casualSensitivity: Set[RtlSignalBase], seen: Set[RtlSignalBase], ctx: SensitivityCtx): """ Walk expression and collect signals which is this expression sensitive to. (:see: what is signal sensitivity in vhdl/verilog) :param casualSensitivity: set of public signals which is this expression sensitive to but rising/faling edge operator is not present :param seen: set of all seen signals :param ctx: context where sensitivity """ seen.add(self) if self._const: return if not self._isUnnamedExpr: casualSensitivity.add(self) return try: op = self.singleDriver() except SignalDriverErr: op = None if op is None or isinstance(op, HdlStatement): casualSensitivity.add(self) return op._walk_sensitivity(casualSensitivity, seen, ctx)
[docs] @internal def _walk_public_drivers(self, seen: set) -> Generator["RtlSignal", None, None]: """ Walk all non hidden signals in an expression """ seen.add(self) if not self._isUnnamedExpr: yield self return assert self._rtlDrivers, self for d in self._rtlDrivers: # d has to be operator otherwise this signal would be public itself assert not isinstance(d, HdlStatement), (d.__class__) yield from d._walk_public_drivers(seen)
[docs] def _auto_cast(self, toType: HdlType): """ Cast value or signal of this type to another compatible type. :param toType: instance of HdlType to cast into """ return self._dtype.auto_cast_RtlSignal(self, toType)
[docs] def _explicit_cast(self, toType: HdlType): """ Cast value or signal of this type to another friendly type. :param toType: instance of HdlType to cast into """ return self._dtype.explicit_cast_RtlSignal(self, toType)
[docs] def _reinterpret_cast(self, toType: HdlType): """ Cast value or signal of this type to another type of same size. :param toType: instance of HdlType to cast into """ return self._dtype.reinterpret_cast_RtlSignal(self, toType)
[docs] @internal def _create_HOperator(self, operator: HOperatorDef, opCreateDelegate , *otherOps) -> Union[Self, HConst]: """ Try lookup operator with this parameters in _usedOps if not found create new one and stored it in _usedOps :param operator: instance of HOperatorDef :param opCreateDelegate: function (\\*ops) to create operator :param otherOps: other operands (ops = self + otherOps) :return: RtlSignal which is result of newly created operator """ indexOfSelfInOperands = 0 k = (operator, indexOfSelfInOperands, *otherOps) used = self._usedOps try: return used[k] except KeyError: pass o = opCreateDelegate(self, *otherOps) # input operands may be type converted, # search if this happened, and return always same result signal try: op_instantiated = (o._rtlObjectOrigin.operator == operator and o._rtlObjectOrigin.operands[indexOfSelfInOperands] is self) except AttributeError: op_instantiated = False usedOpsAlias = self._usedOpsAlias if op_instantiated: # try check real operands and operator which were used after all default type conversions k_real = (operator, indexOfSelfInOperands, *o._rtlObjectOrigin.operands[1:]) if k != k_real: alias = usedOpsAlias[k_real] usedOpsAlias[k] = alias alias.add(k) used[k] = o return o
[docs] @internal def _getIndexCascade(self): """ Find out if this signal is something indexed """ hwIO = self indexes = [] sign_cast_seen = False while True: try: # now self is the result of the index xxx[xx] <= source # get index op d = hwIO.singleDriver() try: op = d.operator except AttributeError: # probably port or statement break if op == HwtOps.INDEX or op == HwtOps.DOT: # get signal on which is index applied indexedOn = d.operands[0] if isinstance(indexedOn, RtlSignalBase): hwIO = indexedOn indexes.append(d.operands[1]) else: raise HwtSyntaxError("can not assign to a static value", indexedOn) elif op == HwtOps.TRUNC: indexedOn = d.operands[0] width = int(d.operands[1]) if isinstance(indexedOn, RtlSignalBase): if hwIO._dtype.bit_length() > 1 or hwIO._dtype.force_vector: indexes.append(SLICE.from_py(slice(width, 0, -1))) else: indexes.append(INT.from_py(width)) hwIO = indexedOn else: raise HwtSyntaxError("can not assign to a static value", indexedOn) elif op in CAST_OPS: sign_cast_seen = True hwIO = d.operands[0] else: # the concatenations should have been already resolved before entering of this function raise HwtSyntaxError( f"can not assign to result of operator {d}") except SignalDriverErr: break if not indexes: indexes = None else: indexes.reverse() return hwIO, indexes, sign_cast_seen
[docs] def _getDestinationSignalForAssignmentToThis(self): """ :return: a signal which should be used as a destination if assigning to this signal """ return self if self._rtlNextSig is None else self._rtlNextSig
def __call__(self, source, dst_resolve_fn=lambda x: x._getDestinationSignalForAssignmentToThis(), exclude=None, fit=False) -> Union[HdlAssignmentContainer, List[HdlAssignmentContainer]]: """ Create assignment to this signal :attention: it is not call of function it is operator of assignment :return: list of assignments """ assert not self._const, self if exclude is not None and (self in exclude or source in exclude): return [] if isinstance(source, HwIOBase) and not source._hwIOs: assert source._isAccessible, (source, "must be a Signal Interface which is accessible in current scope") source = source._sig assert self._dtype.isScalar(), ("For non scalar types this should be overriden", self._dtype, self.__class__, self) try: if source is None: requires_type_check = False source = self._dtype.from_py(None) else: requires_type_check = True source = toHVal(source, suggestedType=self._dtype) except Exception as e: # simplification of previous exception traceback e_simplified = copy(e) raise e_simplified if requires_type_check: err = False try: if fit: source = fitTo_t(source, self._dtype) source = source._auto_cast(self._dtype) except TypeConversionErr: err = True if err: raise TypeConversionErr( "Can not connect ", source._dtype, " to ", self._dtype, " ", source, self) if self._isUnnamedExpr: try: d = self.singleDriver() except: d = None operator = getattr(d, "operator", None) if operator is not None: if operator.allowsAssignTo: if operator == HwtOps.NOT: # instead of assigning to negation we assign the negation return d.operands[0](~source, dst_resolve_fn=dst_resolve_fn, exclude=exclude, fit=fit) elif operator in CAST_OPS: # we need to assert that src and dst type matches, but we do not anything else dst = d.operands[0] src_sign = source._dtype.signed dst_sign = dst._dtype.signed if src_sign == dst_sign: return dst(source) elif dst_sign is None: return dst(source._vec()) elif dst_sign: return dst(source._signed()) else: return dst(source._unsigned()) elif operator == HwtOps.CONCAT: offset = 0 res = [] # reversed because LSB first for op in reversed(d.operands): w = op._dtype.bit_length() res.append(op(source[w + offset: offset])) offset += w return res else: raise AssertionError("Assignment to operator is not allowed by operator definition", self,) try: mainSig, indexCascade, signCastSeen = self._getIndexCascade() mainSig = dst_resolve_fn(mainSig) if signCastSeen: src_sign = source._dtype.signed dst_sign = mainSig._dtype.signed if src_sign == dst_sign: pass elif dst_sign is None: source = source._vec() elif dst_sign: source = source._signed() else: source = source._unsigned() return HdlAssignmentContainer(source, mainSig, indexCascade) except Exception as e: # simplification of previous exception traceback e_simplified = copy(e) raise e_simplified
[docs] def _getAssociatedClk(self) -> Self: assert self._rtlNextSig is not None, self d = self.singleDriver() # this expects a simple if rising_edge(clk) # assert isinstance(d, IfContainer), d cond = d.cond.singleDriver() # assert isinstance(cond, HOperatorNode) and cond.operator is HwtOps.RISING_EDGE, cond return cond.operands[0]
[docs] def _getAssociatedRst(self) -> Self: assert self._rtlNextSig is not None, self d = self.singleDriver() # this expects a simple if rising_edge(clk) # assert isinstance(d, IfContainer), d # cond = d.cond.singleDriver() # assert isinstance(cond, HOperatorNode) and cond.operator is HwtOps.RISING_EDGE, cond assert len(d.ifTrue) == 1 reset_if = d.ifTrue[0] return reset_if.cond
[docs] def _is_full_valid(self) -> bool: return self._const and self._val._is_full_valid()
[docs] def _is_partially_valid(self) -> bool: return self._const and self._val._is_partially_valid()