Source code for hwt.interfaces.agents.signal

from collections import deque

from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.exceptions import IntfLvlConfErr
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.constants import CLK_PERIOD
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import Timer, WaitWriteOnly, WaitCombRead, WaitCombStable


[docs]class SignalAgent(SyncAgentBase): """ Agent for signal interface, it can use clock and reset interface for synchronization or can be synchronized by delay :attention: clock synchronization has higher priority """
[docs] def __init__(self, sim: HdlSimulator, intf: "Signal", delay=None): AgentBase.__init__(self, sim, intf) self.delay = delay self.initDelay = 0 # resolve clk and rstn try: self.clk = self.intf._getAssociatedClk() except IntfLvlConfErr: self.clk = None self.rst, self.rstOffIn = self._discoverReset(intf, True) self.data = deque() self.initPending = True if self.clk is None: if self.delay is None: self.delay = CLK_PERIOD self.monitor = self.monitorWithTimer self.driver = self.driverWithTimer else: if self.initDelay: raise NotImplementedError("initDelay only without clock") if self.delay: raise ValueError("clock and delay synchronization at once") c = self.SELECTED_EDGE_CALLBACK self.monitor = c(self.sim, self.clk, self.monitorWithClk, self.getEnable) self.driver = c(self.sim, self.clk, self.driverWithClk, self.getEnable)
[docs] def getDrivers(self): yield self.driverInit() if self.clk is None: if self.delay is None: self.delay = CLK_PERIOD yield self.driverWithTimer() else: if self.initDelay: raise NotImplementedError("initDelay only without clock") if self.delay: raise ValueError("clock and delay synchronization at once") c = self.SELECTED_EDGE_CALLBACK if not isinstance(self.driver, c): self.driver = c(self.sim, self.clk, self.driverWithClk, self.getEnable) yield self.driver()
[docs] def getMonitors(self): if self.clk is None: if self.delay is None: self.delay = CLK_PERIOD yield self.monitorWithTimer() else: if self.initDelay: raise NotImplementedError("initDelay only without clock") if self.delay: raise ValueError("clock and delay synchronization at once") c = self.SELECTED_EDGE_CALLBACK if not isinstance(self.monitor, c): self.monitor = c(self.sim, self.clk, self.monitorWithClk, self.getEnable) yield self.monitor()
[docs] def driverInit(self): yield WaitWriteOnly() if not self._enabled: return try: d = self.data[0] except IndexError: d = None self.set_data(d)
[docs] def get_data(self): return self.intf.read()
[docs] def set_data(self, data): self.intf.write(data)
[docs] def driverWithClk(self): # if clock is specified this function is periodically called every # clk tick, if agent is enabled yield WaitCombRead() if not self._enabled: return if self.data and self.notReset(): yield WaitWriteOnly() if not self._enabled: return d = self.data.popleft() self.set_data(d)
[docs] def driverWithTimer(self): if self.initPending: if self.initDelay: yield Timer(self.initDelay) self.initPending = False # if clock is specified this function is periodically called every # clk tick while True: yield WaitWriteOnly() if self._enabled and self.data and self.notReset(): yield WaitWriteOnly() if self._enabled: d = self.data.popleft() self.set_data(d) yield Timer(self.delay)
[docs] def monitorWithTimer(self): if self.initPending and self.initDelay: yield Timer(self.initDelay) self.initPending = False # if there is no clk, we have to manage periodic call by our self while True: yield WaitCombRead() if self._enabled and self.notReset(): d = self.get_data() self.data.append(d) yield Timer(self.delay)
[docs] def monitorWithClk(self): # if clock is specified this function is periodically called every # clk tick, when agent is enabled yield WaitCombStable() if self._enabled and self.notReset(): d = self.get_data() self.data.append(d)