from copy import deepcopy
from functools import reduce
from itertools import compress
from operator import and_
from typing import List, Tuple, Dict, Optional, Callable, Set, Generator
from hwt.doc_markers import internal
from hwt.hdl.operatorUtils import replace_input_in_expr
from hwt.hdl.sensitivityCtx import SensitivityCtx
from hwt.hdl.statements.statement import HdlStatement, SignalReplaceSpecType
from hwt.hdl.statements.utils.comparison import statementsAreSame, isSameStatementList
from hwt.hdl.statements.utils.ioDiscovery import HdlStatement_discover_enclosure_for_statements
from hwt.hdl.statements.utils.listOfHdlStatements import ListOfHdlStatement
from hwt.hdl.statements.utils.reduction import HdlStatement_merge_statement_lists, \
HdlStatement_try_reduce_list, is_mergable_statement_list
from hwt.hdl.statements.utils.signalCut import HdlStatement_cut_off_drivers_of_list
from hwt.serializer.utils import RtlSignal_sort_key
from hwt.synthesizer.rtlLevel.fill_stm_list_with_enclosure import fill_stm_list_with_enclosure
from hwt.synthesizer.rtlLevel.mainBases import RtlSignalBase
from hwt.synthesizer.rtlLevel.signalUtils.walkers import discover_sensitivity_of_sig
[docs]class IfContainer(HdlStatement):
"""
Structural container of if statement for hdl rendering
:ivar ~._ifTrue_enclosed_for: set of signals for which if ifTrue branch enclosed
(has not branch where signal is not assignment)
:ivar ~._elIfs_enclosed_for: list of sets of enclosed signals for each elif
:ivar ~._ifFalse_enclosed_for: set of enclosed signals for ifFalse branch
"""
_DEEPCOPY_SHALLOW_ONLY = (*HdlStatement._DEEPCOPY_SHALLOW_ONLY, '_ifTrue_enclosed_for', '_elIfs_enclosed_for', '_ifFalse_enclosed_for')
_DEEPCOPY_SKIP = (*HdlStatement._DEEPCOPY_SKIP, 'cond', 'elIfs')
[docs] def __init__(self, cond: RtlSignalBase, ifTrue=None, ifFalse=None, elIfs=None,
parentStm=None, event_dependent_from_branch: Optional[int]=None):
"""
:param cond: RtlSignal as conditions for this if
:param ifTrue: list of statements which should be active if cond.
is met
:param elIfs: list of tuples (list of conditions, list of statements)
:param ifFalse: list of statements which should be active if cond.
and any other cond. in elIfs is met
"""
assert isinstance(cond, RtlSignalBase)
self.cond = cond
super(IfContainer, self).__init__(
parentStm,
event_dependent_from_branch=event_dependent_from_branch)
if ifTrue is None:
ifTrue = ListOfHdlStatement()
self.ifTrue: ListOfHdlStatement = ifTrue
if elIfs is None:
elIfs = []
self.elIfs: List[Tuple[RtlSignalBase, ListOfHdlStatement]] = elIfs
self.ifFalse: Optional[ListOfHdlStatement] = ifFalse
self._ifTrue_enclosed_for: Optional[Set[RtlSignalBase]] = None
self._elIfs_enclosed_for: Optional[Set[RtlSignalBase]] = None
self._ifFalse_enclosed_for: Optional[Set[RtlSignalBase]] = None
def __deepcopy__(self, memo: dict):
result = super(IfContainer, self).__deepcopy__(memo)
result.cond = self.cond
result.elIfs = [(c, deepcopy(stms, memo)) for c, stms in self.elIfs]
return result
[docs] @internal
def _collect_io(self):
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._collect_io`
"""
if isinstance(self.cond, RtlSignalBase):
self._inputs.append(self.cond)
for c, _ in self.elIfs:
if isinstance(c, RtlSignalBase):
self._inputs.append(c)
super(IfContainer, self)._collect_io()
[docs] @internal
def _cut_off_drivers_of(self, sig: RtlSignalBase):
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._cut_off_drivers_of`
"""
if self._try_cut_off_whole_stm(sig):
return self
# try to cut off all statements which are drivers of specified signal
# in all branches
child_keep_mask = []
newIfTrue = []
all_cut_off = True
all_cut_off &= HdlStatement_cut_off_drivers_of_list(
sig, self.ifTrue, child_keep_mask, newIfTrue)
self.ifTrue = ListOfHdlStatement(compress(self.ifTrue, child_keep_mask))
newElifs = []
anyElifHit = False
for cond, stms in self.elIfs:
newCase = []
child_keep_mask.clear()
all_cut_off &= HdlStatement_cut_off_drivers_of_list(
sig, stms, child_keep_mask, newCase)
_stms = list(compress(stms, child_keep_mask))
stms.clear()
stms.extend(_stms)
if newCase:
anyElifHit = True
newElifs.append((cond, newCase))
newIfFalse = None
if self.ifFalse:
newIfFalse = []
child_keep_mask.clear()
all_cut_off &= HdlStatement_cut_off_drivers_of_list(
sig, self.ifFalse, child_keep_mask, newIfFalse)
self.ifFalse = ListOfHdlStatement(compress(self.ifFalse, child_keep_mask))
assert not all_cut_off, "everything was cut of but this should be already known at the start"
if newIfTrue or newIfFalse or anyElifHit or newIfFalse:
# parts were cut off
# generate new statement for them
cond_sig = self.cond
n = self.__class__(cond_sig, newIfTrue)
for c_sig, stms in newElifs:
n.Elif(c_sig, stms)
if newIfFalse is not None:
n.Else(newIfFalse)
if self.parentStm is None:
ctx = n._get_rtl_context()
ctx.statements.add(n)
self._cut_off_drivers_of_regenerate_io(sig, n)
return n
[docs] @internal
def _discover_enclosure(self):
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._discover_enclosure`
"""
outputs = self._outputs
self._ifTrue_enclosed_for = HdlStatement_discover_enclosure_for_statements(
self.ifTrue, outputs)
elif_encls = self._elIfs_enclosed_for = []
for _, stms in self.elIfs:
e = HdlStatement_discover_enclosure_for_statements(
stms, outputs)
elif_encls.append(e)
self._ifFalse_enclosed_for = HdlStatement_discover_enclosure_for_statements(
self.ifFalse, outputs)
assert self._enclosed_for is None
encl = self._enclosed_for = set()
for s in self._ifTrue_enclosed_for:
enclosed = True
for elif_e in elif_encls:
if s not in elif_e:
enclosed = False
break
if enclosed and s in self._ifFalse_enclosed_for:
encl.add(s)
[docs] @internal
def _discover_sensitivity(self, seen: set) -> None:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._discover_sensitivity`
"""
assert self._sensitivity is None, self
ctx = self._sensitivity = SensitivityCtx()
discover_sensitivity_of_sig(self.cond, seen, ctx)
if ctx.contains_ev_dependency:
return
for stm in self.ifTrue:
stm._discover_sensitivity(seen)
ctx.extend(stm._sensitivity)
# elifs
for cond, stms in self.elIfs:
if ctx.contains_ev_dependency:
break
discover_sensitivity_of_sig(cond, seen, ctx)
if ctx.contains_ev_dependency:
break
for stm in stms:
if ctx.contains_ev_dependency:
break
stm._discover_sensitivity(seen)
ctx.extend(stm._sensitivity)
if self.ifFalse:
assert not ctx.contains_ev_dependency, "can not negate event"
# else
for stm in self.ifFalse:
stm._discover_sensitivity(seen)
ctx.extend(stm._sensitivity)
[docs] @internal
def _fill_enclosure(self, enclosure: Dict[RtlSignalBase, Callable[[], HdlStatement]]) -> None:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._fill_enclosure`
"""
enc = []
outputs = self._outputs
for e in sorted(enclosure.keys(), key=RtlSignal_sort_key):
if e in outputs and e not in self._enclosed_for:
enc.append(e)
if not enc:
return
fill_stm_list_with_enclosure(self, self._ifTrue_enclosed_for,
self.ifTrue, enc, enclosure)
for (_, stms), e in zip(self.elIfs, self._elIfs_enclosed_for):
fill_stm_list_with_enclosure(self, e, stms, enc, enclosure)
self.ifFalse = fill_stm_list_with_enclosure(self, self._ifFalse_enclosed_for,
self.ifFalse, enc, enclosure)
self._enclosed_for.update(enc)
[docs] @internal
def _iter_stms(self) -> Generator[HdlStatement, None, None]:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._iter_stms`
"""
yield from self.ifTrue
for _, stms in self.elIfs:
yield from stms
if self.ifFalse is not None:
yield from self.ifFalse
[docs] @internal
def _iter_stms_for_output(self, output: RtlSignalBase) -> Generator[HdlStatement, None, None]:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._iter_stms_for_output`
"""
yield from self.ifTrue.iterStatementsWithOutput(output)
for _, stms in self.elIfs:
yield from stms.iterStatementsWithOutput(output)
if self.ifFalse is not None:
yield from self.ifFalse.iterStatementsWithOutput(output)
[docs] @internal
def _iter_all_elifs(self) -> Generator[Tuple[RtlSignalBase, ListOfHdlStatement], None, None]:
yield (self.cond, self.ifTrue)
yield from self.elIfs
[docs] @internal
def _try_reduce(self) -> Tuple[bool, ListOfHdlStatement]:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._try_reduce`
"""
# flag if IO of statement has changed
io_change = False
self.ifTrue, rank_decrease, _io_change = HdlStatement_try_reduce_list(
self.ifTrue)
self.rank -= rank_decrease
io_change |= _io_change
new_elifs = []
for cond, statements in self.elIfs:
_statements, rank_decrease, _io_change = HdlStatement_try_reduce_list(
statements)
self.rank -= rank_decrease
io_change |= _io_change
new_elifs.append((cond, _statements))
self.elIfs = new_elifs
if self.ifFalse is not None:
self.ifFalse, rank_decrease, _io_update_required = HdlStatement_try_reduce_list(
self.ifFalse)
self.rank -= rank_decrease
io_change |= _io_change
reduce_self = not self._condHasEffect(
self.ifTrue, self.ifFalse, self.elIfs)
if reduce_self:
res = self.ifTrue
else:
res = ListOfHdlStatement((self,))
self._on_reduce(reduce_self, io_change, res)
# try merge nested ifs as elifs
if self.ifFalse is not None and len(self.ifFalse) == 1:
child = self.ifFalse[0]
if isinstance(child, IfContainer):
self._merge_nested_if_from_else(child)
return res, io_change
[docs] @internal
def _merge_nested_if_from_else(self, ifStm: "IfContainer"):
"""
Merge nested IfContarner form else branch to this IfContainer
as elif and else branches
"""
self.elIfs.append((ifStm.cond, ifStm.ifTrue))
self.elIfs.extend(ifStm.elIfs)
self.ifFalse = ifStm.ifFalse
[docs] @internal
def _is_mergable(self, other: HdlStatement) -> bool:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._is_mergable`
"""
if not isinstance(other, IfContainer):
return False
if (self.cond is not other.cond
or not is_mergable_statement_list(self.ifTrue, other.ifTrue)):
return False
if len(self.elIfs) != len(other.elIfs):
return False
for (a_c, a_stm), (b_c, b_stm) in zip(self.elIfs, other.elIfs):
if a_c is not b_c or not is_mergable_statement_list(a_stm, b_stm):
return False
return is_mergable_statement_list(self.ifFalse, other.ifFalse)
[docs] @internal
def _merge_with_other_stm(self, other: "IfContainer") -> None:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._merge_with_other_stm`
"""
merge = HdlStatement_merge_statement_lists
self.ifTrue = merge(self.ifTrue, other.ifTrue)
new_elifs = []
for ((c, elifA), (_, elifB)) in zip(self.elIfs, other.elIfs):
new_elifs.append((c, merge(elifA, elifB)))
self.elIfs = new_elifs
self.ifFalse = merge(self.ifFalse, other.ifFalse)
other.ifTrue = None
other.elIfs = None
other.ifFalse = None
self._on_merge(other)
[docs] @internal
@staticmethod
def _condHasEffect(ifTrue, ifFalse, elIfs):
stmCnt = len(ifTrue)
# [TODO] condition in empty if stm
if ifFalse is not None \
and stmCnt == len(ifFalse) \
and reduce(and_,
[len(stm) == stmCnt
for _, stm in elIfs],
True):
for stms in zip(ifTrue, ifFalse, *map(lambda x: x[1], elIfs)):
if not statementsAreSame(stms):
return True
return False
return True
[docs] def isSame(self, other: HdlStatement) -> bool:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement.isSame`
"""
if self is other:
return True
if self.rank != other.rank:
return False
if isinstance(other, IfContainer):
if self.cond is other.cond:
if len(self.ifTrue) == len(other.ifTrue) \
and ((self.ifFalse is None and other.ifFalse is None) or
len(self.ifFalse) == len(other.ifFalse)) \
and len(self.elIfs) == len(other.elIfs):
if not isSameStatementList(self.ifTrue,
other.ifTrue) \
or not isSameStatementList(self.ifFalse,
other.ifFalse):
return False
for (ac, astms), (bc, bstms) in zip(self.elIfs,
other.elIfs):
if not (ac == bc) or\
not isSameStatementList(astms, bstms):
return False
return True
return False
[docs] @internal
def _replace_child_statement(self, stm:HdlStatement,
replacement:ListOfHdlStatement,
update_io:bool) -> None:
"""
:see: :meth:`hwt.hdl.statements.statement.HdlStatement._replace_child_statement`
"""
if update_io:
raise NotImplementedError()
for branch_list in (self.ifTrue, *(elif_stms for _, elif_stms in self.elIfs), self.ifFalse):
if branch_list is None:
continue
try:
i = branch_list.index(stm)
except ValueError:
# not in list
continue
branch_list.replace(self, stm, i, replacement)
# reset IO because it was shared with this statement
stm._destroy()
return
raise ValueError("Statement", stm, "not found in ", self)