Source code for hwt.hdl.types.streamVal

from hwt.doc_markers import internal
from hwt.hdl.operator import Operator
from hwt.hdl.operatorDefs import AllOps
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.defs import BOOL, INT
from hwt.hdl.types.slice import HSlice
from hwt.hdl.types.typeCast import toHVal
from hwt.hdl.value import HValue
from hwt.synthesizer.rtlLevel.mainBases import RtlSignalBase


[docs]class HStreamVal(HValue): """ Class for values of HStream HDL type """
[docs] @classmethod def from_py(cls, typeObj, val, vld_mask=None): """ :param typeObj: HStream instance :param val: None or iterrable of values :param vld_mask: if is None validity is resolved from val if is 0 value is invalidated if is 1 value has to be valid """ min_len = typeObj.len_min if vld_mask == 0: val = None element_t = typeObj.element_t if val is None: elements = [element_t.from_py(None) for _ in range(min_len)] else: elements = [] for v in val: if isinstance(v, RtlSignalBase): # is signal assert v._dtype == typeObj.element_t e = v else: e = typeObj.element_t.from_py(v) elements.append(e) cur_len = len(elements) assert cur_len >= min_len and cur_len <= typeObj.len_max _mask = int(bool(val)) if vld_mask is None: vld_mask = _mask else: assert (vld_mask == _mask) return cls(typeObj, elements, vld_mask)
[docs] def to_py(self): if not self._is_full_valid(): raise ValueError(f"Value of {self} is not fully defined") return [v.to_py() for v in self.val]
@internal def __hash__(self): return hash((self._dtype, self.val, self.vld_mask))
[docs] def _is_full_valid(self): return self.vld_mask == 1
[docs] @internal def _getitem__val(self, key): """ :atention: this will clone item from array, iterate over .val if you need to modify items """ kv = key.val if not key._is_full_valid(): raise KeyError() return self.val[kv].__copy__()
def __getitem__(self, key): iamVal = isinstance(self, HValue) key = toHVal(key) isSLICE = isinstance(key, HSlice.getValueCls()) if isSLICE: raise NotImplementedError() elif isinstance(key, (HValue, RtlSignalBase)): pass else: raise NotImplementedError( f"Index operation not implemented for index {key}") if iamVal and isinstance(key, HValue): return self._getitem__val(key) return Operator.withRes(AllOps.INDEX, [self, key], self._dtype.element_t)
[docs] @internal def _setitem__val(self, index, value): if index._is_full_valid(): self.val[index.val] = value.__copy__() else: self.val = {}
def __setitem__(self, index, value): """ Only syntax sugar for user, not used inside HWT * In HW design is not used (__getitem__ returns "reference" and it is used) * In simulator is used _setitem__val directly """ if isinstance(index, int): index = INT.from_py(index) else: assert isinstance(self, HValue) assert isinstance(index._dtype, Bits), index._dtype if not isinstance(value, HValue): value = self._dtype.element_t.from_py(value) else: assert value._dtype == self._dtype.element_t, ( value._dtype, self._dtype.element_t) return self._setitem__val(index, value) def __iter__(self): return iter(self.val) def __len__(self): return len(self.val)
[docs] @internal def _eq__val(self, other): assert self._dtype.element_t == other._dtype.element_t eq = True vld = 1 if (len(self.val) == len(other.val)): for a, b in zip(self.val, other.val): eq = eq and bool(a) == bool(b) if not eq: break vld = vld & a.vld_mask & b.vld_mask else: eq = False vld = 0 return BOOL.getValueCls()(BOOL, int(eq), vld)
[docs] def _eq(self, other): assert isinstance(other, HStreamVal) return self._eq__val(other)