diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b313588..7ab7826 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,6 +4,6 @@ repos: hooks: - id: check-yaml - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 25.12.0 hooks: - id: black diff --git a/docs/conf.py b/docs/conf.py index 5d9a565..3421574 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -55,7 +55,7 @@ # All aren't necessary for the docs themselves autodoc_mock_imports = [ # import a DLL/shared lib and is platform-dependent - "fixate.drivers._ftdi", + "fixate.drivers.ftdi._ftdi", "PyDAQmx", # pulls in platform-dependent libraries "pynput", diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 5c1da9b..0a172c3 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -13,6 +13,7 @@ Major Changes New Features ############ +- FTDI MPSSE I2C functionality that replaces pyftdi with libmpsse. Improvements ############ diff --git a/mypy.ini b/mypy.ini index 757f720..d0e6e61 100644 --- a/mypy.ini +++ b/mypy.ini @@ -67,8 +67,14 @@ exclude = (?x) __init__.py |helper.py ) + |ftdi/ + ( + __init__.py + |_ftdi_mpsse.py + |_ftdi_py.py + |_libmpsse.py + ) |__init__.py - |ftdi.py ) ) ) diff --git a/setup.cfg b/setup.cfg index 394cd09..55f36e0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,7 +39,11 @@ install_requires = [options.packages.find] where = src +include_package_data = True +[options.package_data] +# this is only for windows. We can look to add support for other platforms in the future if necessary. +fixate.drivers.ftdi.libs = libmpsse.dll [options.extras_require] gui = diff --git a/src/fixate/__init__.py b/src/fixate/__init__.py index 959ec02..59d3788 100644 --- a/src/fixate/__init__.py +++ b/src/fixate/__init__.py @@ -32,4 +32,4 @@ from fixate.main import run_main_program as run -__version__ = "0.6.4" +__version__ = "0.6.5" diff --git a/src/fixate/drivers/ftdi.py b/src/fixate/drivers/ftdi/__init__.py similarity index 96% rename from src/fixate/drivers/ftdi.py rename to src/fixate/drivers/ftdi/__init__.py index 8aa2ff3..bfaab59 100644 --- a/src/fixate/drivers/ftdi.py +++ b/src/fixate/drivers/ftdi/__init__.py @@ -8,7 +8,7 @@ from fixate.core.common import bits from fixate.core.exceptions import FixateError, InstrumentNotConnected -from fixate.drivers._ftdi import ftdI2xx +from fixate.drivers.ftdi._ftdi import ftdI2xx # Definitions UCHAR = ctypes.c_ubyte @@ -448,3 +448,13 @@ def open(ftdi_description="") -> FTDI2xx: raise InstrumentNotConnected( f"No valid ftdi found by description '{ftdi_description}'" ) + + +from fixate.drivers.ftdi._ftdi_mpsse import ( # noqa: I001 - we don't want these imports to be split up + I2CTransferOptions as I2CTransferOptions, + I2CClockRate as I2CClockRate, + I2CChannelConfig as I2CChannelConfig, + MpsseI2C as MpsseI2C, + MpsseI2CSimpleInterface as MpsseI2CSimpleInterface, + open as open_mpsse, # explicitly named to avoid conflict with open() at ftdi level. # noqa: F401 +) diff --git a/src/fixate/drivers/_ftdi.py b/src/fixate/drivers/ftdi/_ftdi.py similarity index 100% rename from src/fixate/drivers/_ftdi.py rename to src/fixate/drivers/ftdi/_ftdi.py diff --git a/src/fixate/drivers/ftdi/_ftdi_mpsse.py b/src/fixate/drivers/ftdi/_ftdi_mpsse.py new file mode 100644 index 0000000..61966db --- /dev/null +++ b/src/fixate/drivers/ftdi/_ftdi_mpsse.py @@ -0,0 +1,566 @@ +import ctypes +import logging +from collections.abc import Collection +from enum import IntEnum, IntFlag, StrEnum, unique +from typing import Callable, TypeVar + +from fixate.core.exceptions import FixateError, InstrumentNotConnected +from fixate.drivers import log_instrument_open +from fixate.drivers.ftdi import FT_HANDLE, FTD2XXError, check_return +from fixate.drivers.ftdi._libmpsse import libmpsse + +# For more information see https://ftdichip.com/wp-content/uploads/2020/08/AN_177_User_Guide_For_LibMPSSE-I2C-1.pdf +# Additionally, the source code for libMPSSE is available as part of this download: https://ftdichip.com/wp-content/uploads/2025/08/libmpsse-windows-1.0.8.zip +# This source code has been edited to include an method to open by description, which didn't come in the box. + +DWORD = ctypes.c_ulong +UCHAR = ctypes.c_ubyte +USHORT = ctypes.c_ushort +LPDWORD = ctypes.POINTER(DWORD) +PCHAR = ctypes.c_char_p + +logger = logging.getLogger(__name__) + + +class I2CError(FixateError): + """Base class for I2C errors.""" + + pass + + +class SPIError(FixateError): + """Base class for SPI errors.""" + + pass + + +class Protocol(StrEnum): + I2C = "i2c" + SPI = "spi" + # TODO - add more protocols as needed + + +@unique +class I2CTransferOptions(IntFlag): + START_BIT = 0x01 + STOP_BIT = 0x02 + BREAK_ON_NACK = 0x04 + NACK_LAST_BYTE = 0x08 + FAST_TRANSFER_BYTES = 0x10 + FAST_TRANSFER_BITS = 0x20 + NO_ADDRESS = 0x40 + + +@unique +class I2CClockRate(IntEnum): + STANDARD_MODE = 100000 + FAST_MODE = 400000 + FAST_MODE_PLUS = 1000000 + HIGH_SPEED_MODE = 3400000 + + +@unique +class I2COptions(IntFlag): + DISABLE_3PHASE_CLOCKING = 0x01 + ENABLE_DRIVE_ONLY_ZERO = 0x02 + # This option is not documented in the user guide, but is mentioned in the source code. + ENABLE_PIN_STATE_CONFIG = 0x10 + # Bits 4 - 31 are reserved + + +class I2CChannelConfig(ctypes.Structure): + _fields_ = [ + ("ClockRate", DWORD), + ("LatencyTimer", UCHAR), + ("Options", DWORD), + ("Pin", DWORD), + ("currentPinState", USHORT), + ] + + +class Mpsse: + """ + Base class for MPSSE drivers. This class should not be instantiated directly, but should be derived from for specific protocols. + Derived classes should implement protocol-specific functionality, but can rely on the base class for connection management and other common functionality. + """ + + INSTR_TYPE = "FTDI" + REGEX_ID = "" # this is only here to 'satisfy' the DriverProtocol interface + + def __init__(self, ftdi_description: str): + self.ftdi_description = ftdi_description + self._handle = FT_HANDLE() + + def get_identity(self) -> str: + """Return identity string representing connected ftdi object""" + return self.ftdi_description + + +class MpsseI2C(Mpsse): + def __init__(self, ftdi_description: str, retries: int = 3): + super().__init__(ftdi_description) + self._connect() + self._retries = retries + + def _connect(self): + check_return( + libmpsse.I2C_OpenChannelByDescription( + self.ftdi_description.encode("utf-8"), ctypes.byref(self._handle) + ) + ) + + def configure( + self, config: I2CChannelConfig | None = None, options: I2COptions | None = None + ): + if config is None: + config = I2CChannelConfig( + ClockRate=I2CClockRate.STANDARD_MODE, # standard 100 kHz I2C clock rate, this is the default speed used by pyftdi. + LatencyTimer=16, + Options=options.value if options is not None else 0, + Pin=0, + currentPinState=0, + ) + check_return(libmpsse.I2C_InitChannel(self._handle, ctypes.byref(config))) + + def read(self, address: int, length: int, options: I2CTransferOptions) -> bytes: + """Read data from an I2C device. + + Args: + address: The 7-bit I2C address of the device to read from. + length: The number of bytes to read. + options: Transfer options for the read operation. See I2CTransferOptions for more information. + + Returns: + The data read from the I2C device. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + FT_IO_ERROR will occur if the device does not transfer the expected number of bytes. + FT_DEVICE_NOT_FOUND will occur if the device does not respond. + """ + # libmpsse handles the conversion of the address and read/write bit, so we just need to pass the 7-bit address. + addr = UCHAR(address) + buffer = (UCHAR * length)() + bytes_read = DWORD() + # the pyftdi library has in-built retry logic, it's not known if retries are done frequently in our usage, + # but we'll do a similar thing here. + for attempt in range(self._retries): + try: + check_return( + libmpsse.I2C_DeviceRead( + self._handle, + addr, + length, + ctypes.byref(buffer), + ctypes.byref(bytes_read), + options.value, + ) + ) + # break out to return. Having the return here upsets pylance. + break + except FTD2XXError as e: + if e.args[0] == "FT_IO_ERROR": + # this is a retriable error + if attempt < self._retries - 1: + logger.warning( + f"Attempt {attempt + 1} failed with error {e.args[0]}. Retrying..." + ) + continue + raise I2CError( + f"Expected to read {length} bytes, but only read {bytes_read.value} bytes." + ) from e + elif e.args[0] == "FT_DEVICE_NOT_FOUND": + raise I2CError( + f"Device with address {address:#02x} not found." + ) from e + else: + # Something else happened that isn't documented by the libmpsse library. + raise I2CError( + f"An unexpected error occurred while reading from device with address {address:#02x}." + ) from e + + return bytes(buffer[: bytes_read.value]) + + def write( + self, + address: int, + data: bytes | bytearray | Collection[int], + options: I2CTransferOptions, + ): + """Write data to an I2C device. + + Args: + address: The 7-bit I2C address of the device to write to. + data: The data to write to the device. + options: Transfer options for the write operation. See I2CTransferOptions for more information. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + FT_IO_ERROR will occur if the device does not transfer the expected number of bytes. + FT_DEVICE_NOT_FOUND will occur if the device does not respond. + FT_FAILED_TO_WRITE_DEVICE will occur if the device nACKs a byte and the BREAK_ON_NACK option is specified. + """ + # libmpsse handles the conversion of the address and read/write bit, so we just need to pass the 7-bit address. + addr = UCHAR(address) + buffer = (UCHAR * len(data))(*data) + bytes_written = DWORD() + # the pyftdi library has in-built retry logic, it's not known if retries are done frequently in our usage, + # but we'll do a similar thing here. + for attempt in range(self._retries): + try: + check_return( + libmpsse.I2C_DeviceWrite( + self._handle, + addr, + len(buffer), + ctypes.byref(buffer), + ctypes.byref(bytes_written), + options.value, + ) + ) + return + except FTD2XXError as e: + if e.args[0] in ["FT_IO_ERROR", "FT_FAILED_TO_WRITE_DEVICE"]: + # these are retriable errors + if attempt < self._retries - 1: + logger.warning( + f"Attempt {attempt + 1} failed with error {e.args[0]}. Retrying..." + ) + continue + raise I2CError( + f"Expected to write {len(data)} bytes, but only wrote {bytes_written.value} bytes." + ) from e + elif e.args[0] == "FT_DEVICE_NOT_FOUND": + raise I2CError( + f"Device with address {address:#02x} not found." + ) from e + else: + # Something else happened that isn't documented by the libmpsse library. + raise I2CError( + f"An unexpected error occurred while writing to device with address {address:#02x}." + ) from e + + def exchange( + self, + address: int, + data: bytes | bytearray | Collection[int], + write_options: I2CTransferOptions, + read_length: int, + read_options: I2CTransferOptions, + ) -> bytes: + """Write data to an I2C device, then read data from the device with a repeated start. + + Args: + address: The 7-bit I2C address of the device to write to and read from. + data: The data to write to the device before reading. E.g. a register address to read from. + read_length: The number of bytes to read from the device after writing. + write_options: Transfer options for the write operation. See I2CTransferOptions for more information. + read_options: Transfer options for the read operation. See I2CTransferOptions for more information. + + Returns: + The data read from the I2C device after writing. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + addr = UCHAR(address) + write_buffer = (UCHAR * len(data))(*data) + bytes_written = DWORD() + read_buffer = (UCHAR * read_length)() + bytes_read = DWORD() + # can't use this class's read and write methods since they have retry logic that we don't want to use here, + # so we need to call the underlying library methods directly. + for attempt in range(self._retries): + try: + check_return( + libmpsse.I2C_DeviceWrite( + self._handle, + addr, + len(write_buffer), + ctypes.byref(write_buffer), + ctypes.byref(bytes_written), + write_options.value, + ) + ) + check_return( + libmpsse.I2C_DeviceRead( + self._handle, + addr, + read_length, + ctypes.byref(read_buffer), + ctypes.byref(bytes_read), + read_options.value, + ) + ) + # break out to return. Having the return here upsets pylance. + break + + except FTD2XXError as e: + if e.args[0] in ["FT_IO_ERROR", "FT_FAILED_TO_WRITE_DEVICE"]: + # these are retriable errors + if attempt < self._retries - 1: + logger.warning( + f"Attempt {attempt + 1} failed with error {e.args[0]}. Retrying..." + ) + continue + raise I2CError( + f"Expected to write {len(data)} bytes and read {read_length} bytes, but wrote {len(data)} bytes and read {len(read_buffer)} bytes." + ) from e + elif e.args[0] == "FT_DEVICE_NOT_FOUND": + raise I2CError( + f"Device with address {address:#02x} not found." + ) from e + else: + # Something else happened that isn't documented by the libmpsse library. + raise I2CError( + f"An unexpected error occurred while exchanging data with device with address {address:#02x}." + ) from e + + return bytes(read_buffer[: bytes_read.value]) + + def write_gpio(self, direction: int, pin_values: int): + """Set the state of the GPIO pins. + + Args: + direction: Direction of the GPIO pins. 1 for output, 0 for input. + pin_values: Values of the GPIO pins. For output pins, 1 for high, 0 for low. For input pins, this value is ignored. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + _dir = UCHAR(direction) + _values = UCHAR(pin_values) + check_return(libmpsse.I2C_WriteGPIO(self._handle, _dir, _values)) + + def read_gpio(self) -> int: + """Read the state of the GPIO pins. + + Returns: + The state of the GPIO pins. For output pins, 1 for high, 0 for low. For input pins, 1 for high, 0 for low. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + _values = UCHAR() + check_return(libmpsse.I2C_ReadGPIO(self._handle, ctypes.byref(_values))) + return _values.value + + def get_simple_interface(self, address: int) -> "MpsseI2CSimpleInterface": + """Get a simple interface to the I2C device similar to the port concept used by pyftdi. This is not intended to be a + full-featured interface, but can be useful for simple use cases where the full flexibility of the underlying library is not needed. + + Returns: + An instance of MpsseI2CSimpleInterface that provides a simplified interface to the I2C device. + """ + + return MpsseI2CSimpleInterface(self, address) + + def close(self): + check_return(libmpsse.I2C_CloseChannel(self._handle)) + self._handle = FT_HANDLE() # reset handle to default value + + +class MpsseI2CSimpleInterface: + """A simple interface to an I2C device that provides basic read and write functionality without requiring the user to specify transfer options or other parameters. + This is intended to be used for simple use cases where the full flexibility of the underlying library is not needed. Similar to the pyftdi library. + """ + + def __init__(self, main_interface: MpsseI2C, address: int): + self._main_interface = main_interface + self._address = address + + def read(self, length: int, start: bool = True, stop: bool = True) -> bytes: + """Read data from the I2C device. + + This method uses default transfer options that should be suitable for most use cases. If you need more control over the transfer, you can use the read method of the main interface MpsseI2C directly. + + Args: + length: The number of bytes to read. + start: Whether to send a start bit before the read operation. Default is True. + stop: Whether to send a stop bit after the read operation. Default is True. + + Returns: + The data read from the I2C device. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + # default will be to NACK the last byte, which is a common convention for I2C reads + # and what is done in the pyftdi library. + options = I2CTransferOptions.NACK_LAST_BYTE + if start: + options |= I2CTransferOptions.START_BIT + if stop: + options |= I2CTransferOptions.STOP_BIT + + return self._main_interface.read(self._address, length, options=options) + + def read_from( + self, + register: int, + length: int, + start: bool = True, + stop: bool = True, + repeated_start: bool = True, + ) -> bytes: + """Read data from a specific register of the I2C device. + + This method uses default transfer options that should be suitable for most use cases. If you need more control over the transfer, you can use the exchange method of the main interface MpsseI2C directly. + + Args: + register: The register address to read from. + length: The number of bytes to read. + start: Whether to send a start bit before the write operation. Default is True. + stop: Whether to send a stop bit after the read operation. Default is True. + repeated_start: Whether to send a repeated start bit between the write and read operations. Default is False. + + Returns: + The data read from the specified register of the I2C device. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + # default will be to break on NACK + write_options = I2CTransferOptions.BREAK_ON_NACK + # default will be to NACK the last byte, which is a common convention for I2C reads + # and what is done in the pyftdi library. + read_options = I2CTransferOptions.NACK_LAST_BYTE + + if start: + write_options |= I2CTransferOptions.START_BIT + if stop: + read_options |= I2CTransferOptions.STOP_BIT + if repeated_start: + read_options |= I2CTransferOptions.START_BIT + + return self._main_interface.exchange( + self._address, + bytes([register]), + write_options=write_options, + read_length=length, + read_options=read_options, + ) + + def write( + self, + data: bytes | bytearray | Collection[int], + start: bool = True, + stop: bool = True, + ): + """Write data to the I2C device. + + This method uses default transfer options that should be suitable for most use cases. If you need more control over the transfer, you can use the write method of the main interface MpsseI2C directly. + + Args: + data: The data to write to the I2C device. + start: Whether to send a start bit before the write operation. Default is True. + stop: Whether to send a stop bit after the write operation. Default is True. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + # default will be to break on NACK. + write_options = I2CTransferOptions(I2CTransferOptions.BREAK_ON_NACK) + + if start: + write_options |= I2CTransferOptions.START_BIT + if stop: + write_options |= I2CTransferOptions.STOP_BIT + + return self._main_interface.write(self._address, data, options=write_options) + + def write_to( + self, + register: int, + data: bytes | bytearray | Collection[int], + start: bool = True, + stop: bool = True, + ): + """Write data to a specific register of the I2C device. + + This method uses default transfer options that should be suitable for most use cases. If you need more control over the transfer, you can use the exchange method of the main interface MpsseI2C directly. + + Args: + register: The register address to write to. + data: The data to write to the specified register of the I2C device. + start: Whether to send a start bit before the write operation. Default is True. + stop: Whether to send a stop bit after the write operation. Default is True. + + Raises: + I2CError: Via FTD2XXError in check_return if the underlying library call fails. + """ + # TODO - might need to consider the possibility of the register being multiple bytes, but for now assume it's just one byte. + # default will be to break on NACK. + write_options = I2CTransferOptions(I2CTransferOptions.BREAK_ON_NACK) + + if start: + write_options |= I2CTransferOptions.START_BIT + if stop: + write_options |= I2CTransferOptions.STOP_BIT + + return self._main_interface.write( + self._address, bytes([register]) + bytes(data), options=write_options + ) + + # TODO - add more functionality as needed, e.g. POLLING, GPIO control, clock stretching, etc. + + +@unique +class SPITransferOptions(IntEnum): + # TODO - complete me + def __init__(self, value): + raise NotImplementedError("SPI support not yet implemented.") + + +class MpsseSPI(Mpsse): + INSTR_TYPE = "FTDI" + + # TODO - complete me + def __init__(self, ftdi_description: str): + raise NotImplementedError("SPI support not yet implemented.") + + +MPSSE_TYPE = TypeVar("MPSSE_TYPE", bound=Mpsse) + + +def open[MPSSE_TYPE]( + interface: Callable[[str], MPSSE_TYPE], ftdi_description: str +) -> MPSSE_TYPE: + """Open an MPSSE device with the given class/type and description. + + Args: + interface: The MPSSE class to instantiate. This determines which MPSSE class to use. + ftdi_description: The description of the device to open. This is the "Description" field from the D2XX API (aka). + + Returns: + An instance of the appropriate MPSSE class for the given class/type, for the device corresponding to the given description. + + Raises: + InstrumentNotConnected: If no device with the given description is found. + ValueError: If an unsupported protocol is specified. + """ + + try: + driver = interface(ftdi_description) + except FTD2XXError: + raise InstrumentNotConnected( + f"FTDI device with description '{ftdi_description}' not found." + ) + + # ignore the below for now, it can be fixed as part of a future mypy targeted branch. + log_instrument_open(driver) # type: ignore + return driver + + +def lib_versions() -> tuple[int, int]: + """ + Get the versions of the libMPSSE and libftdi libraries. + Returns: + A tuple containing the libMPSSE version and the libftdi version, both as integers in the format 0xAABBCCDD where AA is the major version, BB is the minor version, CC is the patch version, and DD is the build number. + """ + mpsse_version = DWORD(0) + libftdi_version = DWORD(0) + + libmpsse.Ver_libMPSSE(ctypes.byref(mpsse_version), ctypes.byref(libftdi_version)) + + return mpsse_version.value, libftdi_version.value diff --git a/src/fixate/drivers/ftdi/_libmpsse.py b/src/fixate/drivers/ftdi/_libmpsse.py new file mode 100644 index 0000000..1668d2b --- /dev/null +++ b/src/fixate/drivers/ftdi/_libmpsse.py @@ -0,0 +1,26 @@ +""" Private wrapper for ftdi libmpsse driver. DLL on Windows, .so shared library on *nix. +This is wrapped privately so it can be ommitted from the documentation build. +""" + +import ctypes +import sys +from importlib import resources + +if sys.platform == "win32": + try: + with resources.path("fixate.drivers.ftdi.libs", "libmpsse.dll") as lib_path: + libmpsse = ctypes.WinDLL(lib_path) + except Exception as e: + raise ImportError( + "Unable to find libmpsse.dll.\nThis should have been included in the fixate package installation." + ) from e + +else: + try: + # this won't work at this stage since the .so file isn't included in the package yet. + with resources.path("fixate.drivers.ftdi.libs", "libmpsse.so") as lib_path: + libmpsse = ctypes.cdll.LoadLibrary(lib_path) + except Exception as e: + raise ImportError( + "Unable to find libmpsse.so.\nThis should have been included in the fixate package installation." + ) from e diff --git a/src/fixate/drivers/ftdi/libs/libmpsse.dll b/src/fixate/drivers/ftdi/libs/libmpsse.dll new file mode 100644 index 0000000..042bf60 Binary files /dev/null and b/src/fixate/drivers/ftdi/libs/libmpsse.dll differ diff --git a/tox.ini b/tox.ini index 8b3d4bd..8a4b57f 100644 --- a/tox.ini +++ b/tox.ini @@ -16,7 +16,7 @@ commands = [testenv:black] basepython = python3 skip_install = true -deps = black==22.3.0 +deps = black==25.12.0 commands = black --check src test scripts {posargs} [pytest] @@ -30,7 +30,7 @@ markers = [testenv:mypy] basepython = python3 -deps = mypy==1.10.0 +deps = mypy==1.19.1 # mypy gives different results if you actually install the stuff before you check it # separate cache to stop weirdness around sharing cache with other instances of mypy commands = mypy --cache-dir="{envdir}/mypy_cache" --config-file=mypy.ini