"""W5500 SPI master — sync domain (24 MHz). SPI Mode 0 (CPOL=0, CPHA=0): CLK idles LOW, data captured on rising edge. SCK = 12 MHz: the sync domain is 24 MHz and the bit engine toggles SCK via a clock-enable (sync ÷ 2). W5500 frame format ------------------ Byte 0–1 Address (16-bit big-endian) Byte 2 Control: [7:3]=BSB [2]=R/W [1:0]=OM Byte 3+ Data BSB values used here: 0b00000 Common registers 0b00001 Socket 0 registers 0b00010 Socket 0 TX buffer 0b00011 Socket 0 RX buffer After NCRA reset the driver issues the W5500 init sequence (MR reset, SHAR, S0_MR MACRAW, S0_CR OPEN, S0_IMR). The module provides: - A streaming TX interface (tx_data/tx_valid/tx_ready + sof/eof framing) - A streaming RX interface (rx_data/rx_valid/rx_ready + sof/eof) - init_req / init_done for the NCRA-triggered init sequence - MAC source address shadow input (par[0..5]) for SHAR programming """ from amaranth import * __all__ = ["W5500SPIMaster"] # W5500 register addresses. The 16-bit address is the OFFSET WITHIN A BLOCK; # the block is selected by the BSB field of the control byte (see _CTRL_*), # NOT by the address. So socket-0 registers use small offsets with BSB=1. _W5500_MR = 0x0000 # Mode register (common block) _W5500_SHAR = 0x0009 # Source MAC, 6 bytes (common block) _W5500_S0_MR = 0x0000 # Socket 0 Mode (socket-0 block) _W5500_S0_CR = 0x0001 # Socket 0 Command _W5500_S0_IR = 0x0002 # Socket 0 Interrupt _W5500_S0_RXBUF_SIZE = 0x001E # Socket 0 RX buffer size _W5500_S0_TXBUF_SIZE = 0x001F # Socket 0 TX buffer size _W5500_S0_TX_FSR = 0x0020 # Socket 0 TX Free Size (2 bytes) _W5500_S0_TX_WR = 0x0024 # Socket 0 TX Write Pointer _W5500_S0_RX_RSR = 0x0026 # Socket 0 RX Received Size (2 bytes) _W5500_S0_RX_RD = 0x0028 # Socket 0 RX Read Pointer _W5500_S0_IMR = 0x002C # Socket 0 Interrupt Mask # Control byte = (BSB << 3) | (RWB << 2) | OM. # RWB: 1=write 0=read. OM=00 → Variable Data Mode (CS frames the length). # BSB: 0=common, 1=socket0 reg, 2=socket0 TX buffer, 3=socket0 RX buffer. 
_CTRL_WR_COMMON = (0 << 3) | (1 << 2)   # 0x04
_CTRL_WR_S0REG  = (1 << 3) | (1 << 2)   # 0x0C
_CTRL_RD_S0REG  = (1 << 3) | (0 << 2)   # 0x08
_CTRL_WR_S0TX   = (2 << 3) | (1 << 2)   # 0x14
_CTRL_RD_S0RX   = (3 << 3) | (0 << 2)   # 0x18

# Socket Interrupt Mask register (common block, offset 0x0018). Bit n gates
# socket n's interrupt onto the INTn pin; the W5500 resets it to 0x00, so it
# must be written before INT_N can ever signal a socket-0 event.
_W5500_SIMR = 0x0018


class W5500SPIMaster(Elaboratable):
    """W5500 SPI master in the sync clock domain.

    Parameters
    ----------
    clk_div : int
        SCK divider; SCK = sync_clock / (2 * clk_div). Values <= 1 select
        full rate (SCK = sync / 2).
    reset_cycles : int
        Number of sync cycles to wait after the MR software reset before
        continuing the init sequence (24000 cycles = 1 ms at 24 MHz).

    Physical SPI pins
    -----------------
    spi_clk / spi_mosi / spi_miso / spi_cs_n : to W5500
    w5500_int_n : W5500 INT_N input (active low)
    w5500_rst_n : W5500 hardware reset (active low); declared here but not
                  driven by this module

    Init interface (from BBARegisterFile / BBATop)
    ----------------------------------------------
    init_req  : pulse to trigger the W5500 init sequence
    init_done : one-cycle pulse when the init sequence completes
    par       : 6-byte MAC address (sampled at init_req)

    TX streaming interface (from TXFrameDrain, sync domain)
    -------------------------------------------------------
    tx_data / tx_valid / tx_ready : byte stream; tx_ready is a one-cycle
                                    pulse emitted once the presented byte has
                                    been shifted out, so the source must hold
                                    tx_data/tx_valid stable until it sees it
    tx_sof / tx_eof               : frame delimiters on the same cycle as
                                    tx_valid

    RX streaming interface (to RXFrameAssembler, sync domain)
    ----------------------------------------------------------
    rx_data / rx_valid / rx_ready : byte stream
    rx_sof / rx_eof               : frame delimiters
    """

    def __init__(self, clk_div=1, reset_cycles=24000):
        # MR-reset settle wait (in sync cycles). ~1 ms; the testbench
        # overrides with a small value for fast simulation.
        self._reset_cycles = reset_cycles
        # SPI SCK = sync_clock / (2 * clk_div). clk_div=1 → full rate (SCK =
        # sync/2): at the 24 MHz slow domain that is 12 MHz SCK (~12 Mbit/s),
        # which comfortably exceeds real-world GC BBA TCP throughput. The W5500
        # tolerates up to 80 MHz SCK, so the divider exists only as a safety
        # knob for board-level signal-integrity issues, not a functional need.
        self._clk_div = clk_div

        # Physical SPI
        self.spi_clk     = Signal()
        self.spi_mosi    = Signal()
        self.spi_miso    = Signal()
        self.spi_cs_n    = Signal(init=1)
        self.w5500_int_n = Signal(init=1)
        self.w5500_rst_n = Signal(init=1)

        # Init control
        self.init_req  = Signal()
        self.init_done = Signal()
        self.par       = Signal(48)     # MAC address (PAR0..5 packed)

        # TX stream
        self.tx_data  = Signal(8)
        self.tx_valid = Signal()
        self.tx_ready = Signal()
        self.tx_sof   = Signal()
        self.tx_eof   = Signal()

        # RX stream
        self.rx_data  = Signal(8)
        self.rx_valid = Signal()
        self.rx_ready = Signal()
        self.rx_sof   = Signal()
        self.rx_eof   = Signal()

    def elaborate(self, platform):
        """Build the three cooperating FSMs (byte, transaction, main)."""
        m = Module()

        # ── SPI clock enable ─────────────────────────────────────────────
        # clk_en high every `clk_div` sync cycles. The bit engine toggles SCK
        # on each enabled cycle, so SCK = sync / (2 * clk_div).
        clk_en = Signal()
        if self._clk_div <= 1:
            m.d.comb += clk_en.eq(1)            # full rate: SCK = sync/2
        else:
            div_ctr = Signal(range(self._clk_div))
            with m.If(div_ctr == self._clk_div - 1):
                m.d.sync += div_ctr.eq(0)
            with m.Else():
                m.d.sync += div_ctr.eq(div_ctr + 1)
            m.d.comb += clk_en.eq(div_ctr == self._clk_div - 1)

        # ── SPI pin registers (Mode 0: SCK idles LOW) ────────────────────
        sck_r     = Signal()
        cs_r      = Signal(init=1)
        shift_out = Signal(8)
        shift_in  = Signal(8)
        m.d.comb += self.spi_clk.eq(sck_r)
        m.d.comb += self.spi_cs_n.eq(cs_r)
        m.d.comb += self.spi_mosi.eq(shift_out[7])   # MSB first; valid pre-rising

        # ── Byte-transfer engine (Mode 0) ────────────────────────────────
        # On byte_start, shift out byte_tx MSB-first (8 SCK cycles) and capture
        # MISO into byte_rx; pulse byte_done. CS is owned by the xfer engine.
        byte_start = Signal()
        byte_tx    = Signal(8)
        byte_rx    = Signal(8)
        byte_done  = Signal()
        bit_ctr    = Signal(4)

        # Default first; the FSM's later assignment wins, giving a 1-cycle pulse.
        m.d.sync += byte_done.eq(0)
        with m.FSM(domain="sync", name="byte_fsm"):
            with m.State("IDLE"):
                m.d.sync += sck_r.eq(0)
                with m.If(byte_start):
                    m.d.sync += shift_out.eq(byte_tx)
                    m.d.sync += bit_ctr.eq(0)
                    m.next = "RUN"
            with m.State("RUN"):
                with m.If(clk_en):
                    with m.If(~sck_r):
                        # rising edge: slave samples MOSI, master samples MISO
                        m.d.sync += sck_r.eq(1)
                        m.d.sync += shift_in.eq(Cat(self.spi_miso, shift_in[:-1]))
                    with m.Else():
                        # falling edge: advance / finish
                        m.d.sync += sck_r.eq(0)
                        with m.If(bit_ctr == 7):
                            m.d.sync += byte_rx.eq(shift_in)
                            m.d.sync += byte_done.eq(1)
                            m.next = "IDLE"
                        with m.Else():
                            m.d.sync += shift_out.eq(Cat(0, shift_out[:-1]))
                            m.d.sync += bit_ctr.eq(bit_ctr + 1)

        # ── Generic register transaction engine (Variable Data Mode) ─────
        # One CS-low frame: 3 header bytes (addr_hi, addr_lo, ctrl) then
        # xfer_len payload bytes. Writes source payload from wbuf; reads
        # capture MISO into rbuf.
        WBUF = 8
        xfer_start = Signal()
        xfer_addr  = Signal(16)
        xfer_ctrl  = Signal(8)
        xfer_len   = Signal(range(WBUF + 1))
        xfer_done  = Signal()
        wbuf = Array([Signal(8, name=f"wbuf{i}") for i in range(WBUF)])
        rbuf = Array([Signal(8, name=f"rbuf{i}") for i in range(WBUF)])
        xfer_idx = Signal(range(WBUF + 3))

        # Stream-write mode: after the 3-byte header, payload bytes are pulled
        # from (s_data, s_valid, s_last) instead of wbuf, until s_last. Used to
        # forward a frame straight into the W5500 TX buffer. s_consume pulses
        # as each streamed byte is accepted; s_count tracks the byte count.
        xfer_stream = Signal()
        s_data    = Signal(8)
        s_valid   = Signal()
        s_last    = Signal()
        s_consume = Signal()
        s_count   = Signal(16)
        s_last_r  = Signal()        # latched s_last for the in-flight byte

        # Stream-read mode: after the header, read `xfer_rcount` payload bytes
        # (sending 0x00 dummies) and push each out via (r_data, r_valid,
        # r_first, r_last) with r_ready back-pressure. Used to pull a frame
        # out of the W5500 RX buffer into RXFrameAssembler.
        xfer_sread  = Signal()
        xfer_rcount = Signal(16)
        r_data  = Signal(8)
        r_valid = Signal()
        r_first = Signal()
        r_last  = Signal()
        r_ready = Signal()
        r_idx   = Signal(16)

        # Byte to transmit for the current xfer_idx: header first, then wbuf.
        x_byte = Signal(8)
        with m.If(xfer_idx == 0):
            m.d.comb += x_byte.eq(xfer_addr[8:16])
        with m.Elif(xfer_idx == 1):
            m.d.comb += x_byte.eq(xfer_addr[0:8])
        with m.Elif(xfer_idx == 2):
            m.d.comb += x_byte.eq(xfer_ctrl)
        with m.Else():
            m.d.comb += x_byte.eq(wbuf[xfer_idx - 3])

        # Combinational defaults; individual FSM states override them.
        m.d.comb += byte_start.eq(0)
        m.d.comb += byte_tx.eq(0)
        m.d.comb += s_consume.eq(0)
        m.d.comb += r_valid.eq(0)
        m.d.comb += r_data.eq(0)
        m.d.comb += r_first.eq(0)
        m.d.comb += r_last.eq(0)
        m.d.sync += xfer_done.eq(0)
        with m.FSM(domain="sync", name="xfer_fsm"):
            with m.State("IDLE"):
                with m.If(xfer_start):
                    m.d.sync += cs_r.eq(0)          # assert CS for the frame
                    m.d.sync += xfer_idx.eq(0)
                    m.d.sync += s_count.eq(0)
                    m.d.sync += r_idx.eq(0)
                    m.next = "LOAD"
            with m.State("LOAD"):
                m.d.comb += byte_tx.eq(x_byte)
                m.d.comb += byte_start.eq(1)
                m.next = "WAIT"
            with m.State("WAIT"):
                with m.If(byte_done):
                    with m.If(xfer_idx >= 3):
                        m.d.sync += rbuf[xfer_idx - 3].eq(byte_rx)
                    with m.If((xfer_idx == 2) & xfer_stream):
                        m.next = "SLOAD"            # stream the payload (write)
                    with m.Elif((xfer_idx == 2) & xfer_sread):
                        m.next = "RLOAD"            # stream the payload (read)
                    with m.Elif(~xfer_stream & ~xfer_sread
                                & (xfer_idx == (xfer_len + 2))):
                        m.next = "FINISH"           # 3 header + len − 1
                    with m.Else():
                        m.d.sync += xfer_idx.eq(xfer_idx + 1)
                        m.next = "LOAD"

            # ── Streamed-payload sub-loop (TX buffer write) ──────────────
            with m.State("SLOAD"):
                with m.If(s_valid):
                    m.d.comb += byte_tx.eq(s_data)
                    m.d.comb += byte_start.eq(1)
                    m.d.sync += s_last_r.eq(s_last)
                    m.next = "SWAIT"
            with m.State("SWAIT"):
                with m.If(byte_done):
                    m.d.comb += s_consume.eq(1)     # accept this frame byte
                    m.d.sync += s_count.eq(s_count + 1)
                    with m.If(s_last_r):
                        m.next = "FINISH"
                    with m.Else():
                        m.next = "SLOAD"

            # ── Streamed-payload sub-loop (RX buffer read) ───────────────
            with m.State("RLOAD"):
                with m.If(r_idx == xfer_rcount):
                    m.next = "FINISH"
                with m.Else():
                    m.d.comb += byte_tx.eq(0)       # dummy MOSI during read
                    m.d.comb += byte_start.eq(1)
                    m.next = "RWAIT"
            with m.State("RWAIT"):
                with m.If(byte_done):
                    m.next = "RPUSH"
            with m.State("RPUSH"):
                m.d.comb += r_data.eq(byte_rx)
                m.d.comb += r_valid.eq(1)
                m.d.comb += r_first.eq(r_idx == 0)
                m.d.comb += r_last.eq(r_idx == (xfer_rcount - 1))
                with m.If(r_ready):
                    m.d.sync += r_idx.eq(r_idx + 1)
                    m.next = "RLOAD"

            with m.State("FINISH"):
                m.d.sync += cs_r.eq(1)              # deassert CS
                m.d.sync += xfer_done.eq(1)
                m.next = "IDLE"

        # Saved MAC for SHAR programming; current W5500 TX write pointer.
        mac_shadow = Array([Signal(8, name=f"mac{i}") for i in range(6)])
        wait_ctr = Signal(range(self._reset_cycles + 2))
        tx_wr    = Signal(16)
        rx_rsr   = Signal(16)       # RX received size
        rx_rd    = Signal(16)       # RX read pointer
        pkt_len  = Signal(16)       # MACRAW packet length (incl. 2-byte header)

        # Frame stream from TXFrameDrain feeds the xfer engine's stream port.
        # tx_ready pulses (= s_consume) as each frame byte is taken into the
        # TX-buffer write transaction.
        m.d.comb += [
            s_data.eq(self.tx_data),
            s_valid.eq(self.tx_valid),
            s_last.eq(self.tx_eof),
            self.tx_ready.eq(s_consume),
        ]

        # RX buffer read stream → RXFrameAssembler.
        m.d.comb += [
            self.rx_data.eq(r_data),
            self.rx_valid.eq(r_valid),
            self.rx_sof.eq(r_first),
            self.rx_eof.eq(r_last),
            r_ready.eq(self.rx_ready),
        ]

        # Helper: a setup state that programs one register-write transaction
        # then waits for it to complete and jumps to `nxt`. `payload` entries
        # may be Python ints or Amaranth values (at most WBUF of them). Must
        # be called inside the main FSM's `with m.FSM(...)` context.
        def write_reg(name, addr, ctrl, payload, nxt):
            with m.State(name):
                m.d.sync += xfer_addr.eq(addr)
                m.d.sync += xfer_ctrl.eq(ctrl)
                m.d.sync += xfer_len.eq(len(payload))
                m.d.sync += xfer_stream.eq(0)
                m.d.sync += xfer_sread.eq(0)
                for i, b in enumerate(payload):
                    m.d.sync += wbuf[i].eq(b)
                m.d.sync += xfer_start.eq(1)
                m.next = name + "_W"
            with m.State(name + "_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.next = nxt

        # ── Main control FSM ─────────────────────────────────────────────
        # NOTE(review): w5500_int_n is consumed here without a synchroniser;
        # presumably the top level registers the pin first — confirm.
        with m.FSM(domain="sync", name="main_fsm"):
            with m.State("IDLE"):
                m.d.sync += self.init_done.eq(0)
                with m.If(self.init_req):
                    for i in range(6):
                        m.d.sync += mac_shadow[i].eq(self.par[i * 8:(i + 1) * 8])
                    m.next = "MR_RST"
                with m.Elif(~self.w5500_int_n):
                    m.next = "RX_CHECK"
                with m.Elif(self.tx_valid & self.tx_sof):
                    m.next = "TX_START"

            # Step 1: MR = 0x80 (software reset), then settle ~1 ms.
            write_reg("MR_RST", _W5500_MR, _CTRL_WR_COMMON, [0x80], "MR_WAIT")
            with m.State("MR_WAIT"):
                with m.If(wait_ctr == self._reset_cycles):
                    m.d.sync += wait_ctr.eq(0)
                    m.next = "SHAR"
                with m.Else():
                    m.d.sync += wait_ctr.eq(wait_ctr + 1)

            # Step 2: SHAR = source MAC (6 bytes from PAR0–5).
            write_reg("SHAR", _W5500_SHAR, _CTRL_WR_COMMON,
                      [mac_shadow[i] for i in range(6)], "S0_MR")

            # Step 3–5: S0_MR=MACRAW, S0_CR=OPEN, S0_IMR=0x05. Per the Sn_IR
            # bit layout (bit4 SEND_OK, bit3 TIMEOUT, bit2 RECV, bit1 DISCON,
            # bit0 CON) 0x05 enables RECV|CON; SEND_OK stays masked, which
            # matters because only the RECV flag is ever cleared below.
            write_reg("S0_MR",  _W5500_S0_MR,  _CTRL_WR_S0REG, [0x04], "S0_CR")
            write_reg("S0_CR",  _W5500_S0_CR,  _CTRL_WR_S0REG, [0x01], "S0_IMR")
            write_reg("S0_IMR", _W5500_S0_IMR, _CTRL_WR_S0REG, [0x05], "SIMR")

            # Step 6: SIMR = 0x01. Socket interrupts only reach the INTn pin
            # when the matching SIMR bit is set, and SIMR resets to 0x00 — so
            # without this write INT_N never asserts and RX never triggers.
            write_reg("SIMR", _W5500_SIMR, _CTRL_WR_COMMON, [0x01], "INIT_DONE")

            with m.State("INIT_DONE"):
                m.d.sync += self.init_done.eq(1)
                m.next = "IDLE"

            # ── TX path (MACRAW) ─────────────────────────────────────────
            # 1) read S0_TX_WR, 2) stream the frame into the TX buffer at that
            # offset, 3) advance S0_TX_WR by the byte count, 4) issue SEND.
            with m.State("TX_START"):
                m.d.sync += xfer_addr.eq(_W5500_S0_TX_WR)
                m.d.sync += xfer_ctrl.eq(_CTRL_RD_S0REG)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_stream.eq(0)
                m.d.sync += wbuf[0].eq(0)           # read → send 0x00 dummies
                m.d.sync += wbuf[1].eq(0)
                m.d.sync += xfer_start.eq(1)
                m.next = "TX_RDPTR_W"
            with m.State("TX_RDPTR_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += tx_wr.eq(Cat(rbuf[1], rbuf[0]))    # big-endian
                    m.next = "TX_DATA"

            with m.State("TX_DATA"):
                m.d.sync += xfer_addr.eq(tx_wr)
                m.d.sync += xfer_ctrl.eq(_CTRL_WR_S0TX)     # socket-0 TX buffer
                m.d.sync += xfer_stream.eq(1)
                m.d.sync += xfer_start.eq(1)
                m.next = "TX_DATA_W"
            with m.State("TX_DATA_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += xfer_stream.eq(0)
                    m.d.sync += tx_wr.eq(tx_wr + s_count)   # advanced pointer
                    m.next = "TX_UPDPTR"

            with m.State("TX_UPDPTR"):
                m.d.sync += xfer_addr.eq(_W5500_S0_TX_WR)
                m.d.sync += xfer_ctrl.eq(_CTRL_WR_S0REG)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_stream.eq(0)
                m.d.sync += wbuf[0].eq(tx_wr[8:16])     # hi (already advanced)
                m.d.sync += wbuf[1].eq(tx_wr[0:8])      # lo
                m.d.sync += xfer_start.eq(1)
                m.next = "TX_UPDPTR_W"
            with m.State("TX_UPDPTR_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.next = "TX_SEND"

            # S0_CR = SEND (0x20)
            write_reg("TX_SEND", _W5500_S0_CR, _CTRL_WR_S0REG, [0x20], "IDLE")

            # ── RX path (MACRAW) ─────────────────────────────────────────
            # Triggered by W5500 INT (w5500_int_n low): read RX_RSR, read
            # RX_RD, read the 2-byte MACRAW length, stream the frame out,
            # advance RX_RD, issue RECV.
            # NOTE(review): only one packet is drained per interrupt and the
            # RECV flag is cleared afterwards; a packet that lands mid-drain
            # looks like it waits for the next interrupt — confirm acceptable.
            with m.State("RX_CHECK"):       # read S0_RX_RSR
                m.d.sync += xfer_addr.eq(_W5500_S0_RX_RSR)
                m.d.sync += xfer_ctrl.eq(_CTRL_RD_S0REG)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_stream.eq(0)
                m.d.sync += xfer_sread.eq(0)
                m.d.sync += wbuf[0].eq(0)
                m.d.sync += wbuf[1].eq(0)
                m.d.sync += xfer_start.eq(1)
                m.next = "RX_RSR_W"
            with m.State("RX_RSR_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += rx_rsr.eq(Cat(rbuf[1], rbuf[0]))
                    m.next = "RX_RSR_CHK"
            with m.State("RX_RSR_CHK"):
                with m.If(rx_rsr == 0):
                    m.next = "IDLE"         # nothing received
                with m.Else():
                    m.next = "RX_RDPTR"

            with m.State("RX_RDPTR"):       # read S0_RX_RD
                m.d.sync += xfer_addr.eq(_W5500_S0_RX_RD)
                m.d.sync += xfer_ctrl.eq(_CTRL_RD_S0REG)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_start.eq(1)
                m.next = "RX_RDPTR_W"
            with m.State("RX_RDPTR_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += rx_rd.eq(Cat(rbuf[1], rbuf[0]))
                    m.next = "RX_LEN"

            with m.State("RX_LEN"):         # read 2-byte MACRAW length
                m.d.sync += xfer_addr.eq(rx_rd)
                m.d.sync += xfer_ctrl.eq(_CTRL_RD_S0RX)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_start.eq(1)
                m.next = "RX_LEN_W"
            with m.State("RX_LEN_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += pkt_len.eq(Cat(rbuf[1], rbuf[0]))
                    m.next = "RX_FRAME"

            with m.State("RX_FRAME"):       # stream pkt_len−2 frame bytes
                m.d.sync += xfer_addr.eq(rx_rd + 2)
                m.d.sync += xfer_ctrl.eq(_CTRL_RD_S0RX)
                m.d.sync += xfer_sread.eq(1)
                # pkt_len includes its own 2-byte header. A corrupt length
                # below 2 would wrap to ~64K streamed bytes, so clamp to 0.
                with m.If(pkt_len < 2):
                    m.d.sync += xfer_rcount.eq(0)
                with m.Else():
                    m.d.sync += xfer_rcount.eq(pkt_len - 2)
                m.d.sync += xfer_start.eq(1)
                m.next = "RX_FRAME_W"
            with m.State("RX_FRAME_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.d.sync += xfer_sread.eq(0)
                    m.next = "RX_UPDRD"

            with m.State("RX_UPDRD"):       # S0_RX_RD += pkt_len
                m.d.sync += xfer_addr.eq(_W5500_S0_RX_RD)
                m.d.sync += xfer_ctrl.eq(_CTRL_WR_S0REG)
                m.d.sync += xfer_len.eq(2)
                m.d.sync += xfer_stream.eq(0)
                m.d.sync += xfer_sread.eq(0)
                m.d.sync += wbuf[0].eq((rx_rd + pkt_len)[8:16])
                m.d.sync += wbuf[1].eq((rx_rd + pkt_len)[0:8])
                m.d.sync += xfer_start.eq(1)
                m.next = "RX_UPDRD_W"
            with m.State("RX_UPDRD_W"):
                m.d.sync += xfer_start.eq(0)
                with m.If(xfer_done):
                    m.next = "RX_RECV"

            # S0_CR = RECV (0x40), then clear the RECV interrupt so INT_N
            # deasserts (write 1 to Sn_IR[2]); otherwise the FSM would re-enter
            # RX_CHECK forever on a real W5500.
            write_reg("RX_RECV",   _W5500_S0_CR, _CTRL_WR_S0REG, [0x40], "RX_CLR_IR")
            write_reg("RX_CLR_IR", _W5500_S0_IR, _CTRL_WR_S0REG, [0x04], "IDLE")

        return m


# ── Testbench ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys
    from amaranth.sim import Simulator, Period

    # Short reset wait so the init sequence runs quickly in simulation.
    dut = W5500SPIMaster(reset_cycles=10)
    errors = []

    # MAC for SHAR: par[i*8:(i+1)*8] = mac byte i → mac = 11 22 33 44 55 66
    MAC = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66]
    PAR = sum(b << (8 * i) for i, b in enumerate(MAC))

    # Expected W5500 init transactions: [addr_hi, addr_lo, ctrl, *payload].
    # ctrl 0x04 = common-block write (VDM); 0x0C = socket-0-reg write (VDM).
    EXPECTED = [
        [0x00, 0x00, 0x04, 0x80],       # MR = 0x80 (reset)
        [0x00, 0x09, 0x04, *MAC],       # SHAR = MAC
        [0x00, 0x00, 0x0C, 0x04],       # S0_MR = MACRAW
        [0x00, 0x01, 0x0C, 0x01],       # S0_CR = OPEN
        [0x00, 0x2C, 0x0C, 0x05],       # S0_IMR = RECV|CON
        [0x00, 0x18, 0x04, 0x01],       # SIMR = socket-0 interrupt enable
    ]

    txns = []       # transactions captured by the W5500 slave model

    # RX frame the W5500 will hand back, and the MACRAW length it reports.
    RX_FRAME = [0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02]
    RX_PKT_LEN = len(RX_FRAME) + 2      # MACRAW length includes the header

    def build_response(bsb, addr):
        """Bytes the W5500 drives on MISO for a read of (bsb, addr)."""
        if bsb == 1 and addr == _W5500_S0_RX_RSR:
            return [(RX_PKT_LEN >> 8) & 0xFF, RX_PKT_LEN & 0xFF]
        if bsb == 1 and addr == _W5500_S0_RX_RD:
            return [0x00, 0x00]                                     # RX read pointer = 0
        if bsb == 3 and addr == 0x0000:
            return [(RX_PKT_LEN >> 8) & 0xFF, RX_PKT_LEN & 0xFF]    # length
        if bsb == 3 and addr == 0x0002:
            return list(RX_FRAME)                                   # frame payload
        return [0x00] * 64

    async def w5500_model(ctx):
        """W5500 SPI slave model: captures CS-framed transactions (MOSI) and,
        for reads, drives MISO with canned register/buffer data.

        Mode 0: MOSI sampled on rising SCK, MISO shifted out MSB-first.
        """
        prev_cs, prev_sck = 1, 0
        rx_byte = rx_bits = nbytes = 0
        hdr = [0, 0, 0]
        is_read = False
        resp, ridx = [], 0
        msr = msr_bits = 0
        cur_txn = []
        async for vals in ctx.tick("sync").sample(
                dut.spi_cs_n, dut.spi_clk, dut.spi_mosi):
            cs, sck, mosi = vals[-3:]
            rising = (prev_sck == 0 and sck == 1)
            if prev_cs == 1 and cs == 0:        # CS falling: start frame
                cur_txn = []
                rx_byte = rx_bits = nbytes = 0
                is_read = False
                resp, ridx, msr, msr_bits = [], 0, 0, 0
            if cs == 0 and rising:
                # MISO bit just sampled by the master → advance shift register
                if is_read and nbytes >= 3:
                    msr = (msr << 1) & 0xFF
                    msr_bits -= 1
                    if msr_bits == 0:
                        msr = resp[ridx] if ridx < len(resp) else 0
                        ridx += 1
                        msr_bits = 8
                # sample MOSI
                rx_byte = ((rx_byte << 1) | mosi) & 0xFF
                rx_bits += 1
                if rx_bits == 8:
                    cur_txn.append(rx_byte)
                    if nbytes < 3:
                        hdr[nbytes] = rx_byte
                    if nbytes == 2:             # header complete → decode
                        ctrl = hdr[2]
                        is_read = (ctrl & 0x04) == 0
                        bsb = ctrl >> 3
                        addr = (hdr[0] << 8) | hdr[1]
                        if is_read:
                            resp = build_response(bsb, addr)
                            msr, ridx, msr_bits = resp[0], 1, 8
                    nbytes += 1
                    rx_byte = rx_bits = 0
            if prev_cs == 0 and cs == 1:        # CS rising: end frame
                txns.append(list(cur_txn))
            ctx.set(dut.spi_miso, (msr >> 7) & 1)
            prev_cs, prev_sck = cs, sck

    rx_collected = []

    async def rx_collector(ctx):
        """Record every byte transferred on the RX stream (valid & ready)."""
        async for vals in ctx.tick("sync").sample(
                dut.rx_valid, dut.rx_ready, dut.rx_data):
            valid, ready, data = vals[-3:]
            if valid and ready:
                rx_collected.append(data)

    async def testbench(ctx):
        """Drive T1–T5: idle levels, init sequence, one TX and one RX frame."""
        ctx.set(dut.par, PAR)
        await ctx.tick("sync").repeat(4)

        # T1: SPI idle — CLK low (Mode 0), CS high
        if ctx.get(dut.spi_clk) != 0:
            errors.append("T1 CLK idle != 0")
        if ctx.get(dut.spi_cs_n) != 1:
            errors.append("T1 CS idle != 1")
        print(f"T1 idle: CLK={ctx.get(dut.spi_clk)} CS={ctx.get(dut.spi_cs_n)}")

        # T2: run the init sequence
        ctx.set(dut.init_req, 1)
        await ctx.tick("sync").repeat(1)
        ctx.set(dut.init_req, 0)
        for _ in range(4000):
            await ctx.tick("sync").repeat(1)
            if ctx.get(dut.init_done):
                break
        if not ctx.get(dut.init_done):
            errors.append("T2 init_done never asserted")
        await ctx.tick("sync").repeat(4)
        print(f"T2 init_done: {ctx.get(dut.init_done)}")

        # T3: verify the captured init transaction sequence
        print(f"T3 captured {len(txns)} init transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        if txns != EXPECTED:
            errors.append(f"T3 init sequence mismatch:\n got {txns}\n want {EXPECTED}")

        # ── T4: TX a frame (MACRAW) ──────────────────────────────────────
        txns.clear()
        FRAME = [0xAA, 0xBB, 0xCC, 0xDD]
        # With MISO=0 the read returns S0_TX_WR = 0x0000.
        TX_EXPECTED = [
            [0x00, 0x24, 0x08, 0x00, 0x00],         # read S0_TX_WR (dummies)
            [0x00, 0x00, 0x14, *FRAME],             # write TX buffer @ 0x0000
            [0x00, 0x24, 0x0C, 0x00, len(FRAME)],   # S0_TX_WR += len
            [0x00, 0x01, 0x0C, 0x20],               # S0_CR = SEND
        ]

        async def send_frame(frame):
            # Hold each byte until the DUT pulses tx_ready, then move on.
            for i, b in enumerate(frame):
                ctx.set(dut.tx_data, b)
                ctx.set(dut.tx_valid, 1)
                ctx.set(dut.tx_sof, 1 if i == 0 else 0)
                ctx.set(dut.tx_eof, 1 if i == len(frame) - 1 else 0)
                for _ in range(2000):
                    if ctx.get(dut.tx_ready):
                        break
                    await ctx.tick("sync").repeat(1)
                await ctx.tick("sync").repeat(1)    # complete the consume
            ctx.set(dut.tx_valid, 0)
            ctx.set(dut.tx_sof, 0)
            ctx.set(dut.tx_eof, 0)

        await send_frame(FRAME)
        # let the pointer-update + SEND transactions finish
        for _ in range(2000):
            await ctx.tick("sync").repeat(1)
            if len(txns) >= len(TX_EXPECTED):
                break
        await ctx.tick("sync").repeat(4)

        print(f"T4 captured {len(txns)} TX transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        if txns != TX_EXPECTED:
            errors.append(f"T4 TX sequence mismatch:\n got {txns}\n want {TX_EXPECTED}")

        # ── T5: RX a frame (MACRAW) ──────────────────────────────────────
        # The model returns RSR=pkt_len, RD=0, MACRAW length=pkt_len, then the
        # frame. Expected transactions (read dummies are 0x00):
        RX_EXPECTED = [
            [0x00, 0x26, 0x08, 0x00, 0x00],                 # read S0_RX_RSR
            [0x00, 0x28, 0x08, 0x00, 0x00],                 # read S0_RX_RD
            [0x00, 0x00, 0x18, 0x00, 0x00],                 # read MACRAW length
            [0x00, 0x02, 0x18, *([0x00] * len(RX_FRAME))],  # read frame
            [0x00, 0x28, 0x0C, 0x00, RX_PKT_LEN],           # S0_RX_RD += pkt_len
            [0x00, 0x01, 0x0C, 0x40],                       # S0_CR = RECV
            [0x00, 0x02, 0x0C, 0x04],                       # S0_IR clear RECV
        ]
        txns.clear()
        ctx.set(dut.rx_ready, 1)
        ctx.set(dut.w5500_int_n, 0)     # signal a received packet
        for _ in range(4000):
            await ctx.tick("sync").repeat(1)
            if len(txns) >= len(RX_EXPECTED):
                break
        ctx.set(dut.w5500_int_n, 1)
        await ctx.tick("sync").repeat(8)

        print(f"T5 captured {len(txns)} RX transactions:")
        for t in txns:
            print(" ", [f"0x{b:02X}" for b in t])
        print(f"T5 rx frame: {[f'0x{b:02X}' for b in rx_collected]} "
              f"(want {[f'0x{b:02X}' for b in RX_FRAME]})")
        if txns != RX_EXPECTED:
            errors.append(f"T5 RX sequence mismatch:\n got {txns}\n want {RX_EXPECTED}")
        if rx_collected != RX_FRAME:
            errors.append(f"T5 RX frame mismatch: got {rx_collected}, want {RX_FRAME}")

    sim = Simulator(dut)
    sim.add_clock(Period(MHz=24), domain="sync")
    sim.add_testbench(testbench)
    sim.add_process(w5500_model)
    sim.add_process(rx_collector)
    with sim.write_vcd("W5500SPIMaster.vcd"):
        sim.run()

    if errors:
        print("\nFAILURES:")
        for e in errors:
            print(" ", e)
        sys.exit(1)
    else:
        print("\nAll tests passed.")