diff --git a/exi_bba/bba_top.py b/exi_bba/bba_top.py index f6a6ada..76e2acd 100644 --- a/exi_bba/bba_top.py +++ b/exi_bba/bba_top.py @@ -12,14 +12,15 @@ See CLAUDE.md "Module Breakdown" and "CDC Signal Inventory" for the full list. from amaranth import * -from exi_bba.exi_capture import ExiCapture -from exi_bba.bba_register_file import BBARegisterFile -from exi_bba.spram_arbiter import SPRAMArbiter -from exi_bba.rx_frame_assembler import RXFrameAssembler -from exi_bba.tx_frame_drain import TXFrameDrain -from exi_bba.w5500_spi_master import W5500SPIMaster +from exi_bba.exi_capture import ExiCapture +from exi_bba.bba_register_file import BBARegisterFile +from exi_bba.spram_arbiter import SPRAMArbiter +from exi_bba.rx_frame_assembler import RXFrameAssembler +from exi_bba.tx_frame_drain import TXFrameDrain +from exi_bba.w5500_spi_master import W5500SPIMaster from exi_bba.w5100_parallel_master import W5100ParallelMaster -from exi_bba.status_panel import StatusPanel +from exi_bba.status_panel import StatusPanel +from exi_bba.uart_console import UARTConsole from amaranth.lib.cdc import FFSynchronizer @@ -43,7 +44,8 @@ class BBATop(Elaboratable): w5500_rst_n : W5500 hardware reset (output, active low) """ - def __init__(self, eth="w5100", reset_cycles=24000, status_panel=False): + def __init__(self, eth="w5100", reset_cycles=24000, + status_panel=False, uart_console=False): # Ethernet back-end: "w5100" (indirect parallel bus, reaches the EXI # ceiling) or "w5500" (SPI, ~12 Mbit/s). Both expose the identical # tx/rx/init/par interface, so only the physical pins differ. @@ -54,6 +56,9 @@ class BBATop(Elaboratable): # Optional bring-up status panel (drives onboard LEDs/button on the # iCEbreaker — see synth.py). panel_led bit order matches StatusPanel. self._status_panel = status_panel + # Optional UART debug console (8N1 115200, sync domain). + # uart_tx → FT2232H Channel B pin 9; uart_rx ← pin 6. + self._uart_console = uart_console # EXI (GC side) self.exi_clk = Signal(init=1) @@ -89,6 +94,10 @@ class BBATop(Elaboratable): self.panel_led = Signal(5) # to onboard LEDs (see StatusPanel) self.panel_btn = Signal(3) # from onboard button(s) + if uart_console: + self.uart_tx = Signal(init=1) # FPGA → PC (FT2232H Channel B) + self.uart_rx = Signal(init=1) # PC → FPGA + def elaborate(self, platform): m = Module() @@ -277,25 +286,28 @@ class BBATop(Elaboratable): # The RX ring-buffer path is active only after the GC sets NCRA[3]. m.d.comb += asm.rx_enabled.eq(reg.ncra_sr) - # ── Optional bring-up status panel (sync domain) ────────────────── - # init_req = NCRA reset (exi→sync PS), OR'd with the panel's manual - # re-init button when the panel is present. + # ── Optional bring-up peripherals (sync domain) ─────────────────── + # Build init_req as an OR of all reinit sources (NCRA pulse plus any + # manual re-init from the status panel and/or the UART 'r' command). + # "ready" is latched high by eth.init_done and cleared by any init_req. + # It is computed only when at least one peripheral needs it. + + init_req = reg.ncra_rst_o # base: GC-issued NCRA reset + + need_ready = self._status_panel or self._uart_console + if need_ready: + ready = Signal() + if self._status_panel: panel = StatusPanel() m.submodules.panel = panel + init_req = init_req | panel.reinit # cs_active lives in the exi domain; bring it to sync for the LED. cs_a_sync = Signal() m.submodules.panel_cs = FFSynchronizer( cap.cs_active, cs_a_sync, o_domain="sync") - # "ready" = ethernet init complete (latched until the next init). - ready = Signal() - with m.If(eth.init_done): - m.d.sync += ready.eq(1) - with m.Elif(reg.ncra_rst_o | panel.reinit): - m.d.sync += ready.eq(0) - m.d.comb += [ panel.cs_active.eq(cs_a_sync), panel.rx_pulse .eq(asm.rx_irq), @@ -303,10 +315,29 @@ class BBATop(Elaboratable): panel.ready .eq(ready), panel.btn .eq(self.panel_btn), self.panel_led .eq(panel.led), - eth.init_req .eq(reg.ncra_rst_o | panel.reinit), ] - else: - m.d.comb += eth.init_req.eq(reg.ncra_rst_o) + + if self._uart_console: + console = UARTConsole() + m.submodules.console = console + init_req = init_req | console.reinit + + m.d.comb += [ + console.ncra_rst.eq(reg.ncra_rst_o), + console.rx_pulse.eq(asm.rx_irq), + console.tx_pulse.eq(drain.tx_irq), + console.ready .eq(ready), + self.uart_tx .eq(console.uart_tx), + console.uart_rx .eq(self.uart_rx), + ] + + if need_ready: + with m.If(eth.init_done): + m.d.sync += ready.eq(1) + with m.Elif(init_req): + m.d.sync += ready.eq(0) + + m.d.comb += eth.init_req.eq(init_req) return m diff --git a/exi_bba/synth.py b/exi_bba/synth.py index 15c34a9..de53847 100644 --- a/exi_bba/synth.py +++ b/exi_bba/synth.py @@ -68,12 +68,21 @@ class IceBreakerPlatform(LatticeICE40Platform): # Bring-up status panel → iCEbreaker ONBOARD parts (dedicated pins, not # on any PMOD, so they coexist with EXI + W5100). LEDR/LEDG are # active-low discrete LEDs; BTN_N is the user button. - # (The onboard RGB LED on pins 39/40/41 needs an SB_RGBA_DRV instance - # wired to raw pads — board/version-specific — left as a future add-on - # to expose rx/tx/ready as colours; the 2 discrete LEDs cover bring-up.) + # RGB LED (pins 39/40/41) is driven via SB_RGBA_DRV — not declared here + # as a platform resource (see BBATopSynth.elaborate, status_panel block). Resource("ledr", 0, Pins("11", dir="o"), Attrs(IO_STANDARD="SB_LVCMOS")), Resource("ledg", 0, Pins("37", dir="o"), Attrs(IO_STANDARD="SB_LVCMOS")), Resource("btn", 0, Pins("10", dir="i"), Attrs(IO_STANDARD="SB_LVCMOS")), + + # UART debug console → iCEbreaker FT2232H Channel B (onboard USB-UART). + # No external hardware needed — the FT2232H is already on the board. + # On the PC: open the second USB serial port at 115200 8N1. + # pin 9 = FPGA TX → FT2232H Channel B RX (FPGA drives) + # pin 6 = FPGA RX ← FT2232H Channel B TX (FPGA reads) + Resource("uart", 0, + Subsignal("tx", Pins("9", dir="o")), + Subsignal("rx", Pins("6", dir="i")), + Attrs(IO_STANDARD="SB_LVCMOS")), ] connectors = [] @@ -153,6 +162,14 @@ class BBATopSynth(BBATop): o_RGB2=Signal(name="rgb_b"), ) + # ── UART debug console → FT2232H Channel B ───────────────────── + if self._uart_console: + uart = platform.request("uart", 0) + m.d.comb += [ + uart.tx.o .eq(self.uart_tx), + self.uart_rx .eq(uart.rx.i), + ] + return m @@ -181,7 +198,7 @@ if __name__ == "__main__": print(f"{'='*60}") opts = (f"--opt-timing --seed {seed} --timing-allow-fail") try: - platform.build(BBATopSynth(status_panel=True), do_program=False, + platform.build(BBATopSynth(status_panel=True, uart_console=True), do_program=False, verbose=True, nextpnr_opts=opts) except Exception as exc: # nextpnr exits non-zero even with --timing-allow-fail on some @@ -212,7 +229,7 @@ if __name__ == "__main__": if do_flash: print(f"\nFlashing with seed {best_seed}...") opts = f"--opt-timing --seed {best_seed} --timing-allow-fail" - platform.build(BBATopSynth(status_panel=True), do_program=True, + platform.build(BBATopSynth(status_panel=True, uart_console=True), do_program=True, verbose=True, nextpnr_opts=opts) print("Done.") diff --git a/exi_bba/uart_console.py b/exi_bba/uart_console.py new file mode 100644 index 0000000..5b3369e --- /dev/null +++ b/exi_bba/uart_console.py @@ -0,0 +1,394 @@ +"""UART debug console (sync domain, 24 MHz). + +Logs key BBA events as short ASCII messages at 115200 baud 8N1 and accepts +single-byte commands over RX ('r' = force reinit). + +iCEbreaker pin assignments (onboard FT2232H Channel B): + uart_tx → pin 9 (FPGA → PC) + uart_rx ← pin 6 (PC → FPGA) + +On the PC, open the second USB serial port at 115200 8N1: + minicom -D /dev/ttyUSB1 -b 115200 # Linux + screen /dev/tty.usbserial-XXX1 115200 # macOS + +Events logged +------------- + "BBA READY\\r\\n" — one-shot on first clock (bitstream loaded) + "RST\\r\\n" — NCRA reset issued by the GC + "ETH UP\\r\\n" — ethernet init complete (rising edge of ready) + "RX\\r\\n" — ethernet frame received + "TX\\r\\n" — ethernet frame transmitted + +Commands (single byte on RX) +----------------------------- + 'r' → one-cycle reinit pulse (caller should re-init the BBA) + +Event overflow: if a second event fires while the sender is busy the +pending flag is set and the message is queued (one slot per event type). +Simultaneous rapid-fire events of the same type coalesce to one message. +""" + +from amaranth import * +from amaranth.lib.cdc import FFSynchronizer + +__all__ = ["UARTConsole"] + +# ── Message ROM ─────────────────────────────────────────────────────────────── + +_MSGS = [ + b"BBA READY\r\n", # 0 EVT_BOOT + b"RST\r\n", # 1 EVT_RST + b"ETH UP\r\n", # 2 EVT_READY + b"RX\r\n", # 3 EVT_RX + b"TX\r\n", # 4 EVT_TX +] +_N = len(_MSGS) +_ROM = b"".join(_MSGS) +_START = [sum(len(m) for m in _MSGS[:i]) for i in range(_N)] +_LEN = [len(m) for m in _MSGS] + + +class UARTConsole(Elaboratable): + """Bring-up UART console — TX event log + RX command interface. + + All signals are in the sync domain (24 MHz). + + Parameters + ---------- + clk_freq : source clock in Hz (default 24 MHz) + baud_rate : UART baud rate (default 115 200) + """ + + EVT_BOOT = 0 + EVT_RST = 1 + EVT_READY = 2 + EVT_RX = 3 + EVT_TX = 4 + + def __init__(self, clk_freq=24_000_000, baud_rate=115_200): + self._div = round(clk_freq / baud_rate) + + # ── Event inputs (sync domain) ──────────────────────────────────── + self.ncra_rst = Signal() # pulse: GC issued NCRA reset + self.rx_pulse = Signal() # pulse: ethernet frame received + self.tx_pulse = Signal() # pulse: ethernet frame sent + self.ready = Signal() # level: ethernet init complete + + # ── Outputs ─────────────────────────────────────────────────────── + self.reinit = Signal() # pulse: 'r' received → caller re-inits + + # ── UART pins (connect directly to platform resources) ──────────── + self.uart_tx = Signal(init=1) # idle high + self.uart_rx = Signal(init=1) + + def elaborate(self, platform): + m = Module() + div = self._div + + # ── Message ROM (compile-time constant array) ───────────────────── + rom = Array([Const(b, 8) for b in _ROM]) + + # ── Event detection ─────────────────────────────────────────────── + pending = Signal(_N) + boot_done = Signal() + ready_r = Signal() + + # Boot: fire once on the first clock after reset + with m.If(~boot_done): + m.d.sync += [pending[self.EVT_BOOT].eq(1), boot_done.eq(1)] + + # Rising edge of ready → "ETH UP" (log once per init cycle) + m.d.sync += ready_r.eq(self.ready) + with m.If(self.ncra_rst): + m.d.sync += pending[self.EVT_RST].eq(1) + with m.If(self.ready & ~ready_r): + m.d.sync += pending[self.EVT_READY].eq(1) + with m.If(self.rx_pulse): + m.d.sync += pending[self.EVT_RX].eq(1) + with m.If(self.tx_pulse): + m.d.sync += pending[self.EVT_TX].eq(1) + + # ── UART TX — 8N1 shift register ────────────────────────────────── + # shreg[0] drives uart_tx. Load: start(0) + data[7:0] + stop(1). + # Shift right each baud period; fill MSB with 1 (idle level). + tx_cnt = Signal(range(div)) + tx_bits = Signal(range(11)) # 10 → 0 while sending + tx_shreg = Signal(10, init=0b1111111111) + tx_busy = Signal() + tx_load = Signal() + tx_byte = Signal(8) + + m.d.comb += [ + tx_busy.eq(tx_bits != 0), + self.uart_tx.eq(tx_shreg[0]), + ] + with m.If(tx_load & ~tx_busy): + m.d.sync += [ + tx_shreg.eq(Cat(Const(0, 1), tx_byte, Const(1, 1))), + tx_bits.eq(10), + tx_cnt.eq(div - 1), + ] + with m.Elif(tx_busy): + with m.If(tx_cnt == 0): + m.d.sync += [ + tx_shreg.eq(Cat(tx_shreg[1:], Const(1, 1))), + tx_bits.eq(tx_bits - 1), + tx_cnt.eq(div - 1), + ] + with m.Else(): + m.d.sync += tx_cnt.eq(tx_cnt - 1) + + # ── Message sender FSM ──────────────────────────────────────────── + # In IDLE: pick the highest-priority pending event and load its ROM + # range. In SEND: stream bytes from ROM through the UART TX one at + # a time, waiting for the transmitter to become free between bytes. + rom_ptr = Signal(range(len(_ROM) + 1)) + rom_end = Signal(range(len(_ROM) + 1)) + + # Default: tx_load/tx_byte driven only from SEND state + m.d.comb += [tx_load.eq(0), tx_byte.eq(0)] + + with m.FSM(name="sender"): + with m.State("IDLE"): + with m.If(pending[self.EVT_BOOT]): + m.d.sync += [ + rom_ptr.eq(_START[self.EVT_BOOT]), + rom_end.eq(_START[self.EVT_BOOT] + _LEN[self.EVT_BOOT]), + pending[self.EVT_BOOT].eq(0), + ] + m.next = "SEND" + with m.Elif(pending[self.EVT_RST]): + m.d.sync += [ + rom_ptr.eq(_START[self.EVT_RST]), + rom_end.eq(_START[self.EVT_RST] + _LEN[self.EVT_RST]), + pending[self.EVT_RST].eq(0), + ] + m.next = "SEND" + with m.Elif(pending[self.EVT_READY]): + m.d.sync += [ + rom_ptr.eq(_START[self.EVT_READY]), + rom_end.eq(_START[self.EVT_READY] + _LEN[self.EVT_READY]), + pending[self.EVT_READY].eq(0), + ] + m.next = "SEND" + with m.Elif(pending[self.EVT_RX]): + m.d.sync += [ + rom_ptr.eq(_START[self.EVT_RX]), + rom_end.eq(_START[self.EVT_RX] + _LEN[self.EVT_RX]), + pending[self.EVT_RX].eq(0), + ] + m.next = "SEND" + with m.Elif(pending[self.EVT_TX]): + m.d.sync += [ + rom_ptr.eq(_START[self.EVT_TX]), + rom_end.eq(_START[self.EVT_TX] + _LEN[self.EVT_TX]), + pending[self.EVT_TX].eq(0), + ] + m.next = "SEND" + + with m.State("SEND"): + with m.If(~tx_busy): + with m.If(rom_ptr == rom_end): + m.next = "IDLE" + with m.Else(): + m.d.comb += [tx_load.eq(1), tx_byte.eq(rom[rom_ptr])] + m.d.sync += rom_ptr.eq(rom_ptr + 1) + + # ── UART RX — 8N1 mid-bit sampling ─────────────────────────────── + # Synchronise the raw pin, detect the start-bit falling edge, then + # sample each subsequent bit at the centre of its baud period. + rx_sync = Signal(init=1) + rx_prev = Signal(init=1) + rx_cnt = Signal(range(div)) + rx_bits = Signal(range(9)) + rx_shreg = Signal(8) + rx_byte = Signal(8) + rx_valid = Signal() + + m.submodules += FFSynchronizer(self.uart_rx, rx_sync, init=1) + + # Default: no valid byte this cycle (overridden in STOP when needed) + m.d.sync += rx_valid.eq(0) + + with m.FSM(name="rx_fsm"): + with m.State("IDLE"): + m.d.sync += rx_prev.eq(rx_sync) + with m.If(rx_prev & ~rx_sync): # falling edge = start bit + m.d.sync += rx_cnt.eq(div // 2 - 1) + m.next = "START" + + with m.State("START"): + # Wait until the centre of the start bit, then verify it is + # still low (rules out glitches / false starts). + with m.If(rx_cnt == 0): + with m.If(~rx_sync): + m.d.sync += [rx_cnt.eq(div - 1), rx_bits.eq(7)] + m.next = "DATA" + with m.Else(): + m.next = "IDLE" + with m.Else(): + m.d.sync += rx_cnt.eq(rx_cnt - 1) + + with m.State("DATA"): + with m.If(rx_cnt == 0): + # UART sends LSB first; shift new bit into MSB so that + # after 8 samples rx_shreg[0] = bit-0, rx_shreg[7] = bit-7. + m.d.sync += [ + rx_shreg.eq(Cat(rx_shreg[1:], rx_sync)), + rx_cnt.eq(div - 1), + ] + with m.If(rx_bits == 0): + m.next = "STOP" + with m.Else(): + m.d.sync += rx_bits.eq(rx_bits - 1) + with m.Else(): + m.d.sync += rx_cnt.eq(rx_cnt - 1) + + with m.State("STOP"): + with m.If(rx_cnt == 0): + with m.If(rx_sync): # valid stop bit + m.d.sync += [rx_byte.eq(rx_shreg), rx_valid.eq(1)] + m.next = "IDLE" + with m.Else(): + m.d.sync += rx_cnt.eq(rx_cnt - 1) + + # ── Command decoder ─────────────────────────────────────────────── + m.d.sync += self.reinit.eq(0) + with m.If(rx_valid): + with m.If(rx_byte == ord('r')): + m.d.sync += self.reinit.eq(1) + + return m + + +# ── Testbench ───────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import sys + from amaranth.sim import Simulator, Period + + CLK_FREQ = 24_000_000 + BAUD_RATE = 115_200 + DIV = round(CLK_FREQ / BAUD_RATE) # 208 cycles per bit + + dut = UARTConsole(clk_freq=CLK_FREQ, baud_rate=BAUD_RATE) + errors = [] + + # ── TX helpers ──────────────────────────────────────────────────────── + async def recv_byte(ctx): + """Receive one 8N1 byte from uart_tx. Waits for start bit.""" + # Wait for start bit (falling edge) + while ctx.get(dut.uart_tx) != 0: + await ctx.tick() + # Centre of start bit + await ctx.tick().repeat(DIV // 2) + assert ctx.get(dut.uart_tx) == 0, "false start" + # Sample 8 data bits + byte = 0 + for i in range(8): + await ctx.tick().repeat(DIV) + byte |= ctx.get(dut.uart_tx) << i + # Stop bit + await ctx.tick().repeat(DIV) + assert ctx.get(dut.uart_tx) == 1, "missing stop bit" + return byte + + async def recv_str(ctx, n): + return bytes([await recv_byte(ctx) for _ in range(n)]) + + # ── RX helper ───────────────────────────────────────────────────────── + async def send_byte_watch(ctx, val, watch_sig): + """Send one 8N1 byte on uart_rx, polling watch_sig every tick. + + Returns True if watch_sig was seen high at any point. The reinit + pulse fires ~100 cycles before the stop-bit wait completes, so + a tick-by-tick poll is required to catch it. + """ + seen = False + ctx.set(dut.uart_rx, 0) # start bit + for _ in range(DIV): + await ctx.tick() + if ctx.get(watch_sig): seen = True + for i in range(8): + ctx.set(dut.uart_rx, (val >> i) & 1) + for _ in range(DIV): + await ctx.tick() + if ctx.get(watch_sig): seen = True + ctx.set(dut.uart_rx, 1) # stop bit + margin + for _ in range(DIV + 4): + await ctx.tick() + if ctx.get(watch_sig): seen = True + return seen + + # ── Tests ───────────────────────────────────────────────────────────── + async def testbench(ctx): + ctx.set(dut.uart_rx, 1) + + # T1: boot message fires on first clock + msg = await recv_str(ctx, len(b"BBA READY\r\n")) + print(f"T1 boot: {msg!r}") + if msg != b"BBA READY\r\n": + errors.append(f"T1 boot: got {msg!r}") + + # T2: pulse ncra_rst → "RST\r\n" + ctx.set(dut.ncra_rst, 1) + await ctx.tick() + ctx.set(dut.ncra_rst, 0) + msg = await recv_str(ctx, len(b"RST\r\n")) + print(f"T2 rst: {msg!r}") + if msg != b"RST\r\n": + errors.append(f"T2 rst: got {msg!r}") + + # T3: ready rising edge → "ETH UP\r\n" + ctx.set(dut.ready, 1) + await ctx.tick() + msg = await recv_str(ctx, len(b"ETH UP\r\n")) + print(f"T3 ready: {msg!r}") + if msg != b"ETH UP\r\n": + errors.append(f"T3 ready: got {msg!r}") + + # T4: rx_pulse → "RX\r\n" + ctx.set(dut.rx_pulse, 1) + await ctx.tick() + ctx.set(dut.rx_pulse, 0) + msg = await recv_str(ctx, len(b"RX\r\n")) + print(f"T4 rx: {msg!r}") + if msg != b"RX\r\n": + errors.append(f"T4 rx: got {msg!r}") + + # T5: tx_pulse → "TX\r\n" + ctx.set(dut.tx_pulse, 1) + await ctx.tick() + ctx.set(dut.tx_pulse, 0) + msg = await recv_str(ctx, len(b"TX\r\n")) + print(f"T5 tx: {msg!r}") + if msg != b"TX\r\n": + errors.append(f"T5 tx: got {msg!r}") + + # T6: send 'r' → reinit pulse. + # Poll tick-by-tick: the pulse fires ~100 cycles before send completes. + got = await send_byte_watch(ctx, ord('r'), dut.reinit) + print(f"T6 reinit after 'r': {got}") + if not got: + errors.append("T6 reinit: pulse not seen") + + # T7: send unknown byte → no reinit + got = await send_byte_watch(ctx, ord('x'), dut.reinit) + print(f"T7 reinit after 'x': {got} (want 0)") + if got: + errors.append("T7 spurious reinit on 'x'") + + sim = Simulator(dut) + sim.add_clock(Period(MHz=24)) + sim.add_testbench(testbench) + + with sim.write_vcd("UARTConsole.vcd"): + sim.run() + + if errors: + print("\nFAILURES:") + for e in errors: + print(" ", e) + sys.exit(1) + else: + print("\nAll UARTConsole tests passed.")