Files
2026-06-13 18:35:38 +02:00

270 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ExiCapture — fast EXI byte-capture front-end (capture domain, 54 MHz).
Wraps the SPIMode3Slave bit engine and bridges it to the slower `exi` domain
(24 MHz) through two AsyncFIFOs:
      capture (54 MHz)                              exi (24 MHz)
    ┌────────────────────┐   rx_fifo  ───►  received bytes (header + data)
    │   SPIMode3Slave    │   (8-bit, capture→exi)
    │   (bit engine)     │   tx_fifo  ◄───  response bytes to drive on MISO
    └────────────────────┘   (8-bit, exi→capture)
Why split: the bit engine must oversample a 27 MHz EXI clock 2×, which needs a
54 MHz clock — far faster than the register-file logic can close (~44 MHz).
Only this small, shallow front-end runs fast; everything else stays at 24 MHz.
TX response gating
------------------
Every EXI transaction begins with 2 header bytes (write_flag/addr/len) during
which the GC ignores MISO. The core cannot have produced a response yet (it
hasn't even decoded the header), so the wrapper must NOT pop tx_fifo for those
2 bytes. A per-transaction counter (`txld_cnt`, reset by frame_start) gates the
pop: the first two tx_load pulses of a transaction never pop; from the third
onward each tx_load pops one byte. The bit engine's `tx_byte` input is wired
straight to the tx_fifo head, so MISO carries whatever is at the head (a
don't-care value during the header bytes). On frame_start the wrapper also
drains whatever the previous transaction left in tx_fifo, so stale prefetched
response bytes never reach MISO.
"""
from amaranth import *
from amaranth.lib.cdc import FFSynchronizer
from amaranth.lib.fifo import AsyncFIFO
from exi_bba.spi_mode3_slave import SPIMode3Slave
__all__ = ["ExiCapture"]
class ExiCapture(Elaboratable):
    """EXI front-end: capture-domain SPI bit engine bridged to the core by FIFOs.

    Parameters
    ----------
    rx_depth : depth passed to the capture→exi AsyncFIFO (received bytes)
    tx_depth : depth passed to the exi→capture AsyncFIFO (response bytes)

    Physical SPI pins (capture domain)
    ----------------------------------
    spi_clk / spi_mosi / spi_cs_n : raw async inputs from the GC
    spi_miso                      : output to the GC

    Core-facing RX byte stream (`exi` domain, FWFT read side of rx_fifo)
    --------------------------------------------------------------------
    rx_data : current received byte
    rx_rdy  : a received byte is available
    rx_en   : pop (assert for one cycle to consume rx_data)

    Core-facing TX byte stream (`exi` domain, write side of tx_fifo)
    ----------------------------------------------------------------
    tx_data : response byte to enqueue
    tx_en   : write strobe
    tx_rdy  : tx_fifo has room

    Core-facing status (`exi` domain)
    ---------------------------------
    cs_active : the bit engine's cs_active, synchronised into `exi`
    """

    def __init__(self, rx_depth=4, tx_depth=2):
        # FIFO depths are only consumed in elaborate().
        self._rx_depth = rx_depth
        self._tx_depth = tx_depth

        # --- Physical SPI (capture domain, wired to pins by BBATop) ---
        # Clock and chip-select both have a reset value of 1.
        self.spi_clk = Signal(init=1)
        self.spi_mosi = Signal()
        self.spi_cs_n = Signal(init=1)
        self.spi_miso = Signal()

        # --- Core-facing RX: read side of rx_fifo ---
        self.rx_data = Signal(8)
        self.rx_rdy = Signal()
        self.rx_en = Signal()

        # --- Core-facing TX: write side of tx_fifo ---
        self.tx_data = Signal(8)
        self.tx_en = Signal()
        self.tx_rdy = Signal()

        # --- Core-facing status ---
        # High (exi domain) while a transaction is in progress. The register
        # file uses it to keep streaming variable-length (DMA) reads until CS
        # deasserts.
        self.cs_active = Signal()

    def elaborate(self, platform):
        """Build the bit engine, both clock-crossing FIFOs and the TX pop gate."""
        m = Module()

        # Bit engine runs in the fast capture domain.
        m.submodules.spi = spi = SPIMode3Slave(domain="capture")

        # Received bytes cross capture → exi; responses cross exi → capture.
        m.submodules.rx_fifo = rx_fifo = AsyncFIFO(
            width=8, depth=self._rx_depth,
            w_domain="capture", r_domain="exi",
        )
        m.submodules.tx_fifo = tx_fifo = AsyncFIFO(
            width=8, depth=self._tx_depth,
            w_domain="exi", r_domain="capture",
        )

        # Bring the bit engine's cs_active into the exi domain for the
        # register file.
        m.submodules.cs_sync = FFSynchronizer(
            spi.cs_active, self.cs_active, o_domain="exi",
        )

        # Pure combinational wiring, listed as (sink, source) pairs.
        wiring = (
            # Physical pins <-> bit engine.
            (spi.spi_clk, self.spi_clk),
            (spi.spi_mosi, self.spi_mosi),
            (spi.spi_cs_n, self.spi_cs_n),
            (self.spi_miso, spi.spi_miso),
            # RX: every byte the bit engine reports goes into rx_fifo.
            (rx_fifo.w_data, spi.rx_byte),
            (rx_fifo.w_en, spi.rx_valid),
            # RX: core read side.
            (self.rx_data, rx_fifo.r_data),
            (self.rx_rdy, rx_fifo.r_rdy),
            (rx_fifo.r_en, self.rx_en),
            # TX: core write side.
            (tx_fifo.w_data, self.tx_data),
            (tx_fifo.w_en, self.tx_en),
            (self.tx_rdy, tx_fifo.w_rdy),
            # TX: the bit engine always sees the current tx_fifo head, so the
            # byte at the head is what goes out on MISO.
            (spi.tx_byte, tx_fifo.r_data),
        )
        for sink, source in wiring:
            m.d.comb += sink.eq(source)

        # ── TX response gating (capture domain) ──────────────────────────
        # txld_cnt counts tx_load pulses since the last frame_start and
        # saturates at 3. While it is 0 or 1 (the two header bytes) a tx_load
        # does not pop; once it has reached 2 every tx_load pops one byte,
        # advancing the head to the next response.
        txld_cnt = Signal(2)

        # flushing is raised by frame_start and held until tx_fifo reads
        # empty. It throws away response bytes the register file prefetched
        # for the previous transaction (DMA reads are streamed ahead of the
        # GC clock, so a few unsent bytes can remain when CS deasserts).
        flushing = Signal()

        # The pop deliberately depends only on tx_load / txld_cnt / flushing
        # and never on frame_start itself: keeping frame_start off this path
        # shortens the capture-domain critical path through the FIFO consume
        # pointer.
        pop_response = spi.tx_load & (txld_cnt >= 2)
        pop_stale = flushing & tx_fifo.r_rdy
        m.d.comb += tx_fifo.r_en.eq(pop_response | pop_stale)

        # frame_start has priority over both per-cycle updates.
        with m.If(spi.frame_start):
            m.d.capture += [
                flushing.eq(1),
                txld_cnt.eq(0),
            ]
        with m.Else():
            # Flush ends as soon as the FIFO reports nothing left to read.
            with m.If(~tx_fifo.r_rdy):
                m.d.capture += flushing.eq(0)
            # Saturating count: stop incrementing once 3 is reached.
            with m.If(spi.tx_load & (txld_cnt < 3)):
                m.d.capture += txld_cnt.eq(txld_cnt + 1)

        return m
# ── Testbench ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    from amaranth.sim import Simulator, Period

    dut = ExiCapture()
    errors = []

    # SPI half-period in capture ticks. At 54 MHz capture / 27 MHz EXI the
    # real ratio is ~2; 4 is used here for a clean, well-oversampled
    # functional check.
    HALF = 4

    def lower_hex(values):
        """Format bytes the way `hex()` does, e.g. '0x12'."""
        return [hex(b) for b in values]

    def upper_hex(values):
        """Format bytes as two upper-case hex digits, e.g. '0xA5'."""
        return [f'0x{b:02X}' for b in values]

    async def spi_byte(ctx, mosi_val):
        """Shift one byte out MSB-first on MOSI; return the byte seen on MISO."""
        sampled = 0
        for shift in reversed(range(8)):
            # Present the data bit, then drive the clock low for HALF ticks.
            ctx.set(dut.spi_mosi, (mosi_val >> shift) & 1)
            ctx.set(dut.spi_clk, 0)
            await ctx.tick("capture").repeat(HALF)
            # MISO is read just before the clock is driven high again.
            sampled = (sampled << 1) | ctx.get(dut.spi_miso)
            ctx.set(dut.spi_clk, 1)
            await ctx.tick("capture").repeat(HALF)
        return sampled

    async def core_drain_rx(ctx, into):
        """Consume one byte from the core RX side; return whether one was taken."""
        if not ctx.get(dut.rx_rdy):
            return False
        into.append(ctx.get(dut.rx_data))
        # One-cycle rx_en strobe in the exi domain.
        ctx.set(dut.rx_en, 1)
        await ctx.tick("exi").repeat(1)
        ctx.set(dut.rx_en, 0)
        return True

    async def push_tx(ctx, b):
        """Enqueue one response byte with a one-cycle tx_en strobe."""
        ctx.set(dut.tx_data, b)
        ctx.set(dut.tx_en, 1)
        await ctx.tick("exi").repeat(1)
        ctx.set(dut.tx_en, 0)

    async def service_rx(ctx, rx_seen, rounds=20):
        """Run `rounds` core iterations: try to drain a byte, then idle one tick."""
        for _ in range(rounds):
            await core_drain_rx(ctx, rx_seen)
            await ctx.tick("exi").repeat(1)

    async def do_txn(ctx, hdr, responses, n_data, rx_seen):
        """Run one EXI transaction and return the MISO bytes of its data phase.

        Clocks the `hdr` bytes, models the clock-idle gap (drains rx and
        prefetches `responses` into tx_fifo), then clocks `n_data` data bytes.
        """
        # Assert CS with the clock parked high.
        ctx.set(dut.spi_cs_n, 0)
        ctx.set(dut.spi_clk, 1)
        await ctx.tick("capture").repeat(HALF)

        for header_byte in hdr:
            await spi_byte(ctx, header_byte)

        # Clock-idle gap: the core drains the header, then queues responses.
        await service_rx(ctx, rx_seen)
        for response in responses:
            await push_tx(ctx, response)
        await ctx.tick("capture").repeat(2)

        # Data phase: MOSI carries dummy zeros while MISO is collected.
        data_phase = []
        for _ in range(n_data):
            data_phase.append(await spi_byte(ctx, 0x00))

        # Release CS, then drain the data-phase dummies from rx.
        ctx.set(dut.spi_cs_n, 1)
        await ctx.tick("capture").repeat(HALF)
        await service_rx(ctx, rx_seen)
        return data_phase

    async def testbench(ctx):
        rx_seen = []
        await ctx.tick("capture").repeat(2)

        # ── T1: header + 2 data bytes read back ──────────────────────────
        miso = await do_txn(ctx, [0x12, 0x34], [0xA5, 0x5A], 2, rx_seen)
        header_rx = rx_seen[:2]
        print(f"T1 rx={lower_hex(header_rx)} MISO={upper_hex(miso)}")
        if header_rx != [0x12, 0x34]:
            errors.append(f"T1 header rx wrong: {header_rx}")
        if miso != [0xA5, 0x5A]:
            errors.append(f"T1 MISO wrong: {lower_hex(miso)}")

        # ── T2: prefetch over-push must NOT leak into the next transaction ─
        # Txn A queues 2 responses but only 1 data byte is clocked, so one
        # stale byte (0x5A) stays in tx_fifo. Txn B must then read back its
        # OWN fresh responses, which shows the CS-fall flush removed the
        # stale prefetch.
        rx_seen.clear()
        await do_txn(ctx, [0x12, 0x34], [0xA5, 0x5A], 1, rx_seen)
        miso_b = await do_txn(ctx, [0x12, 0x34], [0x11, 0x22], 2, rx_seen)
        print(f"T2 MISO after over-push: {upper_hex(miso_b)} (want 0x11 0x22)")
        if miso_b != [0x11, 0x22]:
            errors.append(f"T2 flush failed — stale byte leaked: {lower_hex(miso_b)}")

    sim = Simulator(dut)
    sim.add_clock(Period(MHz=54), domain="capture")
    sim.add_clock(Period(MHz=24), domain="exi")
    sim.add_testbench(testbench)
    with sim.write_vcd("ExiCapture.vcd"):
        sim.run()

    # Report: any recorded failure makes the script exit non-zero.
    if errors:
        print("\nFAILURES:")
        for failure in errors:
            print(" ", failure)
        sys.exit(1)
    print("\nAll tests passed.")