Skip to content

Commit

Permalink
Refactored write and included pip/non-pip
Browse files Browse the repository at this point in the history
Signed-off-by: Anderson Ignacio <anderson@aignacio.com>
  • Loading branch information
aignacio committed Oct 14, 2023
1 parent 35e241a commit fd12a9a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 84 deletions.
191 changes: 110 additions & 81 deletions cocotbext/ahb/ahb_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 10.10.2023
# Last Modified Date: 14.10.2023

import cocotb
import logging
Expand Down Expand Up @@ -52,8 +52,12 @@ def _get_def(self, width: int = 1,
"""Return a handle obj with the default value"""
return LogicArray([val for _ in range(width)])

def _convert_size(self, value) -> AHBTrans:
def _convert_size(self, value) -> Union[AHBTrans, LogicArray]:
"""Convert byte size into hsize."""

if isinstance(value, LogicArray):
return value

for hsize in AHBSize:
if (2**hsize.value) == value:
return hsize
Expand All @@ -67,109 +71,134 @@ def _check_size(size: int, data_bus_width: int) -> None:
" provided is larger than the bus width "
"({} B)".format(size, data_bus_width))
elif size <= 0 or (size & (size - 1)) != 0:
raise ValueError("Size must be a positive power of 2")
raise ValueError(f"Error -> {size} - Size must"
"be a positive power of 2")

def _addr_phase(self, addr, size):
def _addr_phase(self, addr: int, size: int, mode: str):
"""Drive the AHB signals of the address phase."""
wr = AHBWrite(0b1) if mode == 'write' else AHBWrite(0b0)
self.bus.haddr.value = addr
self.bus.htrans.value = AHBTrans(0b10)
self.bus.hsize.value = self._convert_size(size)
self.bus.hwrite.value = AHBWrite(0b1)
self.bus.hwrite.value = wr
if self.bus.hsel_exist:
self.bus.hsel.value = 1

def _create_vector(self, vec: Sequence[int],
width: int,
phase: str,
pip: Optional[bool] = False) -> Sequence[int]:
"""Create a list to be driven during address/data phase."""
vec_out = []
if pip:
# Format address/data to send in pipeline
# Address
# | ADDRESS 0 | ADDRESS 1 | ... | default value |
# Data
# | default val | DATA 0 | ... | DATA N |

if phase == 'address_phase':
vec_out = vec
vec_out.append(self._get_def(width, self.def_val))
elif phase == 'data_phase':
vec_out = vec
vec_out.insert(0, self._get_def(width, self.def_val))
else:
# Format address/data to send in non-pipeline
# Address
# | ADDRESS 0 | default value | ADDRESS 1 | default value |
# Data
# | default val | DATA 0 | default value | DATA 1 |

if phase == 'address_phase':
for i in vec:
vec_out.append(i)
vec_out.append(self._get_def(width, self.def_val))
elif phase == 'data_phase':
for i in vec:
vec_out.append(self._get_def(width, self.def_val))
vec_out.append(i)
return vec_out

@cocotb.coroutine
async def _send_txn(self, address: Sequence[int],
value: Sequence[int],
size: Sequence[int],
mode: str = 'write') -> Sequence[AHBResp]:
"""Drives the AHB transaction into the bus."""
response = []
first_txn = True

for index, (txn_addr, txn_data, txn_size) in enumerate(zip(address,
value,
size)):
if index == len(address) - 1:
self._init_bus()
else:
self._addr_phase(txn_addr, txn_size, mode)
if txn_addr != self.def_val:
if not isinstance(txn_addr, LogicArray):
self.log.info(f"AHB write txn:\n"
f"\tADDR = 0x{txn_addr:x}\n"
f"\tDATA = 0x{value[index+1]:x}\n"
f"\tSIZE = {txn_size}")
self.bus.hwdata.value = txn_data
await RisingEdge(self.clk)
timeout_counter = 0
while self.bus.hready.value != 1:
timeout_counter += 1
if timeout_counter == self.timeout:
raise Exception(f'Timeout value of {timeout_counter}'
f' clock cycles has been reached!')
await RisingEdge(self.clk)

if first_txn:
first_txn = False
else:
response += [{'resp': AHBResp(int(self.bus.hresp.value)),
'data': self.bus.hrdata.value}]
self._init_bus()

@cocotb.coroutine
async def write(self, address: Union[int, Sequence[int]],
value: Union[int, Sequence[int]],
size: Optional[int] = None,
size: Optional[Union[int, Sequence[int]]] = None,
pip: Optional[bool] = False) -> Sequence[AHBResp]:
"""Write data in the AHB bus."""

if size is None:
size = [self.bus._data_width // 8 for _ in range(len(address))]
else:
for sz in size:
AHBLiteMaster._check_size(sz, len(self.bus.hwdata) // 8)

# Convert all inputs into lists, if not already
if not isinstance(address, list):
address = [address]
if not isinstance(value, list):
value = [value]

if size is None:
size = self.bus._data_width // 8
else:
AHBLiteMaster._check_size(size, len(self.bus.hwdata) // 8)
if not isinstance(size, list):
size = [size]

# First check if the input sizes are correct
if len(address) != len(value):
raise Exception(f'Address length ({len(address)}) is'
f'different from data length ({len(value)})')

response = []
if not pip:
for txn_addr, txn_data in zip(address, value):
self.log.info(f"AHB write txn:\n"
f"\tADDR = 0x{txn_addr:x}\n"
f"\tDATA = 0x{txn_data:x}\n"
f"\tSIZE = {size}")

self._addr_phase(txn_addr, size)
await RisingEdge(self.clk)
if len(address) != len(size):
raise Exception(f'Address length ({len(address)}) is'
f'different from size length ({len(size)})')

# Address phase
timeout_counter = 0
while self.bus.hready.value != 1:
timeout_counter += 1
if timeout_counter == self.timeout:
raise Exception(f'Timeout value of {timeout_counter}'
f' clock cycles has been reached!')
await RisingEdge(self.clk)
# Need to copy data as we'll have to shift address/value
t_address = copy.deepcopy(address)
t_value = copy.deepcopy(value)
t_size = copy.deepcopy(size)

self._init_bus()
width = len(self.bus.haddr)
t_address = self._create_vector(t_address, width, 'address_phase', pip)
width = len(self.bus.hwdata)
t_value = self._create_vector(t_value, width, 'data_phase', pip)
width = len(self.bus.hsize)
t_size = self._create_vector(t_size, width, 'address_phase', pip)

# Data phase
self.bus.hwdata.value = txn_data
await RisingEdge(self.clk)
timeout_counter = 0
while self.bus.hready.value != 1:
timeout_counter += 1
if timeout_counter == self.timeout:
raise Exception(f'Timeout value of {timeout_counter}'
f' clock cycles has been reached!')
await RisingEdge(self.clk)
response += [{'resp': AHBResp(int(self.bus.hresp.value)),
'data': self.bus.hrdata.value}]
self._init_bus()
else:
# Need to copy data as we'll have to shift address/value
t_address = copy.deepcopy(address)
t_value = copy.deepcopy(value)

t_address.append(self._get_def(len(self.bus.haddr),
self.def_val))
t_value.insert(0, self._get_def(len(self.bus.hwdata),
self.def_val))
first_txn = True
# for txn_addr, txn_data in zip(address, value):
for index, (txn_addr, txn_data) in enumerate(zip(t_address,
t_value)):
if index == len(t_address) - 1:
self._init_bus()
else:
self._addr_phase(txn_addr, size)
self.log.info(f"AHB write txn:\n"
f"\tADDR = 0x{txn_addr:x}\n"
f"\tDATA = 0x{t_value[index+1]:x}\n"
f"\tSIZE = {size}")
self.bus.hwdata.value = txn_data
await RisingEdge(self.clk)
timeout_counter = 0
while self.bus.hready.value != 1:
timeout_counter += 1
if timeout_counter == self.timeout:
raise Exception(f'Timeout value of {timeout_counter}'
f' clock cycles has been reached!')
await RisingEdge(self.clk)

if first_txn:
first_txn = False
else:
response += [{'resp': AHBResp(int(self.bus.hresp.value)),
'data': self.bus.hrdata.value}]
self._init_bus()
return response
return await self._send_txn(t_address, t_value, t_size, 'write')
27 changes: 24 additions & 3 deletions tests/test_ahb_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 10.10.2023
# Last Modified Date: 14.10.2023

import cocotb
import os
Expand All @@ -17,8 +17,21 @@
from cocotbext.ahb import AHBBus, AHBLiteMaster


def rnd_val(bit: int = 0):
return random.randint(0, (2**bit) - 1)
def rnd_val(bit: int = 0, zero: bool = True):
if zero is True:
return random.randint(0, (2**bit) - 1)
else:
return random.randint(1, (2**bit) - 1)


def pick_random_value(input_list):
"""
Pick a random value from the given list.
"""
if input_list:
return random.choice(input_list)
else:
return None # Return None if the list is empty


@cocotb.coroutine
Expand All @@ -42,10 +55,18 @@ async def run_test(dut):

address = [rnd_val(32) for _ in range(200)]
value = [rnd_val(32) for _ in range(200)]
size = [pick_random_value([1, 2, 4]) for _ in range(200)]

resp = await ahb_lite_master.write(address, value, pip=True)
resp = await ahb_lite_master.write(address, value, pip=False)
resp = await ahb_lite_master.write(address, value, pip=True)
resp = await ahb_lite_master.write(address, value, pip=False)

resp = await ahb_lite_master.write(address, value, size, pip=False)
resp = await ahb_lite_master.write(address, value, size, pip=False)
resp = await ahb_lite_master.write(address, value, size, pip=True)
resp = await ahb_lite_master.write(address, value, size, pip=True)

print(resp)


Expand Down

0 comments on commit fd12a9a

Please sign in to comment.