277 lines
10 KiB
Python
277 lines
10 KiB
Python
"""SPRAM arbiter — sync domain (24 MHz).
|
||
|
||
Owns the iCE40UP5K 128 KB SPRAM (SB_SPRAM256KA, 16-bit wide) and arbitrates
|
||
between two clients:
|
||
|
||
Client A (EXI read) : prefetch pipeline; low priority.
|
||
Client B (ETH write): RXFrameAssembler; high priority.
|
||
|
||
ETH writes win when both clients are active. This is safe because the GC only
|
||
reads pages that the ETH engine has already finished writing (ring-buffer
|
||
invariant).
|
||
|
||
SPRAM addressing
|
||
-----------------
|
||
SB_SPRAM256KA is 64 K × 16-bit. Byte addressing:
|
||
ADDRESS = byte_addr >> 1
|
||
MASKWREN[3:0]:
|
||
0b0011 → write lower byte (byte_addr even)
|
||
0b1100 → write upper byte (byte_addr odd)
|
||
Read: both bytes returned; pick the right one from DATAOUT based on addr bit 0.
|
||
|
||
Read latency: 1 synchronous cycle — result of cycle N is valid at N+1.
|
||
|
||
In simulation (platform is None) a behavioural Array model is used instead of
|
||
the SB_SPRAM256KA Instance so tests run without IceStorm.
|
||
"""
|
||
|
||
from amaranth import *
|
||
from amaranth.lib.memory import Memory
|
||
|
||
__all__ = ["SPRAMArbiter"]
|
||
|
||
_SPRAM_WORDS = 65536 # 64 K 16-bit words = 128 KB
|
||
|
||
|
||
class SPRAMArbiter(Elaboratable):
|
||
"""Arbitrated SPRAM controller in the sync domain.
|
||
|
||
EXI read interface (from BBARegisterFile spram_req / spram_rsp FIFOs)
|
||
----------------------------------------------------------------------
|
||
exi_req_addr : 16-bit byte address to read
|
||
exi_req_valid : FIFO r_rdy — a request is waiting
|
||
exi_req_ready : FIFO r_en — pop the request (asserted when serviced)
|
||
exi_rsp_data : 8-bit result byte
|
||
exi_rsp_valid : FIFO w_en — push result when valid
|
||
|
||
ETH write interface (from RXFrameAssembler)
|
||
-------------------------------------------
|
||
eth_wr_addr : 16-bit byte address to write
|
||
eth_wr_data : 8-bit byte value
|
||
eth_wr_valid : write request present
|
||
eth_wr_ready : write accepted this cycle
|
||
"""
|
||
|
||
def __init__(self):
|
||
# EXI read interface
|
||
self.exi_req_addr = Signal(16)
|
||
self.exi_req_valid = Signal()
|
||
self.exi_req_ready = Signal()
|
||
self.exi_rsp_data = Signal(8)
|
||
self.exi_rsp_valid = Signal()
|
||
|
||
# ETH write interface
|
||
self.eth_wr_addr = Signal(16)
|
||
self.eth_wr_data = Signal(8)
|
||
self.eth_wr_valid = Signal()
|
||
self.eth_wr_ready = Signal()
|
||
|
||
def elaborate(self, platform):
|
||
m = Module()
|
||
|
||
# ── SPRAM instantiation (hardware vs simulation) ──────────────────
|
||
spram_addr = Signal(14) # word address (byte_addr >> 1)
|
||
spram_din = Signal(16)
|
||
spram_dout = Signal(16)
|
||
spram_wren = Signal()
|
||
spram_mask = Signal(4) # MASKWREN
|
||
|
||
if platform is None:
|
||
# Behavioural model: synchronous read with 1-cycle latency.
|
||
# Memory is a Component; read/write ports are obtained from it
|
||
# and wired via its submodule ports (not added as separate submodules).
|
||
mem = Memory(shape=16, depth=_SPRAM_WORDS, init=[])
|
||
m.submodules.mem = mem
|
||
mem_rd = mem.read_port(domain="sync", transparent_for=[])
|
||
mem_wr = mem.write_port(domain="sync", granularity=8)
|
||
|
||
# en[0] = lower byte enable, en[1] = upper byte enable
|
||
byte0_en = Signal()
|
||
byte1_en = Signal()
|
||
m.d.comb += [
|
||
byte0_en .eq(spram_wren & (spram_mask[0] | spram_mask[1])),
|
||
byte1_en .eq(spram_wren & (spram_mask[2] | spram_mask[3])),
|
||
mem_rd.addr .eq(spram_addr),
|
||
mem_rd.en .eq(1),
|
||
spram_dout .eq(mem_rd.data),
|
||
mem_wr.addr .eq(spram_addr),
|
||
mem_wr.data .eq(spram_din),
|
||
mem_wr.en .eq(Cat(byte0_en, byte1_en)),
|
||
]
|
||
else:
|
||
# Hardware: instantiate two SB_SPRAM256KA (64K×16 each; use one)
|
||
m.submodules.spram = Instance(
|
||
"SB_SPRAM256KA",
|
||
i_ADDRESS = spram_addr,
|
||
i_DATAIN = spram_din,
|
||
i_MASKWREN = spram_mask,
|
||
i_WREN = spram_wren,
|
||
i_CHIPSELECT = Const(1, 1),
|
||
i_CLOCK = ClockSignal("sync"),
|
||
i_STANDBY = Const(0, 1),
|
||
i_SLEEP = Const(0, 1),
|
||
i_POWEROFF = Const(1, 1),
|
||
o_DATAOUT = spram_dout,
|
||
)
|
||
|
||
# ── Arbiter pipeline ─────────────────────────────────────────────
|
||
# Stage 1: issue SPRAM address and control signals (combinatorial)
|
||
# Stage 2: capture SPRAM output into rsp_buf (synchronous, 1-cycle)
|
||
|
||
read_pending = Signal() # a read address was issued last cycle
|
||
read_was_odd = Signal() # byte address bit 0 of the pending read
|
||
rsp_buf = Signal(8) # registered response byte; valid when exi_rsp_valid
|
||
|
||
# Combinatorial defaults
|
||
m.d.comb += [
|
||
spram_wren .eq(0),
|
||
spram_mask .eq(0),
|
||
spram_din .eq(0),
|
||
spram_addr .eq(0),
|
||
self.exi_req_ready.eq(0),
|
||
self.eth_wr_ready .eq(0),
|
||
self.exi_rsp_data .eq(rsp_buf), # always sourced from registered buffer
|
||
]
|
||
# Registered defaults
|
||
m.d.sync += [
|
||
self.exi_rsp_valid.eq(0),
|
||
read_pending .eq(0),
|
||
]
|
||
|
||
# ETH write has priority
|
||
with m.If(self.eth_wr_valid):
|
||
m.d.comb += [
|
||
spram_addr .eq(self.eth_wr_addr[1:]),
|
||
spram_wren .eq(1),
|
||
self.eth_wr_ready.eq(1),
|
||
]
|
||
with m.If(self.eth_wr_addr[0]):
|
||
m.d.comb += [
|
||
spram_din [8:16].eq(self.eth_wr_data),
|
||
spram_mask .eq(0b1100),
|
||
]
|
||
with m.Else():
|
||
m.d.comb += [
|
||
spram_din [0:8].eq(self.eth_wr_data),
|
||
spram_mask .eq(0b0011),
|
||
]
|
||
|
||
# EXI read (lower priority)
|
||
with m.Elif(self.exi_req_valid):
|
||
m.d.comb += [
|
||
spram_addr .eq(self.exi_req_addr[1:]),
|
||
self.exi_req_ready.eq(1),
|
||
]
|
||
m.d.sync += [
|
||
read_pending.eq(1),
|
||
read_was_odd.eq(self.exi_req_addr[0]),
|
||
]
|
||
|
||
# Capture SPRAM output into registered buffer after 1-cycle latency
|
||
with m.If(read_pending):
|
||
with m.If(read_was_odd):
|
||
m.d.sync += rsp_buf.eq(spram_dout[8:16])
|
||
with m.Else():
|
||
m.d.sync += rsp_buf.eq(spram_dout[0:8])
|
||
m.d.sync += self.exi_rsp_valid.eq(1)
|
||
|
||
return m
|
||
|
||
|
||
# ── Testbench ─────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
from amaranth.sim import Simulator, Period
|
||
|
||
dut = SPRAMArbiter()
|
||
errors = []
|
||
|
||
async def testbench(ctx):
|
||
await ctx.tick("sync").repeat(2)
|
||
|
||
# T1: ETH write to even byte address 0x0100, then EXI read it back
|
||
ctx.set(dut.eth_wr_addr, 0x0100)
|
||
ctx.set(dut.eth_wr_data, 0xAB)
|
||
ctx.set(dut.eth_wr_valid, 1)
|
||
await ctx.tick("sync").repeat(1)
|
||
accepted = ctx.get(dut.eth_wr_ready)
|
||
if not accepted:
|
||
errors.append("T1 eth write not accepted")
|
||
ctx.set(dut.eth_wr_valid, 0)
|
||
await ctx.tick("sync").repeat(1)
|
||
|
||
# Issue EXI read of the same address
|
||
ctx.set(dut.exi_req_addr, 0x0100)
|
||
ctx.set(dut.exi_req_valid, 1)
|
||
await ctx.tick("sync").repeat(1) # clock A: read issued, read_pending=1
|
||
ctx.set(dut.exi_req_valid, 0)
|
||
await ctx.tick("sync").repeat(1) # clock B: SPRAM output captured, valid=1
|
||
# Check HERE — exi_rsp_valid is 1 for exactly this one cycle
|
||
|
||
rdata = ctx.get(dut.exi_rsp_data)
|
||
rvalid = ctx.get(dut.exi_rsp_valid)
|
||
if rdata != 0xAB:
|
||
errors.append(f"T1 read back: expected 0xAB, got 0x{rdata:02X}")
|
||
if not rvalid:
|
||
errors.append("T1 exi_rsp_valid not set")
|
||
print(f"T1 even addr read-back: data=0x{rdata:02X} valid={rvalid}")
|
||
|
||
await ctx.tick("sync").repeat(2)
|
||
|
||
# T2: ETH write to ODD byte address 0x0101, read back
|
||
ctx.set(dut.eth_wr_addr, 0x0101)
|
||
ctx.set(dut.eth_wr_data, 0xCD)
|
||
ctx.set(dut.eth_wr_valid, 1)
|
||
await ctx.tick("sync").repeat(1)
|
||
ctx.set(dut.eth_wr_valid, 0)
|
||
await ctx.tick("sync").repeat(1)
|
||
|
||
ctx.set(dut.exi_req_addr, 0x0101)
|
||
ctx.set(dut.exi_req_valid, 1)
|
||
await ctx.tick("sync").repeat(1)
|
||
ctx.set(dut.exi_req_valid, 0)
|
||
await ctx.tick("sync").repeat(1)
|
||
|
||
rdata = ctx.get(dut.exi_rsp_data)
|
||
if rdata != 0xCD:
|
||
errors.append(f"T2 odd addr read-back: expected 0xCD, got 0x{rdata:02X}")
|
||
print(f"T2 odd addr read-back: data=0x{rdata:02X}")
|
||
|
||
await ctx.tick("sync").repeat(2)
|
||
|
||
# T3: ETH write wins when both clients active simultaneously
|
||
# Write 0xEE to 0x0200
|
||
ctx.set(dut.eth_wr_addr, 0x0200)
|
||
ctx.set(dut.eth_wr_data, 0xEE)
|
||
ctx.set(dut.eth_wr_valid, 1)
|
||
ctx.set(dut.exi_req_addr, 0x0100) # also wants to read
|
||
ctx.set(dut.exi_req_valid, 1)
|
||
await ctx.tick("sync").repeat(1)
|
||
|
||
eth_won = ctx.get(dut.eth_wr_ready)
|
||
exi_blocked = not ctx.get(dut.exi_req_ready)
|
||
ctx.set(dut.eth_wr_valid, 0)
|
||
ctx.set(dut.exi_req_valid, 0)
|
||
|
||
if not eth_won:
|
||
errors.append("T3 ETH priority: ETH write not accepted")
|
||
if not exi_blocked:
|
||
errors.append("T3 ETH priority: EXI read was not blocked")
|
||
print(f"T3 ETH priority: eth_won={eth_won} exi_blocked={exi_blocked}")
|
||
|
||
sim = Simulator(dut)
|
||
sim.add_clock(Period(MHz=24), domain="sync")
|
||
sim.add_testbench(testbench)
|
||
|
||
with sim.write_vcd("SPRAMArbiter.vcd"):
|
||
sim.run()
|
||
|
||
if errors:
|
||
print("\nFAILURES:")
|
||
for e in errors:
|
||
print(" ", e)
|
||
sys.exit(1)
|
||
else:
|
||
print("\nAll tests passed.")
|