Source code for hwt.interfaces.agents.vldSynced
from collections import deque
from hwt.hdl.constants import NOP
from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import WaitCombRead, WaitWriteOnly, WaitCombStable
from pyMathBitPrecise.bit_utils import ValidityError
[docs]class VldSyncedAgent(SyncAgentBase):
[docs] def __init__(self, sim: HdlSimulator, intf, allowNoReset=False):
super(VldSyncedAgent, self).__init__(
sim,
intf,
allowNoReset=allowNoReset)
self.data = deque()
self._vld = self.get_valid_signal(intf)
[docs] def get_data(self):
return self.intf.data.read()
[docs] def set_data(self, data):
self.intf.data.write(data)
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.vld
[docs] def get_valid(self):
return self._vld.read()
[docs] def set_valid(self, val):
self._lastVld = val
return self._vld.write(val)
[docs] def setEnable_asDriver(self, en):
super(VldSyncedAgent, self).setEnable_asDriver(en)
if not en:
self.set_valid(0)
[docs] def monitor(self):
yield WaitCombStable()
if self.notReset():
intf = self.intf
vld = self.get_valid()
try:
vld = int(vld)
except ValidityError:
raise ValidityError(self.sim.now, self.intf._getFullName(),
"vld signal in invalid state (would cause desynchronisation)")
if vld:
d = self.get_data()
if self._debugOutput is not None:
self._debugOutput.write("%s, read, %d: %r\n" % (
intf._getFullName(),
self.sim.now, d))
self.data.append(d)
[docs] def driver(self):
yield WaitCombRead()
if self.data and self.notReset():
d = self.data.popleft()
else:
d = NOP
yield WaitWriteOnly()
if d is NOP:
self.set_data(None)
self.set_valid(0)
else:
self.set_data(d)
self.set_valid(1)
if self._debugOutput is not None:
self._debugOutput.write("%s, wrote, %d: %r\n" % (
self.intf._getFullName(),
self.sim.now, self.actualData))