Source code for hwt.hdl.statements.utils.reduction
from itertools import islice, zip_longest
from typing import Tuple
from hwt.doc_markers import internal
from hwt.hdl.statements.assignmentContainer import HdlAssignmentContainer
from hwt.hdl.statements.utils.listOfHdlStatements import ListOfHdlStatement
from hwt.pyUtils.arrayQuery import groupedby
from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
[docs]@internal
def HdlStatement_merge_statement_lists(stmsA: ListOfHdlStatement, stmsB: ListOfHdlStatement)\
->ListOfHdlStatement:
"""
Merge two lists of statements into one
:return: list of merged statements
"""
if stmsA is None and stmsB is None:
return None
elif stmsA is None:
return stmsB
elif stmsB is None:
return stmsA
tmp = ListOfHdlStatement()
# copy all with already known not to merge
if stmsA.firstStmWithBranchesI is NOT_SPECIFIED:
# we need to check items ourselfs
a_it = iter(stmsA)
elif stmsA.firstStmWithBranchesI is not None:
# we know the first item which requires extra care, we can copy all predecessors
tmp.extend(islice(stmsA, 0, stmsA.firstStmWithBranchesI))
a_it = islice(stmsA, stmsA.firstStmWithBranchesI, None)
else:
# items do no require any care, we can copy them all, but instead we use original list
# to avoid copy
tmp = stmsA
a_it = iter(tuple())
if stmsB.firstStmWithBranchesI is not None:
tmp.extend(islice(stmsB, 0, stmsB.firstStmWithBranchesI))
b_it = islice(stmsB, stmsB.firstStmWithBranchesI, None)
else:
b_it = iter(stmsB)
a = None
b = None
a_empty = False
b_empty = False
while not a_empty and not b_empty:
while not a_empty:
a = next(a_it, None)
if a is None:
a_empty = True
break
elif a.rank == 0:
# simple statement does not require merging
tmp.append(a)
a = None
else:
break
while not b_empty:
b = next(b_it, None)
if b is None:
b_empty = True
break
elif b.rank == 0:
# simple statement does not require merging
tmp.append(b)
b = None
else:
break
if a is not None or b is not None:
if a is None:
a = b
b = None
if a is not None and b is not None:
a._merge_with_other_stm(b)
tmp.append(a)
a = None
b = None
return tmp
[docs]@internal
def HdlStatement_try_reduce_list(statements: ListOfHdlStatement)\
->Tuple[ListOfHdlStatement, int, bool]:
"""
Simplify statements in the list
"""
io_change = False
new_statements = ListOfHdlStatement()
for stm in statements:
reduced, _io_change = stm._try_reduce()
new_statements.extend(reduced)
io_change |= _io_change
new_statements, rank_decrease = HdlStatement_merge_statements(
new_statements)
new_statements, io_change, _rank_decrease = HdlStatement_reduce_overriden_assignments(new_statements)
rank_decrease += _rank_decrease
return new_statements, rank_decrease, io_change
[docs]@internal
def HdlStatement_reduce_overriden_assignments(statements: ListOfHdlStatement)\
->Tuple[ListOfHdlStatement, bool, int]:
io_change = False
new_statements = []
rank_decrease = 0
fully_driven_outputs = set()
for stm in reversed(statements):
if fully_driven_outputs.issuperset(stm._outputs):
rank_decrease += stm.rank
io_change = True
continue
if isinstance(stm, HdlAssignmentContainer):
fully_driven_outputs.update(stm._outputs)
new_statements.append(stm)
return ListOfHdlStatement(reversed(new_statements)), io_change, rank_decrease
[docs]@internal
def HdlStatement_merge_statements(statements: ListOfHdlStatement)\
->Tuple[ListOfHdlStatement, int]:
"""
Merge statements in list to remove duplicated if-then-else trees
:return: tuple (list of merged statements, rank decrease due merging)
:note: rank decrease is sum of ranks of reduced statements
:attention: statement list has to me mergable
"""
order = {}
for i, stm in enumerate(statements):
order[stm] = i
new_statements = ListOfHdlStatement()
rank_decrease = 0
for rank, stms in groupedby(statements, lambda s: s.rank):
if rank == 0:
new_statements.extend(stms)
else:
if len(stms) == 1:
new_statements.extend(stms)
continue
# try to merge statements if they are same condition tree
for iA, stmA in enumerate(stms):
if stmA is None:
continue
for iB, stmB in enumerate(islice(stms, iA + 1, None)):
if stmB is None:
continue
if stmA._is_mergable(stmB):
rank_decrease += stmB.rank
stmA._merge_with_other_stm(stmB)
stms[iA + 1 + iB] = None
new_statements.append(stmA)
new_statements.sort(key=lambda stm: order[stm])
return new_statements, rank_decrease
[docs]@internal
def is_mergable_statement_list(stmsA: ListOfHdlStatement, stmsB: ListOfHdlStatement):
"""
Walk statements and compare if they can be merged into one statement list
"""
if stmsA is None and stmsB is None:
return True
elif stmsA is None or stmsB is None:
return False
# [todo] there is a performance error when the list has no statements with rank != 0
# all items needs to be checked everytime, for "rtl register if (clk)" statements
# this is a problem as this list can grow large (100K+ items) and needs to be compared with every
# not yet merged statement
for (a, b) in zip_longest(stmsA._iter_stms_with_branches(),
stmsB._iter_stms_with_branches()):
if a is None or b is None or not a._is_mergable(b):
return False
# lists are empty
return True