Files
rebbarb/exi_bba/w5500_spi_master.py
2026-06-13 18:35:38 +02:00

761 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""W5500 SPI master — sync domain (24 MHz).
SPI Mode 0 (CPOL=0, CPHA=0): CLK idles LOW, data captured on rising edge.
SCK = 12 MHz: the sync domain is 24 MHz and the bit engine toggles SCK via a
clock-enable (sync ÷ 2).
W5500 frame format
------------------
Byte 0-1 Address (16-bit big-endian)
Byte 2 Control: [7:3]=BSB [2]=R/W [1:0]=OM
Byte 3+ Data
BSB values used here:
0b00000 Common registers
0b00001 Socket 0 registers
0b00010 Socket 0 TX buffer
0b00011 Socket 0 RX buffer
After NCRA reset the driver issues the W5500 init sequence (MR reset, SHAR,
S0_MR MACRAW, S0_CR OPEN, S0_IMR).
The module provides:
- A streaming TX interface (tx_data/tx_valid/tx_ready + sof/eof framing)
- A streaming RX interface (rx_data/rx_valid/rx_ready + sof/eof)
- init_req / init_done for the NCRA-triggered init sequence
- MAC source address shadow input (par[0..5]) for SHAR programming
"""
from amaranth import *
__all__ = ["W5500SPIMaster"]
# W5500 register addresses. The 16-bit address is the OFFSET WITHIN A BLOCK;
# the block is selected by the BSB field of the control byte (see _CTRL_*),
# NOT by the address. So socket-0 registers use small offsets with BSB=1.
# That is why _W5500_MR and _W5500_S0_MR share the value 0x0000: they are
# told apart only by the control byte sent with them.
# NOTE: _W5500_S0_RXBUF_SIZE, _W5500_S0_TXBUF_SIZE and _W5500_S0_TX_FSR are
# defined for reference but are not used anywhere in this file.
_W5500_MR = 0x0000 # Mode register (common block)
_W5500_SHAR = 0x0009 # Source MAC, 6 bytes (common block)
_W5500_S0_MR = 0x0000 # Socket 0 Mode (socket-0 block)
_W5500_S0_CR = 0x0001 # Socket 0 Command
_W5500_S0_IR = 0x0002 # Socket 0 Interrupt
_W5500_S0_RXBUF_SIZE = 0x001E # Socket 0 RX buffer size
_W5500_S0_TXBUF_SIZE = 0x001F # Socket 0 TX buffer size
_W5500_S0_TX_FSR = 0x0020 # Socket 0 TX Free Size (2 bytes)
_W5500_S0_TX_WR = 0x0024 # Socket 0 TX Write Pointer
_W5500_S0_RX_RSR = 0x0026 # Socket 0 RX Received Size (2 bytes)
_W5500_S0_RX_RD = 0x0028 # Socket 0 RX Read Pointer
_W5500_S0_IMR = 0x002C # Socket 0 Interrupt Mask
# Control byte = (BSB << 3) | (RWB << 2) | OM.
# RWB: 1=write 0=read. OM=00 → Variable Data Mode (CS frames the length).
# BSB: 0=common, 1=socket0 reg, 2=socket0 TX buffer, 3=socket0 RX buffer.
# OM is never OR-ed in below, so the two low bits stay 00 in every constant.
_CTRL_WR_COMMON = (0 << 3) | (1 << 2) # 0x04
_CTRL_WR_S0REG = (1 << 3) | (1 << 2) # 0x0C
_CTRL_RD_S0REG = (1 << 3) | (0 << 2) # 0x08
_CTRL_WR_S0TX = (2 << 3) | (1 << 2) # 0x14
_CTRL_RD_S0RX = (3 << 3) | (0 << 2) # 0x18
class W5500SPIMaster(Elaboratable):
    """W5500 SPI master in the sync clock domain.

    Three cooperating FSMs are built by ``elaborate``:

    * ``byte_fsm`` shifts one byte in/out (SPI Mode 0, MSB first),
    * ``xfer_fsm`` wraps bytes into one CS-framed W5500 transaction
      (3 header bytes + payload, optionally streamed),
    * ``main_fsm`` sequences transactions for init, TX and RX.

    Physical SPI pins
    -----------------
    spi_clk / spi_mosi / spi_miso / spi_cs_n : to W5500
    w5500_int_n : W5500 INT_N input (active low)
    w5500_rst_n : W5500 hardware reset (active low). ``elaborate`` never
                  drives it, so it stays at its init value of 1.

    Init interface (from BBARegisterFile / BBATop)
    ----------------------------------------------
    init_req  : pulse to trigger the W5500 init sequence (only sampled while
                the main FSM is idle)
    init_done : pulse when init sequence completes
    par       : 6-byte MAC address (sampled at init_req); byte ``i`` of the
                MAC is ``par[8*i : 8*i+8]``

    TX streaming interface (from TXFrameDrain, sync domain)
    -------------------------------------------------------
    tx_data / tx_valid / tx_ready : byte stream; tx_ready pulses for one
                                    cycle after each byte has been shifted out
    tx_sof / tx_eof : frame delimiters on the same cycle as tx_valid

    RX streaming interface (to RXFrameAssembler, sync domain)
    ----------------------------------------------------------
    rx_data / rx_valid / rx_ready : byte stream
    rx_sof / rx_eof : frame delimiters
    """

    def __init__(self, clk_div=1, reset_cycles=24000):
        """Create the port signals.

        clk_div      : SCK divider; SCK = sync / (2 * clk_div). Values <= 1
                       select full rate.
        reset_cycles : number of sync cycles to wait after the MR software
                       reset before continuing the init sequence.
        """
        # MR-reset settle wait (in sync cycles). ~1 ms; the testbench
        # overrides with a small value for fast simulation.
        self._reset_cycles = reset_cycles
        # SPI SCK = sync_clock / (2 * clk_div). clk_div=1 → full rate (SCK =
        # sync/2): at the 24 MHz slow domain that is 12 MHz SCK (~12 Mbit/s),
        # which comfortably exceeds real-world GC BBA TCP throughput. The W5500
        # tolerates up to 80 MHz SCK, so the divider exists only as a safety
        # knob for board-level signal-integrity issues, not a functional need.
        self._clk_div = clk_div

        # Physical SPI
        self.spi_clk = Signal()
        self.spi_mosi = Signal()
        self.spi_miso = Signal()
        self.spi_cs_n = Signal(init=1)
        self.w5500_int_n = Signal(init=1)
        self.w5500_rst_n = Signal(init=1)

        # Init control
        self.init_req = Signal()
        self.init_done = Signal()
        self.par = Signal(48)  # MAC address (PAR0..5 packed)

        # TX stream
        self.tx_data = Signal(8)
        self.tx_valid = Signal()
        self.tx_ready = Signal()
        self.tx_sof = Signal()
        self.tx_eof = Signal()

        # RX stream
        self.rx_data = Signal(8)
        self.rx_valid = Signal()
        self.rx_ready = Signal()
        self.rx_sof = Signal()
        self.rx_eof = Signal()

    def elaborate(self, platform):
        """Build the byte engine, the transaction engine and the main FSM."""
        m = Module()

        # ── SPI clock enable ─────────────────────────────────────────────
        # clk_en high every `clk_div` sync cycles. The bit engine toggles SCK
        # on each enabled cycle, so SCK = sync / (2 * clk_div).
        clk_en = Signal()
        if self._clk_div <= 1:
            m.d.comb += clk_en.eq(1)  # full rate: SCK = sync/2
        else:
            div_ctr = Signal(range(self._clk_div))
            with m.If(div_ctr == self._clk_div - 1):
                m.d.sync += div_ctr.eq(0)
            with m.Else():
                m.d.sync += div_ctr.eq(div_ctr + 1)
            m.d.comb += clk_en.eq(div_ctr == self._clk_div - 1)

        # ── SPI pin registers (Mode 0: SCK idles LOW) ────────────────────
        sck_r = Signal()
        cs_r = Signal(init=1)
        shift_out = Signal(8)
        shift_in = Signal(8)
        m.d.comb += self.spi_clk.eq(sck_r)
        m.d.comb += self.spi_cs_n.eq(cs_r)
        m.d.comb += self.spi_mosi.eq(shift_out[7])  # MSB first; valid pre-rising

        # ── Byte-transfer engine (Mode 0) ────────────────────────────────
        # On byte_start, shift out byte_tx MSB-first (8 SCK cycles) and capture
        # MISO into byte_rx; pulse byte_done. CS is owned by the xfer engine.
        byte_start = Signal()
        byte_tx = Signal(8)
        byte_rx = Signal(8)
        byte_done = Signal()
        bit_ctr = Signal(4)

        # Default: byte_done is a one-cycle pulse (overridden inside the FSM).
        m.d.sync += byte_done.eq(0)
        with m.FSM(domain="sync", name="byte_fsm"):
            with m.State("IDLE"):
                m.d.sync += sck_r.eq(0)
                with m.If(byte_start):
                    m.d.sync += shift_out.eq(byte_tx)
                    m.d.sync += bit_ctr.eq(0)
                    m.next = "RUN"
            with m.State("RUN"):
                with m.If(clk_en):
                    with m.If(~sck_r):
                        # rising edge: slave samples MOSI, master samples MISO
                        m.d.sync += sck_r.eq(1)
                        m.d.sync += shift_in.eq(
                            Cat(self.spi_miso, shift_in[:-1]))
                    with m.Else():
                        # falling edge: advance / finish
                        m.d.sync += sck_r.eq(0)
                        with m.If(bit_ctr == 7):
                            m.d.sync += byte_rx.eq(shift_in)
                            m.d.sync += byte_done.eq(1)
                            m.next = "IDLE"
                        with m.Else():
                            m.d.sync += shift_out.eq(Cat(0, shift_out[:-1]))
                            m.d.sync += bit_ctr.eq(bit_ctr + 1)

        # ── Generic register transaction engine (Variable Data Mode) ─────
        # One CS-low frame: 3 header bytes (addr_hi, addr_lo, ctrl) then
        # xfer_len payload bytes. Writes source payload from wbuf; reads
        # capture MISO into rbuf.
        WBUF = 8
        xfer_start = Signal()
        xfer_addr = Signal(16)
        xfer_ctrl = Signal(8)
        xfer_len = Signal(range(WBUF + 1))
        xfer_done = Signal()
        wbuf = Array([Signal(8, name=f"wbuf{i}") for i in range(WBUF)])
        rbuf = Array([Signal(8, name=f"rbuf{i}") for i in range(WBUF)])
        xfer_idx = Signal(range(WBUF + 3))

        # Stream-write mode: after the 3-byte header, payload bytes are pulled
        # from (s_data, s_valid, s_last) instead of wbuf, until s_last. Used to
        # forward a frame straight into the W5500 TX buffer. s_consume pulses
        # as each streamed byte is accepted; s_count tracks the byte count.
        xfer_stream = Signal()
        s_data = Signal(8)
        s_valid = Signal()
        s_last = Signal()
        s_consume = Signal()
        s_count = Signal(16)
        s_last_r = Signal()  # latched s_last for the in-flight byte

        # Stream-read mode: after the header, read `xfer_rcount` payload bytes
        # (sending 0x00 dummies) and push each out via (r_data, r_valid,
        # r_first, r_last) with r_ready back-pressure. Used to pull a frame
        # out of the W5500 RX buffer into RXFrameAssembler.
        xfer_sread = Signal()
        xfer_rcount = Signal(16)
        r_data = Signal(8)
        r_valid = Signal()
        r_first = Signal()
        r_last = Signal()
        r_ready = Signal()
        r_idx = Signal(16)

        # Byte to transmit for the current header/payload index.
        x_byte = Signal(8)
        with m.If(xfer_idx == 0):
            m.d.comb += x_byte.eq(xfer_addr[8:16])
        with m.Elif(xfer_idx == 1):
            m.d.comb += x_byte.eq(xfer_addr[0:8])
        with m.Elif(xfer_idx == 2):
            m.d.comb += x_byte.eq(xfer_ctrl)
        with m.Else():
            m.d.comb += x_byte.eq(wbuf[xfer_idx - 3])

        # Defaults; the FSM states below override them where needed.
        m.d.comb += byte_start.eq(0)
        m.d.comb += byte_tx.eq(0)
        m.d.comb += s_consume.eq(0)
        m.d.comb += r_valid.eq(0)
        m.d.comb += r_data.eq(0)
        m.d.comb += r_first.eq(0)
        m.d.comb += r_last.eq(0)
        m.d.sync += xfer_done.eq(0)

        with m.FSM(domain="sync", name="xfer_fsm"):
            with m.State("IDLE"):
                with m.If(xfer_start):
                    m.d.sync += cs_r.eq(0)  # assert CS for the frame
                    m.d.sync += xfer_idx.eq(0)
                    m.d.sync += s_count.eq(0)
                    m.d.sync += r_idx.eq(0)
                    m.next = "LOAD"
            with m.State("LOAD"):
                m.d.comb += byte_tx.eq(x_byte)
                m.d.comb += byte_start.eq(1)
                m.next = "WAIT"
            with m.State("WAIT"):
                with m.If(byte_done):
                    with m.If(xfer_idx >= 3):
                        m.d.sync += rbuf[xfer_idx - 3].eq(byte_rx)
                    with m.If((xfer_idx == 2) & xfer_stream):
                        m.next = "SLOAD"  # stream the payload (write)
                    with m.Elif((xfer_idx == 2) & xfer_sread):
                        m.next = "RLOAD"  # stream the payload (read)
                    with m.Elif(~xfer_stream & ~xfer_sread
                                & (xfer_idx == (xfer_len + 2))):
                        # last byte index = 3 header + len - 1
                        m.next = "FINISH"
                    with m.Else():
                        m.d.sync += xfer_idx.eq(xfer_idx + 1)
                        m.next = "LOAD"

            # ── Streamed-payload sub-loop (TX buffer write) ──────────────
            with m.State("SLOAD"):
                with m.If(s_valid):
                    m.d.comb += byte_tx.eq(s_data)
                    m.d.comb += byte_start.eq(1)
                    m.d.sync += s_last_r.eq(s_last)
                    m.next = "SWAIT"
            with m.State("SWAIT"):
                with m.If(byte_done):
                    m.d.comb += s_consume.eq(1)  # accept this frame byte
                    m.d.sync += s_count.eq(s_count + 1)
                    with m.If(s_last_r):
                        m.next = "FINISH"
                    with m.Else():
                        m.next = "SLOAD"

            # ── Streamed-payload sub-loop (RX buffer read) ───────────────
            with m.State("RLOAD"):
                with m.If(r_idx == xfer_rcount):
                    m.next = "FINISH"
                with m.Else():
                    m.d.comb += byte_tx.eq(0)  # dummy MOSI during read
                    m.d.comb += byte_start.eq(1)
                    m.next = "RWAIT"
            with m.State("RWAIT"):
                with m.If(byte_done):
                    m.next = "RPUSH"
            with m.State("RPUSH"):
                # Hold the byte on the output stream until it is accepted.
                m.d.comb += r_data.eq(byte_rx)
                m.d.comb += r_valid.eq(1)
                m.d.comb += r_first.eq(r_idx == 0)
                m.d.comb += r_last.eq(r_idx == (xfer_rcount - 1))
                with m.If(r_ready):
                    m.d.sync += r_idx.eq(r_idx + 1)
                    m.next = "RLOAD"

            with m.State("FINISH"):
                m.d.sync += cs_r.eq(1)  # deassert CS
                m.d.sync += xfer_done.eq(1)
                m.next = "IDLE"

        # Saved MAC for SHAR programming; current W5500 TX write pointer.
        mac_shadow = Array([Signal(8, name=f"mac{i}") for i in range(6)])
        wait_ctr = Signal(range(self._reset_cycles + 2))
        tx_wr = Signal(16)
        rx_rsr = Signal(16)   # RX received size
        rx_rd = Signal(16)    # RX read pointer
        pkt_len = Signal(16)  # MACRAW packet length (incl. 2-byte header)
        rx_adv = Signal(16)   # amount S0_RX_RD is advanced by after a read

        # Frame stream from TXFrameDrain feeds the xfer engine's stream port.
        # tx_ready pulses (= s_consume) as each frame byte is taken into the
        # TX-buffer write transaction.
        m.d.comb += [
            s_data.eq(self.tx_data),
            s_valid.eq(self.tx_valid),
            s_last.eq(self.tx_eof),
            self.tx_ready.eq(s_consume),
        ]
        # RX buffer read stream → RXFrameAssembler.
        m.d.comb += [
            self.rx_data.eq(r_data),
            self.rx_valid.eq(r_valid),
            self.rx_sof.eq(r_first),
            self.rx_eof.eq(r_last),
            r_ready.eq(self.rx_ready),
        ]

        def xfer_states(name, addr, ctrl, nxt, *, payload=(), stream=0,
                        sread=0, wait=None, on_done=()):
            """Emit a launch state plus its wait state for one transaction.

            State `name` programs EVERY transaction field (address, control
            byte, length, both streaming-mode flags and the wbuf payload) and
            raises xfer_start, so no launch depends on what an earlier state
            left behind. The wait state (`wait`, default `name + "_W"`) drops
            xfer_start and, on xfer_done, applies the `on_done` sync
            statements and moves to `nxt`. For register reads pass
            ``payload=(0, 0)``: the payload doubles as the MOSI dummy bytes.
            """
            if len(payload) > WBUF:
                raise ValueError(
                    f"payload of {len(payload)} bytes exceeds wbuf ({WBUF})")
            wait_name = name + "_W" if wait is None else wait
            with m.State(name):
                m.d.sync += [
                    xfer_addr.eq(addr),
                    xfer_ctrl.eq(ctrl),
                    xfer_len.eq(len(payload)),
                    xfer_stream.eq(stream),
                    xfer_sread.eq(sread),
                    xfer_start.eq(1),
                ]
                for i, b in enumerate(payload):
                    m.d.sync += wbuf[i].eq(b)
                m.next = wait_name
            with m.State(wait_name):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    for stmt in on_done:
                        m.d.sync += stmt
                    m.next = nxt

        # Big-endian 16-bit value from the first two captured read bytes.
        rd16 = Cat(rbuf[1], rbuf[0])

        # ── Main control FSM ─────────────────────────────────────────────
        with m.FSM(domain="sync", name="main_fsm"):
            with m.State("IDLE"):
                m.d.sync += self.init_done.eq(0)
                # Priority: init request, then pending RX interrupt, then TX.
                with m.If(self.init_req):
                    for i in range(6):
                        m.d.sync += mac_shadow[i].eq(self.par[i*8:(i+1)*8])
                    m.next = "MR_RST"
                with m.Elif(~self.w5500_int_n):
                    m.next = "RX_CHECK"
                with m.Elif(self.tx_valid & self.tx_sof):
                    m.next = "TX_START"

            # Step 1: MR = 0x80 (software reset), then settle ~1 ms.
            xfer_states("MR_RST", _W5500_MR, _CTRL_WR_COMMON, "MR_WAIT",
                        payload=[0x80])
            with m.State("MR_WAIT"):
                with m.If(wait_ctr == self._reset_cycles):
                    m.d.sync += wait_ctr.eq(0)
                    m.next = "SHAR"
                with m.Else():
                    m.d.sync += wait_ctr.eq(wait_ctr + 1)

            # Step 2: SHAR = source MAC (6 bytes from PAR0..5).
            xfer_states("SHAR", _W5500_SHAR, _CTRL_WR_COMMON, "S0_MR",
                        payload=[mac_shadow[i] for i in range(6)])

            # Steps 3-5: S0_MR=MACRAW, S0_CR=OPEN, S0_IMR=0x05.
            # 0x05 enables Sn_IR bit 2 (RECV, the bit RX_CLR_IR clears) and
            # bit 0. NOTE(review): per the W5500 datasheet bit 0 is CON and
            # SEND_OK is bit 4, so this is NOT RECV|SEND_OK as an earlier
            # comment claimed. Leaving SEND_OK masked looks right for this
            # FSM, which only ever clears RECV — confirm the intent.
            # NOTE(review): the datasheet also gates INTn on SIMR (common
            # block 0x0018, reset value 0x00), which this sequence never
            # writes — confirm it is configured elsewhere, otherwise INT_N
            # will not assert for socket 0 on real hardware.
            xfer_states("S0_MR", _W5500_S0_MR, _CTRL_WR_S0REG, "S0_CR",
                        payload=[0x04])
            xfer_states("S0_CR", _W5500_S0_CR, _CTRL_WR_S0REG, "S0_IMR",
                        payload=[0x01])
            xfer_states("S0_IMR", _W5500_S0_IMR, _CTRL_WR_S0REG, "INIT_DONE",
                        payload=[0x05])
            with m.State("INIT_DONE"):
                m.d.sync += self.init_done.eq(1)
                m.next = "IDLE"

            # ── TX path (MACRAW) ─────────────────────────────────────────
            # 1) read S0_TX_WR, 2) stream the frame into the TX buffer at that
            # offset, 3) advance S0_TX_WR by the byte count, 4) issue SEND.
            # NOTE(review): S0_TX_FSR (free size) is never checked before
            # writing — confirm frames cannot outrun the TX buffer.
            xfer_states("TX_START", _W5500_S0_TX_WR, _CTRL_RD_S0REG,
                        "TX_DATA", payload=[0, 0], wait="TX_RDPTR_W",
                        on_done=[tx_wr.eq(rd16)])
            xfer_states("TX_DATA", tx_wr, _CTRL_WR_S0TX, "TX_UPDPTR",
                        stream=1,
                        on_done=[xfer_stream.eq(0),
                                 tx_wr.eq(tx_wr + s_count)])  # advanced ptr
            xfer_states("TX_UPDPTR", _W5500_S0_TX_WR, _CTRL_WR_S0REG,
                        "TX_SEND",
                        payload=[tx_wr[8:16], tx_wr[0:8]])  # hi, lo
            # S0_CR = SEND (0x20)
            xfer_states("TX_SEND", _W5500_S0_CR, _CTRL_WR_S0REG, "IDLE",
                        payload=[0x20])

            # ── RX path (MACRAW) ─────────────────────────────────────────
            # Triggered by W5500 INT (w5500_int_n low): read RX_RSR, read
            # RX_RD, read the 2-byte MACRAW length, stream the frame out,
            # advance RX_RD, issue RECV.
            xfer_states("RX_CHECK", _W5500_S0_RX_RSR, _CTRL_RD_S0REG,
                        "RX_RSR_CHK", payload=[0, 0], wait="RX_RSR_W",
                        on_done=[rx_rsr.eq(rd16)])
            with m.State("RX_RSR_CHK"):
                with m.If(rx_rsr == 0):
                    # NOTE(review): nothing is cleared on this path, so if
                    # INT_N stays low the FSM comes straight back here and
                    # TX (lower priority in IDLE) is starved — confirm INT_N
                    # cannot be low with an empty RX buffer.
                    m.next = "IDLE"  # nothing received
                with m.Else():
                    m.next = "RX_RDPTR"
            xfer_states("RX_RDPTR", _W5500_S0_RX_RD, _CTRL_RD_S0REG,
                        "RX_LEN", payload=[0, 0],
                        on_done=[rx_rd.eq(rd16)])
            # Read the 2-byte MACRAW length that prefixes the frame.
            xfer_states("RX_LEN", rx_rd, _CTRL_RD_S0RX, "RX_LEN_CHK",
                        payload=[0, 0],
                        on_done=[pkt_len.eq(rd16)])
            with m.State("RX_LEN_CHK"):
                with m.If(pkt_len < 2):
                    # The length includes its own 2-byte header, so a value
                    # below 2 is corrupt; pkt_len - 2 would wrap to ~65 k and
                    # stream a bogus frame. Skip the frame read and discard
                    # everything buffered (advance RX_RD by RX_RSR) instead.
                    m.d.sync += rx_adv.eq(rx_rsr)
                    m.next = "RX_UPDRD"
                with m.Else():
                    m.d.sync += rx_adv.eq(pkt_len)
                    m.d.sync += xfer_rcount.eq(pkt_len - 2)
                    m.next = "RX_FRAME"
            # Stream the pkt_len - 2 frame bytes that follow the header.
            xfer_states("RX_FRAME", rx_rd + 2, _CTRL_RD_S0RX, "RX_UPDRD",
                        sread=1, on_done=[xfer_sread.eq(0)])
            # S0_RX_RD += rx_adv (pkt_len on the normal path).
            xfer_states("RX_UPDRD", _W5500_S0_RX_RD, _CTRL_WR_S0REG,
                        "RX_RECV",
                        payload=[(rx_rd + rx_adv)[8:16],
                                 (rx_rd + rx_adv)[0:8]])
            # S0_CR = RECV (0x40), then clear the RECV interrupt so INT_N
            # deasserts (write 1 to Sn_IR[2]); otherwise the FSM would re-enter
            # RX_CHECK forever on a real W5500.
            xfer_states("RX_RECV", _W5500_S0_CR, _CTRL_WR_S0REG, "RX_CLR_IR",
                        payload=[0x40])
            xfer_states("RX_CLR_IR", _W5500_S0_IR, _CTRL_WR_S0REG, "IDLE",
                        payload=[0x04])

        return m
# ── Testbench ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-checking simulation: drives the DUT through init (T1-T3), one TX
    # frame (T4) and one RX frame (T5) against a behavioural W5500 SPI slave
    # model, comparing the captured SPI transactions with expected lists.
    # Exits with status 1 if any check failed.
    import sys
    from amaranth.sim import Simulator, Period
    # Short reset wait so the init sequence runs quickly in simulation.
    dut = W5500SPIMaster(reset_cycles=10)
    errors = []
    # MAC for SHAR: par[i*8:(i+1)*8] = mac byte i → mac = 11 22 33 44 55 66
    MAC = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66]
    PAR = sum(b << (8 * i) for i, b in enumerate(MAC))
    # Expected W5500 init transactions: [addr_hi, addr_lo, ctrl, *payload].
    # ctrl 0x04 = common-block write (VDM); 0x0C = socket-0-reg write (VDM).
    EXPECTED = [
        [0x00, 0x00, 0x04, 0x80], # MR = 0x80 (reset)
        [0x00, 0x09, 0x04, *MAC], # SHAR = MAC
        [0x00, 0x00, 0x0C, 0x04], # S0_MR = MACRAW
        [0x00, 0x01, 0x0C, 0x01], # S0_CR = OPEN
        # S0_IMR = 0x05: bit 2 (RECV) plus bit 0. NOTE(review): an earlier
        # comment called this RECV|SEND_OK, but per the W5500 datasheet
        # SEND_OK is bit 4 — confirm which mask is intended.
        [0x00, 0x2C, 0x0C, 0x05],
    ]
    txns = [] # transactions captured by the W5500 slave model
    # RX frame the W5500 will hand back, and the MACRAW length it reports.
    RX_FRAME = [0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02]
    RX_PKT_LEN = len(RX_FRAME) + 2 # MACRAW length includes the header

    def build_response(bsb, addr):
        """Bytes the W5500 drives on MISO for a read of (bsb, addr)."""
        if bsb == 1 and addr == _W5500_S0_RX_RSR:
            return [(RX_PKT_LEN >> 8) & 0xFF, RX_PKT_LEN & 0xFF]
        if bsb == 1 and addr == _W5500_S0_RX_RD:
            return [0x00, 0x00] # RX read pointer = 0
        if bsb == 3 and addr == 0x0000:
            return [(RX_PKT_LEN >> 8) & 0xFF, RX_PKT_LEN & 0xFF] # length
        if bsb == 3 and addr == 0x0002:
            return list(RX_FRAME) # frame payload
        # Any other read (e.g. S0_TX_WR) returns zeros.
        return [0x00] * 64

    async def w5500_model(ctx):
        """W5500 SPI slave model: captures CS-framed transactions (MOSI) and,
        for reads, drives MISO with canned register/buffer data. Mode 0:
        MOSI sampled on rising SCK, MISO shifted out MSB-first.
        """
        prev_cs, prev_sck = 1, 0
        rx_byte = rx_bits = nbytes = 0
        hdr = [0, 0, 0]
        is_read = False
        resp, ridx = [], 0
        # msr is the MISO shift register (its MSB is the bit on the wire);
        # msr_bits counts how many bits of the current byte remain.
        msr = msr_bits = 0
        cur_txn = []
        async for vals in ctx.tick("sync").sample(
                dut.spi_cs_n, dut.spi_clk, dut.spi_mosi):
            # The last three entries are the sampled signals, in order.
            cs, sck, mosi = vals[-3:]
            rising = (prev_sck == 0 and sck == 1)
            if prev_cs == 1 and cs == 0: # CS falling: start frame
                cur_txn = []
                rx_byte = rx_bits = nbytes = 0
                is_read = False
                resp, ridx, msr, msr_bits = [], 0, 0, 0
            if cs == 0 and rising:
                # MISO bit just sampled by the master → advance shift register
                if is_read and nbytes >= 3:
                    msr = (msr << 1) & 0xFF
                    msr_bits -= 1
                    if msr_bits == 0:
                        # Byte exhausted: load the next response byte, or
                        # zeros once the canned response runs out.
                        msr = resp[ridx] if ridx < len(resp) else 0
                        ridx += 1
                        msr_bits = 8
                # sample MOSI
                rx_byte = ((rx_byte << 1) | mosi) & 0xFF
                rx_bits += 1
                if rx_bits == 8:
                    cur_txn.append(rx_byte)
                    if nbytes < 3:
                        hdr[nbytes] = rx_byte
                    if nbytes == 2: # header complete → decode
                        ctrl = hdr[2]
                        is_read = (ctrl & 0x04) == 0
                        bsb = ctrl >> 3
                        addr = (hdr[0] << 8) | hdr[1]
                        if is_read:
                            # Preload the first response byte so its MSB is
                            # on MISO before the first payload clock.
                            resp = build_response(bsb, addr)
                            msr, ridx, msr_bits = resp[0], 1, 8
                    nbytes += 1
                    rx_byte = rx_bits = 0
            if prev_cs == 0 and cs == 1: # CS rising: end frame
                txns.append(list(cur_txn))
            ctx.set(dut.spi_miso, (msr >> 7) & 1)
            prev_cs, prev_sck = cs, sck

    rx_collected = []

    async def rx_collector(ctx):
        """Record every byte transferred on the RX stream (valid & ready)."""
        async for vals in ctx.tick("sync").sample(
                dut.rx_valid, dut.rx_ready, dut.rx_data):
            valid, ready, data = vals[-3:]
            if valid and ready:
                rx_collected.append(data)

    async def testbench(ctx):
        """Run T1-T5 in order, appending a message to `errors` per failure."""
        ctx.set(dut.par, PAR)
        await ctx.tick("sync").repeat(4)
        # T1: SPI idle — CLK low (Mode 0), CS high
        if ctx.get(dut.spi_clk) != 0:
            errors.append("T1 CLK idle != 0")
        if ctx.get(dut.spi_cs_n) != 1:
            errors.append("T1 CS idle != 1")
        print(f"T1 idle: CLK={ctx.get(dut.spi_clk)} CS={ctx.get(dut.spi_cs_n)}")
        # T2: run the init sequence
        ctx.set(dut.init_req, 1)
        await ctx.tick("sync").repeat(1)
        ctx.set(dut.init_req, 0)
        for _ in range(4000):
            await ctx.tick("sync").repeat(1)
            if ctx.get(dut.init_done):
                break
        if not ctx.get(dut.init_done):
            errors.append("T2 init_done never asserted")
        await ctx.tick("sync").repeat(4)
        # init_done is a pulse, so after the 4 extra ticks above this prints
        # the value it has returned to, not the pulse itself.
        print(f"T2 init_done: {ctx.get(dut.init_done)}")
        # T3: verify the captured init transaction sequence
        print(f"T3 captured {len(txns)} init transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        if txns != EXPECTED:
            errors.append(f"T3 init sequence mismatch:\n got {txns}\n want {EXPECTED}")
        # ── T4: TX a frame (MACRAW) ──────────────────────────────────────
        txns.clear()
        FRAME = [0xAA, 0xBB, 0xCC, 0xDD]
        # With MISO=0 the read returns S0_TX_WR = 0x0000.
        TX_EXPECTED = [
            [0x00, 0x24, 0x08, 0x00, 0x00], # read S0_TX_WR (dummies)
            [0x00, 0x00, 0x14, *FRAME], # write TX buffer @ 0x0000
            [0x00, 0x24, 0x0C, 0x00, len(FRAME)], # S0_TX_WR += len
            [0x00, 0x01, 0x0C, 0x20], # S0_CR = SEND
        ]

        async def send_frame(frame):
            """Present `frame` on the TX stream, one byte per tx_ready pulse."""
            for i, b in enumerate(frame):
                ctx.set(dut.tx_data, b)
                ctx.set(dut.tx_valid, 1)
                ctx.set(dut.tx_sof, 1 if i == 0 else 0)
                ctx.set(dut.tx_eof, 1 if i == len(frame) - 1 else 0)
                # Hold the byte until the DUT pulses tx_ready (bounded wait).
                for _ in range(2000):
                    if ctx.get(dut.tx_ready):
                        break
                    await ctx.tick("sync").repeat(1)
                await ctx.tick("sync").repeat(1) # complete the consume
            ctx.set(dut.tx_valid, 0)
            ctx.set(dut.tx_sof, 0)
            ctx.set(dut.tx_eof, 0)

        await send_frame(FRAME)
        # let the pointer-update + SEND transactions finish
        for _ in range(2000):
            await ctx.tick("sync").repeat(1)
            if len(txns) >= len(TX_EXPECTED):
                break
        await ctx.tick("sync").repeat(4)
        print(f"T4 captured {len(txns)} TX transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        if txns != TX_EXPECTED:
            errors.append(f"T4 TX sequence mismatch:\n got {txns}\n want {TX_EXPECTED}")
        # ── T5: RX a frame (MACRAW) ──────────────────────────────────────
        # The model returns RSR=pkt_len, RD=0, MACRAW length=pkt_len, then the
        # frame. Expected transactions (read dummies are 0x00):
        RX_EXPECTED = [
            [0x00, 0x26, 0x08, 0x00, 0x00], # read S0_RX_RSR
            [0x00, 0x28, 0x08, 0x00, 0x00], # read S0_RX_RD
            [0x00, 0x00, 0x18, 0x00, 0x00], # read MACRAW length
            [0x00, 0x02, 0x18, *([0x00] * len(RX_FRAME))], # read frame
            [0x00, 0x28, 0x0C, 0x00, RX_PKT_LEN], # S0_RX_RD += pkt_len
            [0x00, 0x01, 0x0C, 0x40], # S0_CR = RECV
            [0x00, 0x02, 0x0C, 0x04], # S0_IR clear RECV
        ]
        txns.clear()
        ctx.set(dut.rx_ready, 1)
        ctx.set(dut.w5500_int_n, 0) # signal a received packet
        for _ in range(4000):
            await ctx.tick("sync").repeat(1)
            if len(txns) >= len(RX_EXPECTED):
                break
        # The model never deasserts INT itself; release it here so the DUT
        # does not start another RX pass.
        ctx.set(dut.w5500_int_n, 1)
        await ctx.tick("sync").repeat(8)
        print(f"T5 captured {len(txns)} RX transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        print(f"T5 rx frame: {[f'0x{b:02X}' for b in rx_collected]} "
              f"(want {[f'0x{b:02X}' for b in RX_FRAME]})")
        if txns != RX_EXPECTED:
            errors.append(f"T5 RX sequence mismatch:\n got {txns}\n want {RX_EXPECTED}")
        if rx_collected != RX_FRAME:
            errors.append(f"T5 RX frame mismatch: got {rx_collected}, want {RX_FRAME}")

    sim = Simulator(dut)
    sim.add_clock(Period(MHz=24), domain="sync")
    sim.add_testbench(testbench)
    sim.add_process(w5500_model)
    sim.add_process(rx_collector)
    with sim.write_vcd("W5500SPIMaster.vcd"):
        sim.run()
    if errors:
        print("\nFAILURES:")
        for e in errors:
            print(" ", e)
        sys.exit(1)
    else:
        print("\nAll tests passed.")