Added Uart.

This commit is contained in:
2026-06-14 09:37:59 +02:00
parent a7c88109a9
commit 85f82c8740
3 changed files with 468 additions and 26 deletions
+394
View File
@@ -0,0 +1,394 @@
"""UART debug console (sync domain, 24 MHz).
Logs key BBA events as short ASCII messages at 115200 baud 8N1 and accepts
single-byte commands over RX ('r' = force reinit).
iCEbreaker pin assignments (onboard FT2232H Channel B):
uart_tx → pin 9 (FPGA → PC)
uart_rx ← pin 6 (PC → FPGA)
On the PC, open the second USB serial port at 115200 8N1:
minicom -D /dev/ttyUSB1 -b 115200 # Linux
screen /dev/tty.usbserial-XXX1 115200 # macOS
Events logged
-------------
"BBA READY\\r\\n" — one-shot on first clock (bitstream loaded)
"RST\\r\\n" — NCRA reset issued by the GC
"ETH UP\\r\\n" — ethernet init complete (rising edge of ready)
"RX\\r\\n" — ethernet frame received
"TX\\r\\n" — ethernet frame transmitted
Commands (single byte on RX)
-----------------------------
'r' → one-cycle reinit pulse (caller should re-init the BBA)
Event overflow: if a second event fires while the sender is busy the
pending flag is set and the message is queued (one slot per event type).
Simultaneous rapid-fire events of the same type coalesce to one message.
"""
from amaranth import *
from amaranth.lib.cdc import FFSynchronizer
__all__ = ["UARTConsole"]
# ── Message ROM ───────────────────────────────────────────────────────────────
_MSGS = [
b"BBA READY\r\n", # 0 EVT_BOOT
b"RST\r\n", # 1 EVT_RST
b"ETH UP\r\n", # 2 EVT_READY
b"RX\r\n", # 3 EVT_RX
b"TX\r\n", # 4 EVT_TX
]
_N = len(_MSGS)
_ROM = b"".join(_MSGS)
_START = [sum(len(m) for m in _MSGS[:i]) for i in range(_N)]
_LEN = [len(m) for m in _MSGS]
class UARTConsole(Elaboratable):
"""Bring-up UART console — TX event log + RX command interface.
All signals are in the sync domain (24 MHz).
Parameters
----------
clk_freq : source clock in Hz (default 24 MHz)
baud_rate : UART baud rate (default 115 200)
"""
EVT_BOOT = 0
EVT_RST = 1
EVT_READY = 2
EVT_RX = 3
EVT_TX = 4
def __init__(self, clk_freq=24_000_000, baud_rate=115_200):
self._div = round(clk_freq / baud_rate)
# ── Event inputs (sync domain) ────────────────────────────────────
self.ncra_rst = Signal() # pulse: GC issued NCRA reset
self.rx_pulse = Signal() # pulse: ethernet frame received
self.tx_pulse = Signal() # pulse: ethernet frame sent
self.ready = Signal() # level: ethernet init complete
# ── Outputs ───────────────────────────────────────────────────────
self.reinit = Signal() # pulse: 'r' received → caller re-inits
# ── UART pins (connect directly to platform resources) ────────────
self.uart_tx = Signal(init=1) # idle high
self.uart_rx = Signal(init=1)
def elaborate(self, platform):
m = Module()
div = self._div
# ── Message ROM (compile-time constant array) ─────────────────────
rom = Array([Const(b, 8) for b in _ROM])
# ── Event detection ───────────────────────────────────────────────
pending = Signal(_N)
boot_done = Signal()
ready_r = Signal()
# Boot: fire once on the first clock after reset
with m.If(~boot_done):
m.d.sync += [pending[self.EVT_BOOT].eq(1), boot_done.eq(1)]
# Rising edge of ready → "ETH UP" (log once per init cycle)
m.d.sync += ready_r.eq(self.ready)
with m.If(self.ncra_rst):
m.d.sync += pending[self.EVT_RST].eq(1)
with m.If(self.ready & ~ready_r):
m.d.sync += pending[self.EVT_READY].eq(1)
with m.If(self.rx_pulse):
m.d.sync += pending[self.EVT_RX].eq(1)
with m.If(self.tx_pulse):
m.d.sync += pending[self.EVT_TX].eq(1)
# ── UART TX — 8N1 shift register ──────────────────────────────────
# shreg[0] drives uart_tx. Load: start(0) + data[7:0] + stop(1).
# Shift right each baud period; fill MSB with 1 (idle level).
tx_cnt = Signal(range(div))
tx_bits = Signal(range(11)) # 10 → 0 while sending
tx_shreg = Signal(10, init=0b1111111111)
tx_busy = Signal()
tx_load = Signal()
tx_byte = Signal(8)
m.d.comb += [
tx_busy.eq(tx_bits != 0),
self.uart_tx.eq(tx_shreg[0]),
]
with m.If(tx_load & ~tx_busy):
m.d.sync += [
tx_shreg.eq(Cat(Const(0, 1), tx_byte, Const(1, 1))),
tx_bits.eq(10),
tx_cnt.eq(div - 1),
]
with m.Elif(tx_busy):
with m.If(tx_cnt == 0):
m.d.sync += [
tx_shreg.eq(Cat(tx_shreg[1:], Const(1, 1))),
tx_bits.eq(tx_bits - 1),
tx_cnt.eq(div - 1),
]
with m.Else():
m.d.sync += tx_cnt.eq(tx_cnt - 1)
# ── Message sender FSM ────────────────────────────────────────────
# In IDLE: pick the highest-priority pending event and load its ROM
# range. In SEND: stream bytes from ROM through the UART TX one at
# a time, waiting for the transmitter to become free between bytes.
rom_ptr = Signal(range(len(_ROM) + 1))
rom_end = Signal(range(len(_ROM) + 1))
# Default: tx_load/tx_byte driven only from SEND state
m.d.comb += [tx_load.eq(0), tx_byte.eq(0)]
with m.FSM(name="sender"):
with m.State("IDLE"):
with m.If(pending[self.EVT_BOOT]):
m.d.sync += [
rom_ptr.eq(_START[self.EVT_BOOT]),
rom_end.eq(_START[self.EVT_BOOT] + _LEN[self.EVT_BOOT]),
pending[self.EVT_BOOT].eq(0),
]
m.next = "SEND"
with m.Elif(pending[self.EVT_RST]):
m.d.sync += [
rom_ptr.eq(_START[self.EVT_RST]),
rom_end.eq(_START[self.EVT_RST] + _LEN[self.EVT_RST]),
pending[self.EVT_RST].eq(0),
]
m.next = "SEND"
with m.Elif(pending[self.EVT_READY]):
m.d.sync += [
rom_ptr.eq(_START[self.EVT_READY]),
rom_end.eq(_START[self.EVT_READY] + _LEN[self.EVT_READY]),
pending[self.EVT_READY].eq(0),
]
m.next = "SEND"
with m.Elif(pending[self.EVT_RX]):
m.d.sync += [
rom_ptr.eq(_START[self.EVT_RX]),
rom_end.eq(_START[self.EVT_RX] + _LEN[self.EVT_RX]),
pending[self.EVT_RX].eq(0),
]
m.next = "SEND"
with m.Elif(pending[self.EVT_TX]):
m.d.sync += [
rom_ptr.eq(_START[self.EVT_TX]),
rom_end.eq(_START[self.EVT_TX] + _LEN[self.EVT_TX]),
pending[self.EVT_TX].eq(0),
]
m.next = "SEND"
with m.State("SEND"):
with m.If(~tx_busy):
with m.If(rom_ptr == rom_end):
m.next = "IDLE"
with m.Else():
m.d.comb += [tx_load.eq(1), tx_byte.eq(rom[rom_ptr])]
m.d.sync += rom_ptr.eq(rom_ptr + 1)
# ── UART RX — 8N1 mid-bit sampling ───────────────────────────────
# Synchronise the raw pin, detect the start-bit falling edge, then
# sample each subsequent bit at the centre of its baud period.
rx_sync = Signal(init=1)
rx_prev = Signal(init=1)
rx_cnt = Signal(range(div))
rx_bits = Signal(range(9))
rx_shreg = Signal(8)
rx_byte = Signal(8)
rx_valid = Signal()
m.submodules += FFSynchronizer(self.uart_rx, rx_sync, init=1)
# Default: no valid byte this cycle (overridden in STOP when needed)
m.d.sync += rx_valid.eq(0)
with m.FSM(name="rx_fsm"):
with m.State("IDLE"):
m.d.sync += rx_prev.eq(rx_sync)
with m.If(rx_prev & ~rx_sync): # falling edge = start bit
m.d.sync += rx_cnt.eq(div // 2 - 1)
m.next = "START"
with m.State("START"):
# Wait until the centre of the start bit, then verify it is
# still low (rules out glitches / false starts).
with m.If(rx_cnt == 0):
with m.If(~rx_sync):
m.d.sync += [rx_cnt.eq(div - 1), rx_bits.eq(7)]
m.next = "DATA"
with m.Else():
m.next = "IDLE"
with m.Else():
m.d.sync += rx_cnt.eq(rx_cnt - 1)
with m.State("DATA"):
with m.If(rx_cnt == 0):
# UART sends LSB first; shift new bit into MSB so that
# after 8 samples rx_shreg[0] = bit-0, rx_shreg[7] = bit-7.
m.d.sync += [
rx_shreg.eq(Cat(rx_shreg[1:], rx_sync)),
rx_cnt.eq(div - 1),
]
with m.If(rx_bits == 0):
m.next = "STOP"
with m.Else():
m.d.sync += rx_bits.eq(rx_bits - 1)
with m.Else():
m.d.sync += rx_cnt.eq(rx_cnt - 1)
with m.State("STOP"):
with m.If(rx_cnt == 0):
with m.If(rx_sync): # valid stop bit
m.d.sync += [rx_byte.eq(rx_shreg), rx_valid.eq(1)]
m.next = "IDLE"
with m.Else():
m.d.sync += rx_cnt.eq(rx_cnt - 1)
# ── Command decoder ───────────────────────────────────────────────
m.d.sync += self.reinit.eq(0)
with m.If(rx_valid):
with m.If(rx_byte == ord('r')):
m.d.sync += self.reinit.eq(1)
return m
# ── Testbench ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import sys
from amaranth.sim import Simulator, Period
CLK_FREQ = 24_000_000
BAUD_RATE = 115_200
DIV = round(CLK_FREQ / BAUD_RATE) # 208 cycles per bit
dut = UARTConsole(clk_freq=CLK_FREQ, baud_rate=BAUD_RATE)
errors = []
# ── TX helpers ────────────────────────────────────────────────────────
async def recv_byte(ctx):
"""Receive one 8N1 byte from uart_tx. Waits for start bit."""
# Wait for start bit (falling edge)
while ctx.get(dut.uart_tx) != 0:
await ctx.tick()
# Centre of start bit
await ctx.tick().repeat(DIV // 2)
assert ctx.get(dut.uart_tx) == 0, "false start"
# Sample 8 data bits
byte = 0
for i in range(8):
await ctx.tick().repeat(DIV)
byte |= ctx.get(dut.uart_tx) << i
# Stop bit
await ctx.tick().repeat(DIV)
assert ctx.get(dut.uart_tx) == 1, "missing stop bit"
return byte
async def recv_str(ctx, n):
return bytes([await recv_byte(ctx) for _ in range(n)])
# ── RX helper ─────────────────────────────────────────────────────────
async def send_byte_watch(ctx, val, watch_sig):
"""Send one 8N1 byte on uart_rx, polling watch_sig every tick.
Returns True if watch_sig was seen high at any point. The reinit
pulse fires ~100 cycles before the stop-bit wait completes, so
a tick-by-tick poll is required to catch it.
"""
seen = False
ctx.set(dut.uart_rx, 0) # start bit
for _ in range(DIV):
await ctx.tick()
if ctx.get(watch_sig): seen = True
for i in range(8):
ctx.set(dut.uart_rx, (val >> i) & 1)
for _ in range(DIV):
await ctx.tick()
if ctx.get(watch_sig): seen = True
ctx.set(dut.uart_rx, 1) # stop bit + margin
for _ in range(DIV + 4):
await ctx.tick()
if ctx.get(watch_sig): seen = True
return seen
# ── Tests ─────────────────────────────────────────────────────────────
async def testbench(ctx):
ctx.set(dut.uart_rx, 1)
# T1: boot message fires on first clock
msg = await recv_str(ctx, len(b"BBA READY\r\n"))
print(f"T1 boot: {msg!r}")
if msg != b"BBA READY\r\n":
errors.append(f"T1 boot: got {msg!r}")
# T2: pulse ncra_rst → "RST\r\n"
ctx.set(dut.ncra_rst, 1)
await ctx.tick()
ctx.set(dut.ncra_rst, 0)
msg = await recv_str(ctx, len(b"RST\r\n"))
print(f"T2 rst: {msg!r}")
if msg != b"RST\r\n":
errors.append(f"T2 rst: got {msg!r}")
# T3: ready rising edge → "ETH UP\r\n"
ctx.set(dut.ready, 1)
await ctx.tick()
msg = await recv_str(ctx, len(b"ETH UP\r\n"))
print(f"T3 ready: {msg!r}")
if msg != b"ETH UP\r\n":
errors.append(f"T3 ready: got {msg!r}")
# T4: rx_pulse → "RX\r\n"
ctx.set(dut.rx_pulse, 1)
await ctx.tick()
ctx.set(dut.rx_pulse, 0)
msg = await recv_str(ctx, len(b"RX\r\n"))
print(f"T4 rx: {msg!r}")
if msg != b"RX\r\n":
errors.append(f"T4 rx: got {msg!r}")
# T5: tx_pulse → "TX\r\n"
ctx.set(dut.tx_pulse, 1)
await ctx.tick()
ctx.set(dut.tx_pulse, 0)
msg = await recv_str(ctx, len(b"TX\r\n"))
print(f"T5 tx: {msg!r}")
if msg != b"TX\r\n":
errors.append(f"T5 tx: got {msg!r}")
# T6: send 'r' → reinit pulse.
# Poll tick-by-tick: the pulse fires ~100 cycles before send completes.
got = await send_byte_watch(ctx, ord('r'), dut.reinit)
print(f"T6 reinit after 'r': {got}")
if not got:
errors.append("T6 reinit: pulse not seen")
# T7: send unknown byte → no reinit
got = await send_byte_watch(ctx, ord('x'), dut.reinit)
print(f"T7 reinit after 'x': {got} (want 0)")
if got:
errors.append("T7 spurious reinit on 'x'")
sim = Simulator(dut)
sim.add_clock(Period(MHz=24))
sim.add_testbench(testbench)
with sim.write_vcd("UARTConsole.vcd"):
sim.run()
if errors:
print("\nFAILURES:")
for e in errors:
print(" ", e)
sys.exit(1)
else:
print("\nAll UARTConsole tests passed.")