from amaranth import * from amaranth.sim import Simulator def bin_to_gray(x): return x ^ (x >> 1) def gray_to_bin(g, width): # convert gray to binary iteratively b = 0 for i in range(width - 1, -1, -1): if i == width - 1: b |= ((g >> i) & 1) << i else: b |= (((b >> (i + 1)) & 1) ^ ((g >> i) & 1)) << i return b class AsyncFIFO(Elaboratable): """Parameterizable gray-pointer dual-clock FIFO. - width: data width in bits - depth: must be a power of two - wdomain: write (source) domain name - rdomain: read (destination) domain name """ def __init__(self, width=1, depth=16, wdomain="src", rdomain="dst"): assert depth & (depth - 1) == 0 self.width = width self.depth = depth self.aw = (depth - 1).bit_length() # address width self.wdomain = wdomain self.rdomain = rdomain # write-side interface self.wdata = Signal(width) self.w_en = Signal() self.w_full = Signal() # read-side interface self.rdata = Signal(width) self.r_en = Signal() self.r_valid = Signal() self.r_empty = Signal() def elaborate(self, platform): m = Module() mem = Memory(width=self.width, depth=self.depth) wp = mem.write_port(domain=self.wdomain) rp = mem.read_port(domain=self.rdomain, transparent=False) m.submodules += wp, rp # pointers are AW+1 bits (extra MSB for wrap) wbin = Signal(self.aw + 1) wgray = Signal(self.aw + 1) rbin = Signal(self.aw + 1) rgray = Signal(self.aw + 1) # synchronized opposing domain gray pointers rgray_sync0 = Signal(self.aw + 1) rgray_sync1 = Signal(self.aw + 1) wgray_sync0 = Signal(self.aw + 1) wgray_sync1 = Signal(self.aw + 1) # write domain logic with m.Domain(self.wdomain): waddr = Signal(self.aw) next_wbin = Signal(self.aw + 1) next_wgray = Signal(self.aw + 1) # compute next pointer m.d.comb += next_wbin.eq(wbin + self.w_en) m.d.comb += next_wgray.eq(next_wbin ^ (next_wbin >> 1)) # synchronize rgray into write domain (two flops per bit) m.d.comb += [] for i in range(self.aw + 1): m.d[self.wdomain] += rgray_sync0[i].eq(rgray[i]) m.d[self.wdomain] += rgray_sync1[i].eq(rgray_sync0[i]) # full detection: next_wgray equals rgray_sync with top two bits inverted if self.aw >= 1: top = self.aw msb_cmp = Signal() low_eq = Signal() m.d.comb += low_eq.eq(next_wgray[top - 1:0] == rgray_sync1[top - 1:0]) m.d.comb += msb_cmp.eq((next_wgray[top] != rgray_sync1[top]) & (next_wgray[top - 1] != rgray_sync1[top - 1])) m.d.comb += self.w_full.eq(low_eq & msb_cmp) else: # depth==2 special case m.d.comb += self.w_full.eq(next_wgray != rgray_sync1) # write to memory when enabled & not full with m.If(self.w_en & ~self.w_full): m.d[self.wdomain] += wp.addr.eq(wbin[self.aw - 1:0]) m.d[self.wdomain] += wp.data.eq(self.wdata) m.d[self.wdomain] += wp.en.eq(1) m.d[self.wdomain] += wbin.eq(next_wbin) m.d[self.wdomain] += wgray.eq(next_wgray) with m.Else(): m.d[self.wdomain] += wp.en.eq(0) # read domain logic with m.Domain(self.rdomain): raddr = Signal(self.aw) next_rbin = Signal(self.aw + 1) next_rgray = Signal(self.aw + 1) # compute next pointer m.d.comb += next_rbin.eq(rbin + self.r_en) m.d.comb += next_rgray.eq(next_rbin ^ (next_rbin >> 1)) # synchronize wgray into read domain for i in range(self.aw + 1): m.d[self.rdomain] += wgray_sync0[i].eq(wgray[i]) m.d[self.rdomain] += wgray_sync1[i].eq(wgray_sync0[i]) # empty detection m.d.comb += self.r_empty.eq(rgray == wgray_sync1) # read when enabled and not empty with m.If(self.r_en & ~self.r_empty): m.d[self.rdomain] += rp.addr.eq(rbin[self.aw - 1:0]) m.d[self.rdomain] += rp.en.eq(1) m.d[self.rdomain] += rbin.eq(next_rbin) m.d[self.rdomain] += rgray.eq(next_rgray) m.d[self.rdomain] += self.r_valid.eq(1) m.d[self.rdomain] += self.rdata.eq(rp.data) with m.Else(): m.d[self.rdomain] += rp.en.eq(0) m.d[self.rdomain] += self.r_valid.eq(0) return m def _sim_fifo(): top = Module() fifo = AsyncFIFO(width=1, depth=16, wdomain="src", rdomain="dst") top.submodules.fifo = fifo sim = Simulator(top) sim.add_clock(1e-6, domain="src") sim.add_clock(1.7e-6, domain="dst") def writer(): # write a sequence of bits (0..31 repeating pattern) for i in range(32): yield fifo.wdata.eq(i & 1) yield fifo.w_en.eq(1) yield yield fifo.w_en.eq(0) # allow some idle cycles for _ in range((i % 3)): yield def reader(): seen = [] for _ in range(200): # try to consume if not empty empty = (yield fifo.r_empty) if not empty: yield fifo.r_en.eq(1) yield yield fifo.r_en.eq(0) if (yield fifo.r_valid): d = (yield fifo.rdata) seen.append(d) print(f"read: {d}") else: yield print(f"total read: {len(seen)}") sim.add_sync_process(writer, domain="src") sim.add_sync_process(reader, domain="dst") sim.run() if __name__ == "__main__": _sim_fifo()