"""TX frame drain — sync domain (24 MHz).

Drains the tx_bytes AsyncFIFO (written by BBARegisterFile in the exi domain),
forwards each byte to W5500SPIMaster with SOF/EOF framing, then pulses tx_irq
to notify the GC that the transmit is complete.

Flow
----
1. Wait for the tx_ctrl FIFO to have a length word (signals a complete frame queued).
2. Pop the length from the tx_ctrl FIFO.
3. Assert tx_sof on first byte, tx_eof on last byte, consuming tx_bytes FIFO.
4. When W5500SPIMaster accepts the final byte: pulse tx_irq.

The tx_bytes AsyncFIFO (exi→sync, 8-bit, depth=16) and tx_ctrl FIFO (exi→sync,
16-bit, depth=4) are instantiated in BBARegisterFile and their sync-domain read
sides are exposed as ports wired here by BBATop.
"""
|
|
|
|
from amaranth import *
|
|
|
|
# Names exported by `from <this module> import *`.
__all__ = ["TXFrameDrain"]
|
|
|
|
|
|
class TXFrameDrain(Elaboratable):
    """Drains BBA TX FIFOs and forwards frames to W5500SPIMaster.

    TX FIFO read interfaces (async FIFOs, sync-domain read side)
    ---------------------------------------------------------------
    tx_bytes_r_data / tx_bytes_r_en / tx_bytes_r_rdy : byte stream
    tx_ctrl_r_data  / tx_ctrl_r_en  / tx_ctrl_r_rdy  : 16-bit frame length

    W5500 streaming output (sync domain, to W5500SPIMaster)
    -------------------------------------------------------
    tx_data / tx_valid / tx_ready / tx_sof / tx_eof

    CDC output (sync→exi, via PulseSynchronizer in BBATop)
    -------------------------------------------------------
    tx_irq : 1-cycle pulse when frame transmission is handed off to W5500SPIMaster

    Zero-length frames
    ------------------
    A length word of 0 is popped from the tx_ctrl FIFO and completed on the
    spot: no byte is presented on the W5500 interface (so neither tx_sof nor
    tx_eof is asserted), tx_irq pulses once, and the FSM stays in IDLE.
    Without this the FSM would enter DRAIN with nothing to load and never
    leave it.
    """

    def __init__(self):
        # tx_bytes FIFO read side
        self.tx_bytes_r_data = Signal(8)
        self.tx_bytes_r_en   = Signal()
        self.tx_bytes_r_rdy  = Signal()

        # tx_ctrl FIFO read side (frame length)
        self.tx_ctrl_r_data = Signal(16)
        self.tx_ctrl_r_en   = Signal()
        self.tx_ctrl_r_rdy  = Signal()

        # W5500 streaming TX interface
        self.tx_data  = Signal(8)
        self.tx_valid = Signal()
        self.tx_ready = Signal()
        self.tx_sof   = Signal()
        self.tx_eof   = Signal()

        # TX done pulse → PulseSynchronizer
        self.tx_irq = Signal()

    def elaborate(self, platform):
        """Build the drain logic: a two-state FSM plus a one-byte holding register.

        ``platform`` is accepted for the Elaboratable interface and is not used.
        Returns the populated ``Module``.
        """
        m = Module()

        frame_len    = Signal(16)  # bytes still to LOAD from the FIFO
        is_first     = Signal()    # next byte loaded is the first (SOF)
        load_pending = Signal()    # 1-bit "more bytes to load" flag (replaces
                                   # a 16-bit frame_len != 0 compare in the
                                   # combinational FIFO read-enable path)

        # ── Registered holding stage presented to W5500 ──────────────────
        # All W5500-facing outputs are driven from these registers. This
        # breaks the long combinational path that previously ran from the
        # tx_bytes FIFO read pointer, out through W5500 (tx_ready) and the
        # is_first/eof logic, and back into the FIFO pointer increment — the
        # sync-domain critical path. The FIFO read-enable now depends only on
        # the registered hold_valid and the FIFO's own r_rdy.
        hold_data  = Signal(8)
        hold_valid = Signal()
        hold_sof   = Signal()
        hold_eof   = Signal()

        # tx_irq is a registered one-cycle pulse: cleared every cycle unless
        # one of the FSM branches below overrides it.
        m.d.sync += self.tx_irq.eq(0)

        m.d.comb += [
            self.tx_data .eq(hold_data),
            self.tx_valid.eq(hold_valid),
            self.tx_sof  .eq(hold_sof),
            self.tx_eof  .eq(hold_eof),
        ]

        # W5500 took the currently-held byte this cycle
        hold_consumed = Signal()
        m.d.comb += hold_consumed.eq(hold_valid & self.tx_ready)

        # Pop one byte from tx_bytes into the holding register this cycle.
        # Only driven inside DRAIN, so it is 0 in every other state.
        do_load = Signal()

        # FIFO read-enable defaults (combinational, no W5500 dependency)
        m.d.comb += self.tx_bytes_r_en.eq(0)
        m.d.comb += self.tx_ctrl_r_en .eq(0)

        with m.FSM(domain="sync", name="tx_fsm"):

            with m.State("IDLE"):
                # Wait for a complete frame length in tx_ctrl FIFO
                with m.If(self.tx_ctrl_r_rdy):
                    m.d.comb += self.tx_ctrl_r_en.eq(1)
                    m.d.sync += [
                        frame_len.eq(self.tx_ctrl_r_data),
                        is_first .eq(1),
                    ]
                    with m.If(self.tx_ctrl_r_data != 0):
                        m.d.sync += load_pending.eq(1)
                        m.next = "DRAIN"
                    with m.Else():
                        # Zero-length frame: nothing to load, so DRAIN could
                        # never see an EOF hand-off and would hang forever.
                        # Finish here instead and stay in IDLE.
                        # NOTE(review): pulsing tx_irq for an empty frame
                        # assumes the host expects a TX-done notification for
                        # every length word it queues — confirm against
                        # BBARegisterFile / the GC-side driver.
                        m.d.sync += load_pending.eq(0)
                        m.d.sync += self.tx_irq.eq(1)

            with m.State("DRAIN"):
                # Load the next byte into the holding register only when it is
                # empty. Costs one idle sync cycle per byte, negligible
                # against the W5500 SPI rate (~16 sync cycles/byte), and keeps
                # tx_ready off the FIFO read-enable path entirely.
                #
                # The gate uses the registered 1-bit load_pending instead of a
                # 16-bit (frame_len != 0) reduction, so the combinational path
                # consume_r_gry → r_rdy → do_load → tx_bytes_r_en stays shallow.
                m.d.comb += do_load.eq(
                    ~hold_valid & self.tx_bytes_r_rdy & load_pending
                )
                m.d.comb += self.tx_bytes_r_en.eq(do_load)

                # hold_consumed needs hold_valid and do_load needs ~hold_valid,
                # so the two branches below never fire in the same cycle.
                with m.If(hold_consumed):
                    m.d.sync += hold_valid.eq(0)
                    with m.If(hold_eof):
                        # Final byte accepted by W5500: frame is done.
                        m.d.sync += self.tx_irq.eq(1)
                        m.next = "IDLE"

                with m.If(do_load):
                    m.d.sync += [
                        hold_data .eq(self.tx_bytes_r_data),
                        hold_valid.eq(1),
                        hold_sof  .eq(is_first),
                        hold_eof  .eq(frame_len == 1),
                        is_first  .eq(0),
                        frame_len .eq(frame_len - 1),
                    ]
                    # Last byte just loaded → stop further loads (registered).
                    with m.If(frame_len == 1):
                        m.d.sync += load_pending.eq(0)

        return m
|
|
|
|
|
|
# ── Testbench ─────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
    import sys

    from amaranth.sim import Simulator, Period

    dut = TXFrameDrain()
    errors = []

    async def _send_frame(ctx, frame):
        """Drive one non-empty frame through the TXFrameDrain DUT.

        Returns (received_bytes, seen_sof, seen_eof, saw_irq).

        Raises ValueError if ``frame`` is empty: this helper models the byte
        FIFO by presenting ``frame[0]`` up front, so it needs at least one byte.

        Key timing: tx_data/tx_valid/tx_sof/tx_eof are driven from the DUT's
        registered holding stage, so each byte takes two sync cycles — one to
        load it from the FIFO, one for the (always-ready) consumer to accept
        it. The outputs are sampled BEFORE each tick, and the modelled FIFO
        head is advanced AFTER the tick in which the byte was accepted, which
        is before the DUT's next load.
        """
        if not frame:
            raise ValueError("_send_frame requires a non-empty frame")

        ctx.set(dut.tx_ctrl_r_data, len(frame))
        ctx.set(dut.tx_ctrl_r_rdy, 1)
        ctx.set(dut.tx_bytes_r_data, frame[0])
        ctx.set(dut.tx_bytes_r_rdy, 1)

        # Tick 0: IDLE pops ctrl word (comb), FSM→DRAIN, frame_len registered
        await ctx.tick("sync").repeat(1)
        # Deassert ctrl FIFO so FSM doesn't re-pop when it returns to IDLE
        ctx.set(dut.tx_ctrl_r_rdy, 0)

        received = []
        seen_sof = False
        seen_eof = False
        saw_irq = False

        # Two sync cycles per byte (load + hand-off) plus slack for the
        # trailing tx_irq pulse. The former budget of len(frame) + 10 ran out
        # before tx_irq for frames longer than 9 bytes.
        max_cycles = 2 * len(frame) + 10

        for _ in range(max_cycles):
            # Sample the registered outputs BEFORE the tick that consumes them.
            if ctx.get(dut.tx_valid):
                received.append(ctx.get(dut.tx_data))
                seen_sof = seen_sof or bool(ctx.get(dut.tx_sof))
                seen_eof = seen_eof or bool(ctx.get(dut.tx_eof))

            await ctx.tick("sync").repeat(1)

            if ctx.get(dut.tx_irq):
                saw_irq = True
                break

            # Advance FIFO AFTER the tick: present next byte for next tick
            if len(received) < len(frame):
                ctx.set(dut.tx_bytes_r_data, frame[len(received)])
            elif len(received) == len(frame):
                ctx.set(dut.tx_bytes_r_rdy, 0)

        # Leave the modelled byte FIFO empty for whatever runs next.
        ctx.set(dut.tx_bytes_r_rdy, 0)

        return received, seen_sof, seen_eof, saw_irq

    async def testbench(ctx):
        await ctx.tick("sync").repeat(2)
        # The consumer is always ready in these tests.
        ctx.set(dut.tx_ready, 1)

        # ── T1: 4-byte frame ─────────────────────────────────────────────────
        frame = [0xDE, 0xAD, 0xBE, 0xEF]
        received, seen_sof, seen_eof, saw_irq = await _send_frame(ctx, frame)

        print(f"T1 received={[hex(b) for b in received]} sof={seen_sof} eof={seen_eof} tx_irq={saw_irq}")

        if received != frame:
            errors.append(f"T1 bytes mismatch: got {received}, want {frame}")
        if not seen_sof:
            errors.append("T1: SOF never seen")
        if not seen_eof:
            errors.append("T1: EOF never seen")
        if not saw_irq:
            errors.append("T1: tx_irq never pulsed")

        await ctx.tick("sync").repeat(4)

        # ── T2: Single-byte frame — SOF and EOF on same byte ─────────────────
        frame2 = [0x42]
        received2, s2_sof, s2_eof, s2_irq = await _send_frame(ctx, frame2)

        print(f"T2 byte=0x{received2[0] if received2 else 0:02X} sof={s2_sof} eof={s2_eof} tx_irq={s2_irq}")

        if received2 != frame2:
            errors.append(f"T2: bytes wrong, got {received2}")
        if not (s2_sof and s2_eof):
            errors.append("T2: SOF+EOF both must be set for 1-byte frame")
        if not s2_irq:
            errors.append("T2: tx_irq not seen for 1-byte frame")

        await ctx.tick("sync").repeat(4)

        # ── T3: 12-byte frame — longer than the old cycle budget allowed ─────
        frame3 = list(range(0x10, 0x1C))
        received3, s3_sof, s3_eof, s3_irq = await _send_frame(ctx, frame3)

        print(f"T3 received={len(received3)} bytes sof={s3_sof} eof={s3_eof} tx_irq={s3_irq}")

        if received3 != frame3:
            errors.append(f"T3 bytes mismatch: got {received3}, want {frame3}")
        if not (s3_sof and s3_eof):
            errors.append("T3: SOF and EOF must both be seen")
        if not s3_irq:
            errors.append("T3: tx_irq never pulsed")

    sim = Simulator(dut)
    sim.add_clock(Period(MHz=24), domain="sync")
    sim.add_testbench(testbench)

    with sim.write_vcd("TXFrameDrain.vcd"):
        sim.run()

    if errors:
        print("\nFAILURES:")
        for e in errors:
            print("  ", e)
        sys.exit(1)
    else:
        print("\nAll tests passed.")