"""TX frame drain — sync domain (24 MHz). Drains the tx_bytes AsyncFIFO (written by BBARegisterFile in the exi domain), forwards each byte to W5500SPIMaster with SOF/EOF framing, then pulses tx_irq to notify the GC that the transmit is complete. Flow ---- 1. Wait for tx_len FIFO to have a length word (signals a complete frame queued). 2. Pop the length from tx_len FIFO. 3. Assert tx_sof on first byte, tx_eof on last byte, consuming tx_bytes FIFO. 4. When W5500SPIMaster accepts the final byte: pulse tx_irq. The tx_bytes AsyncFIFO (exi→sync, 8-bit, depth=16) and tx_ctrl FIFO (exi→sync, 16-bit, depth=4) are instantiated in BBARegisterFile and their sync-domain read sides are exposed as ports wired here by BBATop. """ from amaranth import * __all__ = ["TXFrameDrain"] class TXFrameDrain(Elaboratable): """Drains BBA TX FIFOs and forwards frames to W5500SPIMaster. TX FIFO read interfaces (async FIFOs, sync-domain read side) --------------------------------------------------------------- tx_bytes_r_data / tx_bytes_r_en / tx_bytes_r_rdy : byte stream tx_ctrl_r_data / tx_ctrl_r_en / tx_ctrl_r_rdy : 16-bit frame length W5500 streaming output (sync domain, to W5500SPIMaster) ------------------------------------------------------- tx_data / tx_valid / tx_ready / tx_sof / tx_eof CDC output (sync→exi, via PulseSynchronizer in BBATop) ------------------------------------------------------- tx_irq : 1-cycle pulse when frame transmission is handed off to W5500SPIMaster """ def __init__(self): # tx_bytes FIFO read side self.tx_bytes_r_data = Signal(8) self.tx_bytes_r_en = Signal() self.tx_bytes_r_rdy = Signal() # tx_ctrl FIFO read side (frame length) self.tx_ctrl_r_data = Signal(16) self.tx_ctrl_r_en = Signal() self.tx_ctrl_r_rdy = Signal() # W5500 streaming TX interface self.tx_data = Signal(8) self.tx_valid = Signal() self.tx_ready = Signal() self.tx_sof = Signal() self.tx_eof = Signal() # TX done pulse → PulseSynchronizer self.tx_irq = Signal() def elaborate(self, platform): m = Module() frame_len = Signal(16) # bytes still to LOAD from FIFO (incl. held one) is_first = Signal() # next byte loaded is the first (SOF) load_pending = Signal() # 1-bit "more bytes to load" flag (replaces # a 16-bit frame_len!=0 compare in the # combinational FIFO read-enable path) # ── Registered holding stage presented to W5500 ────────────────── # All W5500-facing outputs are driven from these registers. This # breaks the long combinational path that previously ran from the # tx_bytes FIFO read pointer, out through W5500 (tx_ready) and the # is_first/eof logic, and back into the FIFO pointer increment — the # sync-domain critical path. The FIFO read-enable now depends only on # the registered hold_valid and the FIFO's own r_rdy. hold_data = Signal(8) hold_valid = Signal() hold_sof = Signal() hold_eof = Signal() m.d.sync += self.tx_irq.eq(0) # default m.d.comb += [ self.tx_data .eq(hold_data), self.tx_valid.eq(hold_valid), self.tx_sof .eq(hold_sof), self.tx_eof .eq(hold_eof), ] # W5500 took the currently-held byte this cycle hold_consumed = Signal() m.d.comb += hold_consumed.eq(hold_valid & self.tx_ready) # FIFO read-enable defaults (combinational, no W5500 dependency) m.d.comb += self.tx_bytes_r_en.eq(0) m.d.comb += self.tx_ctrl_r_en .eq(0) with m.FSM(domain="sync", name="tx_fsm"): with m.State("IDLE"): # Wait for a complete frame length in tx_ctrl FIFO with m.If(self.tx_ctrl_r_rdy): m.d.comb += self.tx_ctrl_r_en.eq(1) m.d.sync += frame_len.eq(self.tx_ctrl_r_data) m.d.sync += is_first.eq(1) # A frame with length 0 has nothing to load. m.d.sync += load_pending.eq(self.tx_ctrl_r_data != 0) m.next = "DRAIN" with m.State("DRAIN"): # Load the next byte into the holding register only when it is # empty. Costs one idle sync cycle per byte, negligible # against the W5500 SPI rate (~16 sync cycles/byte), and keeps # tx_ready off the FIFO read-enable path entirely. # # The gate uses the registered 1-bit load_pending instead of a # 16-bit (frame_len != 0) reduction, so the combinational path # consume_r_gry → r_rdy → do_load → tx_bytes_r_en stays shallow. do_load = Signal() m.d.comb += do_load.eq( ~hold_valid & self.tx_bytes_r_rdy & load_pending ) m.d.comb += self.tx_bytes_r_en.eq(do_load) with m.If(hold_consumed): m.d.sync += hold_valid.eq(0) with m.If(hold_eof): m.d.sync += self.tx_irq.eq(1) m.next = "IDLE" with m.If(do_load): m.d.sync += hold_data .eq(self.tx_bytes_r_data) m.d.sync += hold_valid.eq(1) m.d.sync += hold_sof .eq(is_first) m.d.sync += hold_eof .eq(frame_len == 1) m.d.sync += is_first .eq(0) m.d.sync += frame_len .eq(frame_len - 1) # Last byte just loaded → stop further loads (registered). with m.If(frame_len == 1): m.d.sync += load_pending.eq(0) return m # ── Testbench ───────────────────────────────────────────────────────────── if __name__ == "__main__": import sys from amaranth.sim import Simulator, Period dut = TXFrameDrain() errors = [] async def _send_frame(ctx, frame): """Drive one frame through the TXFrameDrain DUT. Returns (received_bytes, seen_sof, seen_eof, saw_irq). Key timing: tx_sof/tx_eof are combinatorial outputs that depend on registered signals (is_first, frame_len) BEFORE they update. We read them BEFORE each tick to capture the correct values, then advance the FIFO AFTER the tick. """ ctx.set(dut.tx_ctrl_r_data, len(frame)) ctx.set(dut.tx_ctrl_r_rdy, 1) ctx.set(dut.tx_bytes_r_data, frame[0]) ctx.set(dut.tx_bytes_r_rdy, 1) # Tick 0: IDLE pops ctrl word (comb), FSM→DRAIN, frame_len registered await ctx.tick("sync").repeat(1) # Deassert ctrl FIFO so FSM doesn't re-pop when it returns to IDLE ctx.set(dut.tx_ctrl_r_rdy, 0) received = [] seen_sof = False seen_eof = False saw_irq = False for _ in range(len(frame) + 10): # Read comb signals BEFORE the tick (is_first and frame_len still # reflect pre-tick registered values, so sof/eof are correct) if ctx.get(dut.tx_valid): d = ctx.get(dut.tx_data) sof = ctx.get(dut.tx_sof) eof = ctx.get(dut.tx_eof) received.append(d) seen_sof = seen_sof or sof seen_eof = seen_eof or eof await ctx.tick("sync").repeat(1) if ctx.get(dut.tx_irq): saw_irq = True break # Advance FIFO AFTER the tick: present next byte for next tick if len(received) < len(frame): ctx.set(dut.tx_bytes_r_data, frame[len(received)]) elif len(received) == len(frame): ctx.set(dut.tx_bytes_r_rdy, 0) return received, seen_sof, seen_eof, saw_irq async def testbench(ctx): await ctx.tick("sync").repeat(2) ctx.set(dut.tx_ready, 1) # ── T1: 4-byte frame ───────────────────────────────────────────────── frame = [0xDE, 0xAD, 0xBE, 0xEF] received, seen_sof, seen_eof, saw_irq = await _send_frame(ctx, frame) print(f"T1 received={[hex(b) for b in received]} sof={seen_sof} eof={seen_eof} tx_irq={saw_irq}") if received != frame: errors.append(f"T1 bytes mismatch: got {received}, want {frame}") if not seen_sof: errors.append("T1: SOF never seen") if not seen_eof: errors.append("T1: EOF never seen") if not saw_irq: errors.append("T1: tx_irq never pulsed") await ctx.tick("sync").repeat(4) # ── T2: Single-byte frame — SOF and EOF on same byte ───────────────── frame2 = [0x42] received2, s2_sof, s2_eof, s2_irq = await _send_frame(ctx, frame2) print(f"T2 byte=0x{received2[0] if received2 else 0:02X} sof={s2_sof} eof={s2_eof} tx_irq={s2_irq}") if received2 != frame2: errors.append(f"T2: bytes wrong, got {received2}") if not (s2_sof and s2_eof): errors.append("T2: SOF+EOF both must be set for 1-byte frame") if not s2_irq: errors.append("T2: tx_irq not seen for 1-byte frame") sim = Simulator(dut) sim.add_clock(Period(MHz=24), domain="sync") sim.add_testbench(testbench) with sim.write_vcd("TXFrameDrain.vcd"): sim.run() if errors: print("\nFAILURES:") for e in errors: print(" ", e) sys.exit(1) else: print("\nAll tests passed.")