Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions canopen/pdo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import itertools
import logging
from collections.abc import Iterator, Mapping
from typing import Union

from canopen import node
from canopen.pdo.base import PdoBase, PdoMap, PdoMaps, PdoVariable
Expand All @@ -17,6 +20,35 @@
logger = logging.getLogger(__name__)


class _CombinedPdoMaps(Mapping[int, PdoMap]):
"""Combine RPDO and TPDO :class:`PdoMaps` without dummy zero offsets.

Avoids ``PdoMaps(0, 0, …)`` where ``__getitem__`` fallbacks would alias
``key`` to ``key + 1`` (see discussion on PR #613).
"""

def __init__(self, rx: PdoMaps, tx: PdoMaps):
self.rx = rx
self.tx = tx

def __getitem__(self, key: int) -> PdoMap:
for maps in (self.rx, self.tx):
try:
return maps[key]
except KeyError:
continue
raise KeyError(key)

def __iter__(self) -> Iterator[int]:
return itertools.chain(
(self.rx.map_offset + i - 1 for i in self.rx),
(self.tx.map_offset + i - 1 for i in self.tx),
)

def __len__(self) -> int:
return len(self.rx) + len(self.tx)


class PDO(PdoBase):
"""PDO Class for backwards compatibility.

Expand All @@ -28,19 +60,30 @@ def __init__(self, node, rpdo, tpdo):
super(PDO, self).__init__(node)
self.rx = rpdo.map
self.tx = tpdo.map

self.map = {}
# the object 0x1A00 equals to key '1' so we remove 1 from the key
for key, value in self.rx.items():
self.map[0x1A00 + (key - 1)] = value
for key, value in self.tx.items():
self.map[0x1600 + (key - 1)] = value
self.map = _CombinedPdoMaps(self.rx, self.tx)

def __getitem__(self, key: Union[int, str]):
if isinstance(key, int):
if key == 0:
raise KeyError("PDO index zero requested for 1-based sequence")
if 0 < key <= 512:
return self.map[key]
if 0x1400 <= key <= 0x17FF:
return self.rx[key]
if 0x1800 <= key <= 0x1BFF:
return self.tx[key]
for pdo_map in self.map.values():
try:
return pdo_map[key]
except KeyError:
continue
raise KeyError(f"PDO: {key} was not found in any map")


class RPDO(PdoBase):
"""Receive PDO to transfer data from somewhere to the represented node.

Properties 0x1400 to 0x1403 | Mapping 0x1600 to 0x1603.
Properties 0x1400 to 0x15FF | Mapping 0x1600 to 0x17FF.
:param object node: Parent node for this object.
"""

Expand All @@ -65,7 +108,7 @@ def stop(self):
class TPDO(PdoBase):
"""Transmit PDO to broadcast data from the represented node to the network.

Properties 0x1800 to 0x1803 | Mapping 0x1A00 to 0x1A03.
Properties 0x1800 to 0x19FF | Mapping 0x1A00 to 0x1BFF.
:param object node: Parent node for this object.
"""

Expand Down
17 changes: 13 additions & 4 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import binascii
import contextlib
import logging
import math
import threading
Expand Down Expand Up @@ -33,7 +34,7 @@ class PdoBase(Mapping):

def __init__(self, node: Union[LocalNode, RemoteNode]):
self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK
self.map: Optional[PdoMaps] = None
self.map: Optional[Union[PdoMaps, Mapping[int, PdoMap]]] = None
self.node: Union[LocalNode, RemoteNode] = node

def __iter__(self):
Expand All @@ -45,8 +46,7 @@ def __getitem__(self, key: Union[int, str]):
raise KeyError("PDO index zero requested for 1-based sequence")
if (
0 < key <= 512 # By PDO Index
or 0x1600 <= key <= 0x17FF # By RPDO ID (512)
or 0x1A00 <= key <= 0x1BFF # By TPDO ID (512)
or 0x1400 <= key <= 0x1BFF # RPDO/TPDO communication or mapping record
):
return self.map[key]
for pdo_map in self.map.values():
Expand Down Expand Up @@ -155,6 +155,8 @@ def __init__(self, com_offset, map_offset, pdo_node: PdoBase, cob_base=None):
:param cob_base:
"""
self.maps: dict[int, PdoMap] = {}
self.com_offset = com_offset
self.map_offset = map_offset
for map_no in range(512):
if com_offset + map_no in pdo_node.node.object_dictionary:
new_map = PdoMap(
Expand All @@ -167,7 +169,14 @@ def __init__(self, com_offset, map_offset, pdo_node: PdoBase, cob_base=None):
self.maps[map_no + 1] = new_map

def __getitem__(self, key: int) -> PdoMap:
return self.maps[key]
try:
return self.maps[key]
except KeyError:
with contextlib.suppress(KeyError):
return self.maps[key + 1 - self.map_offset]
with contextlib.suppress(KeyError):
return self.maps[key + 1 - self.com_offset]
raise KeyError(key)

def __iter__(self) -> Iterator[int]:
return iter(self.maps)
Expand Down
22 changes: 19 additions & 3 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ def test_pdo_getitem(self):
self.assertEqual(node.tpdo[1]['BOOLEAN value'].raw, False)
self.assertEqual(node.tpdo[1]['BOOLEAN value 2'].raw, True)

# Test different types of access
by_mapping_record = node.pdo[0x1600]
# Test different types of access (0x1A00 = TPDO mapping per CiA 301)
by_mapping_record = node.pdo[0x1A00]
self.assertIsInstance(by_mapping_record, canopen.pdo.PdoMap)
self.assertEqual(by_mapping_record['INTEGER16 value'].raw, -3)
self.assertIs(node.tpdo[0x1A00], by_mapping_record)
self.assertIs(node.tpdo[0x1800], by_mapping_record)
self.assertIs(node.pdo[0x1800], by_mapping_record)
by_object_name = node.pdo['INTEGER16 value']
self.assertIsInstance(by_object_name, canopen.pdo.PdoVariable)
self.assertIs(by_object_name.od, node.object_dictionary['INTEGER16 value'])
Expand All @@ -68,14 +71,27 @@ def test_pdo_getitem(self):
self.assertEqual(by_object_index.raw, 0xf)
self.assertIs(node.pdo['0x2002'], by_object_index)
self.assertIs(node.tpdo[0x2002], by_object_index)
self.assertIs(node.pdo[0x1600][0x2002], by_object_index)
self.assertIs(node.pdo[0x1A00][0x2002], by_object_index)

self.assertRaises(KeyError, lambda: node.pdo[0])
self.assertRaises(KeyError, lambda: node.tpdo[0])
self.assertRaises(KeyError, lambda: node.pdo['DOES NOT EXIST'])
self.assertRaises(KeyError, lambda: node.pdo[0x1BFF])
self.assertRaises(KeyError, lambda: node.tpdo[0x1BFF])

def test_pdo_iterate(self):
node = self.node
pdo_iter = iter(node.pdo.items())
prev = 0
for rpdo, (index, pdo) in zip(node.rpdo.values(), pdo_iter):
self.assertIs(rpdo, pdo)
self.assertGreater(index, prev)
prev = index
for tpdo, (index, pdo) in zip(node.tpdo.values(), pdo_iter):
self.assertIs(tpdo, pdo)
self.assertGreater(index, prev)
prev = index

def test_pdo_maps_iterate(self):
node = self.node
self.assertEqual(len(node.pdo), sum(1 for _ in node.pdo))
Expand Down