Source code for hwt.hwIOs.agents.bramPort

from collections import deque
from typing import Literal, Union

from hwt.constants import READ, WRITE, NOP
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsConst import HBitsConst
from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.agents.clk import ClockAgent
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import WaitCombRead, WaitWriteOnly, WaitCombStable, Timer
from pyMathBitPrecise.bit_utils import apply_set_and_clear, mask, \
    byte_mask_to_bit_mask_int, bit_mask_to_byte_mask_int


[docs] @staticmethod def storeToRamMaskedByIndex(ram:dict[int, Union[tuple[int, int], HBitsConst]], index: int, data: Union[int, HBitsConst], bitmask: Union[int, HBitsConst], isInHBits=False): if not bitmask: # nothing will be set, no point in further update return # clear all bytes unsed by mask data &= bitmask if isInHBits: # explicitely invalidate bytes of data data = data._dtype.from_py(data.val, data.vld_mask & bitmask.val) cur = ram.get(index) if cur is not None: # merge previous and new data if isInHBits: data = apply_set_and_clear(cur, data & bitmask, bitmask) else: data = apply_set_and_clear(cur[0], data & bitmask, bitmask) bitmask |= cur[1] # print(f"storing {index:02x}: ({data if isinstance(data, int) else data.val:04x}, {int(bitmask):04x}) {bit_mask_to_byte_mask_int(int(bitmask), 32):01x}") if isInHBits: ram[index] = data else: ram[index] = (data, bitmask)
[docs] @staticmethod def storeToRamMaskedByAddress(ram: dict[int, Union[tuple[int, int], HBitsConst]], address: int, wordAlignAddrBitCnt: int, data: Union[int, HBitsConst], bitmask: Union[int, HBitsConst], isInHBits=False): """ :param isInHBits: switches between implementation for int and HBitsConst type for data/bitmask :param wordAlignAddrBitCnt: number of lsb bits which are discarded during conversion of address to ram index and which are used to address inside of the ram word """ # print(f"storing a:{address:08x}: ({data:064x}, {bitmask:064x}) {bit_mask_to_byte_mask_int(bitmask, 256):08x}") alignShift = address & mask(wordAlignAddrBitCnt) index0 = address >> wordAlignAddrBitCnt if alignShift: # the address is not aligned to a word boundary must split to 2 transactions # if first store actually sotres some data if isInHBits: DATA_WIDTH = data._dtype.bit_length() MASK_WIDTH = bitmask._dtype.bit_length() data = data._zext(DATA_WIDTH * 2) bitmask = bitmask._zext(MASK_WIDTH * 2) data <<= alignShift * 8 bitmask <<= alignShift * 8 if isInHBits: bitmask0 = bitmask[MASK_WIDTH:] bitmask1 = bitmask[:MASK_WIDTH] else: wordBitCnt = (2 ** wordAlignAddrBitCnt) * 8 wordBitMask = mask(wordBitCnt) bitmask0 = bitmask & wordBitMask bitmask1 = bitmask >> wordBitCnt if bitmask0: if isInHBits: data0 = data[DATA_WIDTH:] else: data0 = data & wordBitMask storeToRamMaskedByIndex(ram, index0, data0, bitmask0, isInHBits=isInHBits) # if second store actually stores some data if bitmask1: if isInHBits: data1 = data[:DATA_WIDTH] else: data1 = data >> wordBitCnt storeToRamMaskedByIndex(ram, index0 + 1, data1, bitmask1, isInHBits=isInHBits) else: storeToRamMaskedByIndex(ram, index0, data, bitmask, isInHBits=isInHBits)
HwIOBramPort_noClkAgent_requestTy = Union[ tuple[READ, HBitsConst], tuple[WRITE, HBitsConst, HBitsConst], # addr, data tuple[WRITE, HBitsConst, HBitsConst, HBitsConst], # addr, data, mask ]
[docs] class HwIOBramPort_noClkAgent(SyncAgentBase): """ A simulation agent for BramPort_withoutClk interface In slave mode acts as a memory, in master mode dispatches requests stored in "requests" dequeue :ivar ~.requests: list of tuples (request type, address, [write data]) - used for driver :ivar ~.data: list of data in memory, used for monitor :ivar ~.mem: if agent is in monitor mode (= is slave) all reads and writes are performed on mem object """
[docs] def __init__(self, sim: HdlSimulator, hwIO: "HwIOBramPort_noClk"): super().__init__(sim, hwIO, allowNoReset=True) self.HAS_WE = hasattr(hwIO, "we") self.HAS_BE = hwIO.HAS_BE and hwIO.DATA_WIDTH > 8 if self.HAS_BE: assert hwIO.DATA_WIDTH % 8 == 0, ("Expects only complete bytes", hwIO, hwIO.DATA_WIDTH) self.requests: deque[HwIOBramPort_noClkAgent_requestTy] = deque() self.readPending = False self.r_data = deque() self.mem: dict[int, HBitsConst] = {} self.requireInit = True self.clk_ag = None
[docs] def doReq(self, req: HwIOBramPort_noClkAgent_requestTy): rw = req[0] addr = req[1] hwIO = self.hwIO if rw == READ: assert hwIO.HAS_R, hwIO we = 0 wdata = None self.readPending = True if self._debugOutput is not None: self._debugOutput.write("%s, after %r read_req: %d\n" % ( self.hwIO._getFullName(), self.sim.now, addr)) elif rw == WRITE: assert hwIO.HAS_W, hwIO wdata = req[2] if len(req) == 3: if self.HAS_WE: we = mask(hwIO.we._dtype.bit_length()) else: assert self.HAS_WE we = req[3] if self._debugOutput is not None: self._debugOutput.write(f"{self.hwIO._getFullName():s}, after {self.sim.now:d}" f" write: 0x{int(addr):x}:{wdata} {int(we)}\n") else: raise NotImplementedError(rw) hwIO.addr.write(addr) if self.HAS_WE: hwIO.we.write(we) if hwIO.HAS_W: hwIO.din.write(wdata)
[docs] def onReadReq(self, addr: HBitsConst): """ on readReqRecieved in monitor mode """ self.requests.append((READ, addr))
[docs] def onWriteReq(self, addr: HBitsConst, data: HBitsConst, mask: Union[HBitsConst, Literal[0, 1, None]]): """ on writeReqRecieved in monitor mode """ self.requests.append((WRITE, addr, data, mask))
[docs] def monitor(self): """ Handle read/write request on this interfaces This method is executed on clock edge. This means that the read data should be put on dout after clock edge. """ hwIO = self.hwIO yield WaitCombStable() if self.notReset(): en = hwIO.en.read() en = int(en) if en: if self.HAS_WE: we = hwIO.we.read() we = int(we) elif hwIO.HAS_W: we = 1 else: we = 0 addr = hwIO.addr.read() if we: data = hwIO.din.read() self.onWriteReq(addr, data, we) elif hwIO.HAS_R: self.onReadReq(addr) if self.requests: req = self.requests.popleft() t = req[0] addr = req[1] if t == READ: v = self.mem.get(addr.val, None) yield Timer(1) yield WaitWriteOnly() hwIO.dout.write(v) if self._debugOutput is not None: self._debugOutput.write(f"{self.hwIO._getFullName(),}, after {self.sim.now}, read 0x{int(addr):x} {v}\n") else: assert t == WRITE # yield WaitWriteOnly() # hwIO.dout.write(None) yield Timer(1) # after clock edge yield WaitWriteOnly() wData = req[2] wMask = req[3] if addr._is_full_valid(): if self.HAS_BE: maskWidth = hwIO.we._dtype.bit_length() wMaskExt = byte_mask_to_bit_mask_int(wMask, maskWidth) if self._debugOutput is not None: self._debugOutput.write(f"{self.hwIO._getFullName(),}, after {self.sim.now}, write 0x{int(addr):x} {wData}, 0x{wMask:x}\n") storeToRamMaskedByIndex(self.mem, int(addr), wData, HBits(maskWidth * 8).from_py(wMaskExt), isInHBits=True) else: if self._debugOutput is not None: self._debugOutput.write(f"{self.hwIO._getFullName(),}, after {self.sim.now}, write 0x{int(addr):x} {wData}\n") self.mem[addr.val] = wData else: if self._debugOutput is not None: self._debugOutput.write(f"{self.hwIO._getFullName(),}, after {self.sim.now}, write with invalid addr\n") self.mem.clear()
[docs] def driver(self): hwIO = self.hwIO if self.requireInit: yield WaitWriteOnly() hwIO.en.write(0) if self.HAS_WE: hwIO.we.write(0) self.requireInit = False readPending = self.readPending yield WaitCombRead() if self.requests and self.notReset(): yield WaitWriteOnly() req = self.requests.popleft() if req is NOP: hwIO.en.write(0) if self.HAS_WE: hwIO.we.write(0) self.readPending = False else: self.doReq(req) hwIO.en.write(1) else: yield WaitWriteOnly() hwIO.en.write(0) if self.HAS_WE: hwIO.we.write(0) self.readPending = False if readPending: # in previous clock the read request was dispatched, now we are collecting the data yield WaitCombStable() # now we are after clk edge d = hwIO.dout.read() self.r_data.append(d) if self._debugOutput is not None: self._debugOutput.write("%s, on %r read_data: %d\n" % ( self.hwIO._getFullName(), self.sim.now, d.val))
[docs] class HwIOBramPortAgent(HwIOBramPort_noClkAgent):
[docs] def getDrivers(self): yield from super(HwIOBramPortAgent, self).getDrivers() self.clk_ag = ClockAgent(self.sim, self.hwIO.clk) yield from self.clk_ag.getDrivers()