Source code for hwt.interfaces.agents.fifo

from collections import deque

from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.process_utils import OnRisingCallbackLoop
from hwtSimApi.triggers import Timer, WaitWriteOnly, WaitCombRead, WaitCombStable, \
    WaitTimeslotEnd
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.constants import CLK_PERIOD


[docs]class FifoReaderAgent(SyncAgentBase): """ Simulation agent for FifoReader interface """
[docs] def __init__(self, sim, intf, allowNoReset=False): super(FifoReaderAgent, self).__init__(sim, intf, allowNoReset) self.data = deque() self.readPending = False self.lastData = None # flags to keep data coherent when enable state changes self.lastData_invalidate = False self.readPending_invalidate = False if intf.DATA_WIDTH == 0: raise NotImplementedError()
[docs] def setEnable_asDriver(self, en): lastEn = self._enabled super(FifoReaderAgent, self).setEnable_asDriver(en) self.intf.wait.write(not en) self.lastData_invalidate = not en if not lastEn: self.dataWriter.setEnable(en)
[docs] def setEnable_asMonitor(self, en): lastEn = self._enabled super(FifoReaderAgent, self).setEnable_asMonitor(en) self.intf.en.write(en) self.readPending_invalidate = not en if not lastEn: self.dataReader.setEnable(en)
# else dataReader will disable itself
[docs] def driver_init(self): yield WaitWriteOnly() self.intf.wait.write(not self._enabled)
[docs] def monitor_init(self): yield WaitWriteOnly() self.intf.en.write(self._enabled)
[docs] def get_data(self): return self.intf.data.read()
[docs] def dataReader(self): yield Timer(1) if self.readPending: yield WaitCombRead() d = self.get_data() self.data.append(d) if self.readPending_invalidate: self.readPending = False if not self.readPending and not self._enabled: self.dataWriter.setEnable(False)
[docs] def getMonitors(self): self.dataReader = OnRisingCallbackLoop(self.sim, self.clk, self.dataReader, self.getEnable) yield self.monitor_init() yield from super(FifoReaderAgent, self).getMonitors() yield self.dataReader()
[docs] def monitor(self): """ Initialize data reading if wait is 0 """ intf = self.intf yield WaitCombRead() if self.notReset(): # wait until wait signal is stable wait_last = None while True: yield WaitCombRead() wait = intf.wait.read() try: wait = int(wait) except ValueError: raise AssertionError(self.sim.now, intf, "wait signal in invalid state") if wait is wait_last: break else: wait_last = wait yield WaitWriteOnly() rd = not wait else: rd = False yield WaitWriteOnly() intf.en.write(rd) self.readPending = rd
[docs] def getDrivers(self): self.dataWriter = OnRisingCallbackLoop(self.sim, self.clk, self.dataWriter, self.getEnable) yield self.driver_init() yield from super(FifoReaderAgent, self).getDrivers() yield self.dataWriter()
[docs] def set_data(self, d): self.intf.data.write(d)
[docs] def dataWriter(self): # delay data litle bit to have nicer wave # otherwise wirte happens before next clk period # and it means in 0 time and we will not be able to see it in wave yield Timer(1) yield WaitWriteOnly() self.set_data(self.lastData) if self.lastData_invalidate: self.lastData = None if not self._enabled: self.dataWriter.setEnable(False)
[docs] def driver(self): # now we are before clock event # * set wait signal # * set last data (done in separate process) # * if en == 1, pop next data for next clk intf = self.intf yield WaitCombRead() rst_n = self.notReset() # speculative write if rst_n and self.data: wait = 0 else: wait = 1 yield WaitWriteOnly() intf.wait.write(wait) if rst_n: # wait for potential update of en # check if write can be performed and if it possible do real write yield WaitTimeslotEnd() en = intf.en.read() try: en = int(en) except ValueError: raise AssertionError(self.sim.now, intf, "en signal in invalid state") if en: assert self.data, (self.sim.now, intf, "underflow") self.lastData = self.data.popleft()
[docs]class FifoWriterAgent(SyncAgentBase): """ Simulation agent for FifoWriter interface """
[docs] def __init__(self, sim: HdlSimulator, intf, allowNoReset=False): super(FifoWriterAgent, self).__init__( sim, intf, allowNoReset=allowNoReset) self.data = deque() if intf.DATA_WIDTH == 0: raise NotImplementedError()
[docs] def driver_init(self): yield WaitWriteOnly() self.intf.en.write(self._enabled)
[docs] def monitor_init(self): yield WaitWriteOnly() self.intf.wait.write(not self._enabled)
[docs] def setEnable_asDriver(self, en): SyncAgentBase.setEnable_asDriver(self, en) self.intf.en.write(en)
[docs] def setEnable_asMonitor(self, en): SyncAgentBase.setEnable_asMonitor(self, en) self.intf.wait.write(not en)
[docs] def get_data(self): return self.intf.data.read()
[docs] def set_data(self, d): self.intf.data.write(d)
[docs] def monitor(self): # set wait signal # if en == 1 take data intf = self.intf yield WaitWriteOnly() intf.wait.write(0) yield WaitCombStable() # wait for potential update of en en = intf.en.read() try: en = int(en) except ValueError: raise AssertionError(self.sim.now, intf, "en signal in invalid state") if en: yield Timer(CLK_PERIOD // 10) yield WaitCombRead() self.data.append(self.get_data())
[docs] def driver(self): # if wait == 0 set en=1 and set data intf = self.intf d = None v = 0 yield WaitCombRead() if self.notReset() and self.data: yield WaitCombRead() wait = intf.wait.read() try: wait = int(wait) except ValueError: raise AssertionError(self.sim.now, intf, "wait signal in invalid state") if not wait: d = self.data.popleft() v = 1 yield WaitWriteOnly() self.set_data(d) intf.en.write(v)
[docs] def getDrivers(self): yield from SyncAgentBase.getDrivers(self) yield self.driver_init()
[docs] def getMonitors(self): yield from SyncAgentBase.getMonitors(self) yield self.monitor_init()