Files
2026-06-13 18:35:38 +02:00

183 lines
6.0 KiB
Python

from amaranth import *
from amaranth.sim import Simulator
def bin_to_gray(x):
return x ^ (x >> 1)
def gray_to_bin(g, width):
# convert gray to binary iteratively
b = 0
for i in range(width - 1, -1, -1):
if i == width - 1:
b |= ((g >> i) & 1) << i
else:
b |= (((b >> (i + 1)) & 1) ^ ((g >> i) & 1)) << i
return b
class AsyncFIFO(Elaboratable):
"""Parameterizable gray-pointer dual-clock FIFO.
- width: data width in bits
- depth: must be a power of two
- wdomain: write (source) domain name
- rdomain: read (destination) domain name
"""
def __init__(self, width=1, depth=16, wdomain="src", rdomain="dst"):
assert depth & (depth - 1) == 0
self.width = width
self.depth = depth
self.aw = (depth - 1).bit_length() # address width
self.wdomain = wdomain
self.rdomain = rdomain
# write-side interface
self.wdata = Signal(width)
self.w_en = Signal()
self.w_full = Signal()
# read-side interface
self.rdata = Signal(width)
self.r_en = Signal()
self.r_valid = Signal()
self.r_empty = Signal()
def elaborate(self, platform):
m = Module()
mem = Memory(width=self.width, depth=self.depth)
wp = mem.write_port(domain=self.wdomain)
rp = mem.read_port(domain=self.rdomain, transparent=False)
m.submodules += wp, rp
# pointers are AW+1 bits (extra MSB for wrap)
wbin = Signal(self.aw + 1)
wgray = Signal(self.aw + 1)
rbin = Signal(self.aw + 1)
rgray = Signal(self.aw + 1)
# synchronized opposing domain gray pointers
rgray_sync0 = Signal(self.aw + 1)
rgray_sync1 = Signal(self.aw + 1)
wgray_sync0 = Signal(self.aw + 1)
wgray_sync1 = Signal(self.aw + 1)
# write domain logic
with m.Domain(self.wdomain):
waddr = Signal(self.aw)
next_wbin = Signal(self.aw + 1)
next_wgray = Signal(self.aw + 1)
# compute next pointer
m.d.comb += next_wbin.eq(wbin + self.w_en)
m.d.comb += next_wgray.eq(next_wbin ^ (next_wbin >> 1))
# synchronize rgray into write domain (two flops per bit)
m.d.comb += []
for i in range(self.aw + 1):
m.d[self.wdomain] += rgray_sync0[i].eq(rgray[i])
m.d[self.wdomain] += rgray_sync1[i].eq(rgray_sync0[i])
# full detection: next_wgray equals rgray_sync with top two bits inverted
if self.aw >= 1:
top = self.aw
msb_cmp = Signal()
low_eq = Signal()
m.d.comb += low_eq.eq(next_wgray[top - 1:0] == rgray_sync1[top - 1:0])
m.d.comb += msb_cmp.eq((next_wgray[top] != rgray_sync1[top]) & (next_wgray[top - 1] != rgray_sync1[top - 1]))
m.d.comb += self.w_full.eq(low_eq & msb_cmp)
else:
# depth==2 special case
m.d.comb += self.w_full.eq(next_wgray != rgray_sync1)
# write to memory when enabled & not full
with m.If(self.w_en & ~self.w_full):
m.d[self.wdomain] += wp.addr.eq(wbin[self.aw - 1:0])
m.d[self.wdomain] += wp.data.eq(self.wdata)
m.d[self.wdomain] += wp.en.eq(1)
m.d[self.wdomain] += wbin.eq(next_wbin)
m.d[self.wdomain] += wgray.eq(next_wgray)
with m.Else():
m.d[self.wdomain] += wp.en.eq(0)
# read domain logic
with m.Domain(self.rdomain):
raddr = Signal(self.aw)
next_rbin = Signal(self.aw + 1)
next_rgray = Signal(self.aw + 1)
# compute next pointer
m.d.comb += next_rbin.eq(rbin + self.r_en)
m.d.comb += next_rgray.eq(next_rbin ^ (next_rbin >> 1))
# synchronize wgray into read domain
for i in range(self.aw + 1):
m.d[self.rdomain] += wgray_sync0[i].eq(wgray[i])
m.d[self.rdomain] += wgray_sync1[i].eq(wgray_sync0[i])
# empty detection
m.d.comb += self.r_empty.eq(rgray == wgray_sync1)
# read when enabled and not empty
with m.If(self.r_en & ~self.r_empty):
m.d[self.rdomain] += rp.addr.eq(rbin[self.aw - 1:0])
m.d[self.rdomain] += rp.en.eq(1)
m.d[self.rdomain] += rbin.eq(next_rbin)
m.d[self.rdomain] += rgray.eq(next_rgray)
m.d[self.rdomain] += self.r_valid.eq(1)
m.d[self.rdomain] += self.rdata.eq(rp.data)
with m.Else():
m.d[self.rdomain] += rp.en.eq(0)
m.d[self.rdomain] += self.r_valid.eq(0)
return m
def _sim_fifo():
top = Module()
fifo = AsyncFIFO(width=1, depth=16, wdomain="src", rdomain="dst")
top.submodules.fifo = fifo
sim = Simulator(top)
sim.add_clock(1e-6, domain="src")
sim.add_clock(1.7e-6, domain="dst")
def writer():
# write a sequence of bits (0..31 repeating pattern)
for i in range(32):
yield fifo.wdata.eq(i & 1)
yield fifo.w_en.eq(1)
yield
yield fifo.w_en.eq(0)
# allow some idle cycles
for _ in range((i % 3)):
yield
def reader():
seen = []
for _ in range(200):
# try to consume if not empty
empty = (yield fifo.r_empty)
if not empty:
yield fifo.r_en.eq(1)
yield
yield fifo.r_en.eq(0)
if (yield fifo.r_valid):
d = (yield fifo.rdata)
seen.append(d)
print(f"read: {d}")
else:
yield
print(f"total read: {len(seen)}")
sim.add_sync_process(writer, domain="src")
sim.add_sync_process(reader, domain="dst")
sim.run()
if __name__ == "__main__":
_sim_fifo()