Source code for hwt.interfaces.agents.bramPort
from collections import deque
from hwt.hdl.constants import READ, WRITE, NOP
from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.agents.clk import ClockAgent
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import WaitCombRead, WaitWriteOnly, WaitCombStable, Timer
[docs]class BramPort_withoutClkAgent(SyncAgentBase):
"""
A simulation agent for BramPort_withoutClk interface
In slave mode acts as a memory, in master mode dispathes
requests stored in "requests" dequeu
:ivar ~.requests: list of tuples (request type, address, [write data])
- used for driver
:ivar ~.data: list of data in memory, used for monitor
:ivar ~.mem: if agent is in monitor mode (= is slave) all reads and writes
are performed on mem object
"""
[docs] def __init__(self, sim: HdlSimulator, intf):
super().__init__(sim, intf, allowNoReset=True)
if intf.HAS_BE:
raise NotImplementedError()
self.HAS_WE = hasattr(intf, "we")
self.requests = deque()
self.readPending = False
self.r_data = deque()
self.mem = {}
self.requireInit = True
self.clk_ag = None
[docs] def doReq(self, req):
rw = req[0]
addr = req[1]
intf = self.intf
if rw == READ:
assert intf.HAS_R, intf
rw = 0
wdata = None
self.readPending = True
if self._debugOutput is not None:
self._debugOutput.write("%s, after %r read_req: %d\n" % (
self.intf._getFullName(),
self.sim.now, addr))
elif rw == WRITE:
assert intf.HAS_W, intf
wdata = req[2]
rw = 1
if self._debugOutput is not None:
self._debugOutput.write("%s, after %r write: %d:%d\n" % (
self.intf._getFullName(), self.sim.now,
addr, wdata))
else:
raise NotImplementedError(rw)
intf.addr.write(addr)
if self.HAS_WE:
intf.we.write(rw)
if intf.HAS_W:
intf.din.write(wdata)
[docs] def onReadReq(self, addr):
"""
on readReqRecieved in monitor mode
"""
self.requests.append((READ, addr))
[docs] def onWriteReq(self, addr, data):
"""
on writeReqRecieved in monitor mode
"""
self.requests.append((WRITE, addr, data))
[docs] def monitor(self):
"""
Handle read/write request on this interfaces
This method is executed on clock edge.
This means that the read data should be put on dout after clock edge.
"""
intf = self.intf
yield WaitCombStable()
if self.notReset():
en = intf.en.read()
en = int(en)
if en:
if self.HAS_WE:
we = intf.we.read()
we = int(we)
elif intf.HAS_W:
we = 1
else:
we = 0
addr = intf.addr.read()
if we:
data = intf.din.read()
self.onWriteReq(addr, data)
else:
self.onReadReq(addr)
if self.requests:
req = self.requests.popleft()
t = req[0]
addr = req[1]
if t == READ:
v = self.mem.get(addr.val, None)
yield Timer(1)
yield WaitWriteOnly()
intf.dout.write(v)
else:
assert t == WRITE
# yield WaitWriteOnly()
# intf.dout.write(None)
yield Timer(1)
# after clock edge
yield WaitWriteOnly()
self.mem[addr.val] = req[2]
[docs] def driver(self):
intf = self.intf
if self.requireInit:
yield WaitWriteOnly()
intf.en.write(0)
if self.HAS_WE:
intf.we.write(0)
self.requireInit = False
readPending = self.readPending
yield WaitCombRead()
if self.requests and self.notReset():
yield WaitWriteOnly()
req = self.requests.popleft()
if req is NOP:
intf.en.write(0)
if self.HAS_WE:
intf.we.write(0)
self.readPending = False
else:
self.doReq(req)
intf.en.write(1)
else:
yield WaitWriteOnly()
intf.en.write(0)
if self.HAS_WE:
intf.we.write(0)
self.readPending = False
if readPending:
# in previous clock the read request was dispatched, now we are collecting the data
yield WaitCombStable()
# now we are after clk edge
d = intf.dout.read()
self.r_data.append(d)
if self._debugOutput is not None:
self._debugOutput.write("%s, on %r read_data: %d\n" % (
self.intf._getFullName(),
self.sim.now, d.val))
[docs]class BramPortAgent(BramPort_withoutClkAgent):
[docs] def getDrivers(self):
yield from super(BramPortAgent, self).getDrivers()
self.clk_ag = ClockAgent(self.sim, self.intf.clk)
yield from self.clk_ag.getDrivers()