Source code for hwt.interfaces.agents.handshaked
from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.agents.handshaked import HandshakedAgent as pcHandshakedAgent
from hwtSimApi.hdlSimulator import HdlSimulator
[docs]class HandshakedAgent(SyncAgentBase, pcHandshakedAgent):
"""
Simulation/verification agent for :class:`hwt.interfaces.std.Handshaked`
interface there is onMonitorReady(simulator)
and onDriverWriteAck(simulator) unimplemented method
which can be used for interfaces with bi-directional data streams
:note: 2-phase (xor) handshake
:attention: requires clk and rst/rstn signal (
If you do not have any create simulation wrapper with it.
Without it you can very easily end up with a combinational loop.)
"""
[docs] def __init__(self, sim: HdlSimulator, intf: "Handshaked", allowNoReset=False):
rst = self._discoverReset(intf, allowNoReset)
clk = intf._getAssociatedClk()
pcHandshakedAgent.__init__(self, sim, intf, clk, rst)
self._vld = self.get_valid_signal(intf)
self._rd = self.get_ready_signal(intf)
[docs] @classmethod
def get_ready_signal(cls, intf):
return intf.rd._sigInside
[docs] def get_ready(self):
return self._rd.read()
[docs] def set_ready(self, val):
self._rd.write(val)
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.vld._sigInside
[docs] def get_valid(self):
"""get "valid" signal"""
return self._vld.read()
[docs] def set_valid(self, val):
return self._vld.write(val)
[docs] def get_data(self):
"""extract data from interface"""
return self.intf.data.read()
[docs] def set_data(self, data):
"""write data to interface"""
self.intf.data.write(data)
[docs]class UniversalHandshakedAgent(HandshakedAgent):
"""
Same thing like :class:`hwt.interfaces.agents.handshaked.HandshakedAgent`
just the get_data/set_data method is predefined to use a tuple constructed
from signals available on this interface.
:ivar ~._signals: tuple of data signals of this interface (excluding ready and valid signal)
:ivar ~._sigCnt: len(_signals)
"""
[docs] def __init__(self, sim: HdlSimulator, intf: "Handshaked", allowNoReset=False):
HandshakedAgent.__init__(self, sim, intf, allowNoReset=allowNoReset)
signals = []
rd = self.get_ready_signal(intf)
vld = self.get_valid_signal(intf)
for i in intf._interfaces:
if i._sigInside is not rd and i._sigInside is not vld:
signals.append(i)
self._signals = tuple(signals)
self._sigCnt = len(signals)
[docs] def get_data(self):
if self._sigCnt == 1:
return self._signals[0].read()
else:
return tuple(sig.read() for sig in self._signals)
[docs] def set_data(self, data):
if data is None:
for sig in self._signals:
sig.write(None)
else:
if self._sigCnt == 1:
self._signals[0].write(data)
else:
assert len(data) == self._sigCnt, (
"invalid number of data for an interface",
len(data),
self._signals,
self.intf._getFullName())
for sig, val in zip(self._signals, data):
try:
sig.write(val)
except:
raise ValueError("Error while writing ", val, "to ", sig)
[docs]class HandshakeSyncAgent(HandshakedAgent):
"""
Simulation/verification agent for HandshakedSycn interface
:attention: there is no data channel on this interface
it is synchronization only and it actually does not have
any meaningful data collected data in monitor
mode are just values of simulation time when item was collected
"""
[docs] def set_data(self, data):
pass
[docs] def get_data(self):
return self.sim.now
[docs]class HandshakedReadListener():
[docs] def __init__(self, hsAgent: HandshakedAgent):
self.original_afterRead = hsAgent._afterRead
hsAgent._afterRead = self._afterReadWrap
self.agent = hsAgent
self.callbacks = {}
[docs] def _afterReadWrap(self):
if self.original_afterRead is not None:
self.original_afterRead()
try:
cb = self.callbacks.pop(len(self.agent.data))
except KeyError:
return
cb()
[docs] def register(self, transCnt, callback):
self.callbacks[transCnt] = callback