Source code for hwt.hdl.types.stream

from math import inf, isinf
from typing import List, Optional

from hwt.doc_markers import internal
from hwt.hdl.types.hdlType import HdlType
from hwt.serializer.generic.indent import getIndent


[docs]class HStream(HdlType): """ Stream is an abstract type. It is an array with unspecified size. :ivar ~.element_t: type of smalest chunk of data which can be send over this stream :ivar ~.len_min: minimum repetitions of element_t (inclusive interval) :ivar ~.len_max: maximum repetitions of element_t (inclusive interval) :ivar ~.start_offsets: list of numbers which represents the number of invalid bytes before valid data on stream (invalid bytes means the bytes which does not have bit validity set, e.g. Axi4Stream keep=0b10 -> offset=1 ) """
[docs] def __init__(self, element_t, frame_len=inf, start_offsets: Optional[List[int]]=None, const=False): super(HStream, self).__init__(const=const) self.element_t = element_t if isinstance(frame_len, float) and isinf(frame_len): frame_len = (1, inf) elif isinstance(frame_len, int): frame_len = (frame_len, frame_len) self.len_min, self.len_max = frame_len if start_offsets is None: start_offsets = (0, ) self.start_offsets = tuple(start_offsets)
[docs] def bit_length(self): if self.len_min != self.len_max or isinf(self.len_max): raise TypeError("This HStream does not have constant size", self) else: # len_min == len_max return self.len_min * self.element_t.bit_length()
def __eq__(self, other: HdlType): if self is other: return True if (type(self) is type(other)): if self.start_offsets == other.start_offsets \ and self.len_min == other.len_min \ and self.len_max == other.len_max: return self.element_t == other.element_t return False def __hash__(self): return hash((self.start_offsets, self.len_min, self.len_max, self.element_t))
[docs] @internal @classmethod def getValueCls(cls): try: return cls._valCls except AttributeError: from hwt.hdl.types.streamVal import HStreamVal cls._valCls = HStreamVal return cls._valCls
def __repr__(self, indent=0, withAddr=None, expandStructs=False): return "%s<%s len:%s, align:%r\n%s>" % ( getIndent(indent), self.__class__.__name__, (self.len_min, self.len_max), self.start_offsets, self.element_t.__repr__(indent=indent+1, withAddr=withAddr, expandStructs=expandStructs), )