BusABC.recv: keep calling _recv_internal until it returns None #1686

Open
wants to merge 1 commit into
base: main
22 changes: 8 additions & 14 deletions can/bus.py
@@ -120,24 +120,18 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
             # try to get a message
             msg, already_filtered = self._recv_internal(timeout=time_left)

+            # propagate timeouts from _recv_internal()
+            if not msg:
+                return None
+
             # return it, if it matches
-            if msg and (already_filtered or self._matches_filters(msg)):
+            if already_filtered or self._matches_filters(msg):
                 LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
                 return msg

-            # if not, and timeout is None, try indefinitely
-            elif timeout is None:
-                continue
-
-            # try next one only if there still is time, and with
-            # reduced timeout
-            else:
-                time_left = timeout - (time() - start)
-
-                if time_left > 0:
-                    continue
-
-                return None
+            # try again with reduced timeout
+            if timeout is not None:
+                time_left = max(0, timeout - (time() - start))
Collaborator

If you remove the timeout check here, you might never leave this loop as long as you keep receiving messages that do not pass the filter.
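
To make the concern concrete, here is a minimal sketch of the revised loop run against a hypothetical source that always has another non-matching frame ready. `recv_new`, `endless_unwanted`, and the iteration cap are illustrative stand-ins, not python-can code, and messages are reduced to bare arbitration IDs. The cap exists only so the demonstration terminates.

```python
from time import time
from typing import Callable, Optional, Tuple

RecvInternal = Callable[[Optional[float]], Tuple[Optional[int], bool]]


def recv_new(
    recv_internal: RecvInternal,
    matches: Callable[[int], bool],
    timeout: Optional[float],
    max_iterations: int,
) -> Tuple[Optional[int], int]:
    """Simplified paraphrase of the revised loop; returns (message, iterations)."""
    start = time()
    time_left = timeout
    for iteration in range(1, max_iterations + 1):
        msg, already_filtered = recv_internal(time_left)
        if msg is None:  # only an empty source ends the loop early
            return None, iteration
        if already_filtered or matches(msg):
            return msg, iteration
        if timeout is not None:
            time_left = max(0, timeout - (time() - start))
    # Cap reached: the uncapped loop would still be running at this point.
    return None, max_iterations


def endless_unwanted() -> RecvInternal:
    """Hypothetical source: a frame with ID 0x100 is always waiting."""

    def recv_internal(timeout: Optional[float]) -> Tuple[Optional[int], bool]:
        return 0x100, False

    return recv_internal


msg, iterations = recv_new(
    endless_unwanted(), lambda arb_id: arb_id == 0x200, timeout=0, max_iterations=10_000
)
print(msg, iterations)  # None 10000
```

Even with `timeout=0` the loop uses every allowed iteration, because nothing but an empty source or a matching frame stops it.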

Author

Maybe I'm missing a corner case, but here's why I thought it was alright to remove the check:

time_left decreases every pass through this loop. Eventually time_left will be 0. At that point, _recv_internal() will not block waiting for new messages, and will only return ones that have already queued up at either the OS or hardware level, which should be a finite queue that clears pretty quickly.

Are you worried about CAN messages arriving at the interface faster than this loop can complete an iteration? I suppose that could theoretically happen, if:

  1. A 1 Mbps bus were 100% saturated with only the smallest possible CAN 2.0A frames, meaning a new frame arrived every 47 μs
  2. One iteration of this loop consistently took >= 47 μs to complete
  3. No received message ever matched any installed filter

I wasn't thinking of that as a realistic concern, because to my mind most applications as a whole would be doomed regardless of how recv() behaves if they don't have enough CPU power to keep up with the incoming flow of messages.
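
The 47 μs figure can be checked with a little arithmetic. The sketch below assumes a standard-ID (CAN 2.0A) data frame with an empty payload, counts the 3-bit interframe space, and ignores stuff bits. The dictionary keys and the function name are only illustrative.

```python
# Bit widths of a CAN 2.0A data frame with no data bytes (stuff bits ignored).
FRAME_FIELDS = {
    "start_of_frame": 1,
    "identifier": 11,
    "rtr": 1,
    "ide": 1,
    "reserved": 1,
    "dlc": 4,
    "crc": 15,
    "crc_delimiter": 1,
    "ack_slot": 1,
    "ack_delimiter": 1,
    "end_of_frame": 7,
}
INTERFRAME_SPACE_BITS = 3


def min_frame_period_us(bitrate_bps: int) -> float:
    """Time between back-to-back minimal frames, in microseconds."""
    bits = sum(FRAME_FIELDS.values()) + INTERFRAME_SPACE_BITS
    return bits * 1_000_000 / bitrate_bps


print(min_frame_period_us(1_000_000))  # 47.0
```

Stuff bits would lengthen real frames slightly, so 47 μs is a lower bound on the arrival interval.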

If that's your concern, I actually don't know how to reconcile it with the problem I ran into that led me to submit this PR.

This PR was prompted by finding that python-can-isotp fails to send flow-control frames in a timely fashion when filters are installed and some high-frequency messages are being received and filtered out. Its sleep_time() instigates a 50ms sleep between every call to recv(), which interacts poorly with this method as currently written.

Author

My concern in general is about scenarios like this one:

  1. Message A, which would not pass the filters, arrives
  2. Message B, which would pass the filters, arrives
  3. recv(timeout=0) is called

If the hardware or OS is handling the filtering, then recv() would return Message B. However, if recv() itself is handling the filtering, it would return None even though a desired message was available from the interface at the time of the call.
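
The scenario can be reproduced with a small stand-alone sketch. Both functions below are simplified paraphrases of the `recv()` loop before and after this change, operating on a plain `deque` of arbitration IDs rather than a real `can.Bus`. `make_source`, `recv_old`, `recv_new`, and `wanted` are illustrative names, not part of python-can.

```python
from collections import deque
from time import time
from typing import Callable, Deque, Optional, Tuple

RecvInternal = Callable[[Optional[float]], Tuple[Optional[int], bool]]


def make_source(queue: Deque[int]) -> RecvInternal:
    """Non-blocking stand-in for _recv_internal() without hardware filtering."""

    def recv_internal(timeout: Optional[float]) -> Tuple[Optional[int], bool]:
        return (queue.popleft(), False) if queue else (None, False)

    return recv_internal


def recv_old(recv_internal, matches, timeout):
    """Loop before the change: gives up once the timeout has elapsed."""
    start = time()
    time_left = timeout
    while True:
        msg, already_filtered = recv_internal(time_left)
        if msg is not None and (already_filtered or matches(msg)):
            return msg
        if timeout is None:
            continue
        time_left = timeout - (time() - start)
        if time_left > 0:
            continue
        return None


def recv_new(recv_internal, matches, timeout):
    """Loop after the change: keeps reading until the source returns None."""
    start = time()
    time_left = timeout
    while True:
        msg, already_filtered = recv_internal(time_left)
        if msg is None:
            return None
        if already_filtered or matches(msg):
            return msg
        if timeout is not None:
            time_left = max(0, timeout - (time() - start))


MESSAGE_A, MESSAGE_B = 0x100, 0x200  # A fails the filter, B passes it


def wanted(arb_id: int) -> bool:
    return arb_id == MESSAGE_B


old_result = recv_old(make_source(deque([MESSAGE_A, MESSAGE_B])), wanted, timeout=0)
new_result = recv_new(make_source(deque([MESSAGE_A, MESSAGE_B])), wanted, timeout=0)
print(old_result, new_result)  # None 512
```

With the old loop, Message A uses up the single read that `timeout=0` allows, so Message B stays queued until the next call.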

Are you interested in me continuing to pursue fixes for this discrepancy in behavior?

Owner

I think this is well worth pursuing! Thanks @malsyned

A bit off topic, but when I first worked on this (many a year ago) I made some attempt at model checking the recv flow using Promela or TLA+ to try to satisfy myself around the many corner cases.

If you're curious, here is a starting point in Promela for a simplified model of this problem:

#define MAX_TIMEOUT 10

typedef message {
    bit valid;
    bit filtered;
}

// We set the buffer size to 1 for demonstration purposes
chan recv_channel = [1] of { message };

inline recv_internal(recv_timeout, msg_out, filtered_out, valid_out) {
    if
    :: recv_channel?msg_out ->
        if
        :: msg_out.filtered -> filtered_out = 1; valid_out = 1;
        :: else -> filtered_out = 0; valid_out = msg_out.valid;
        fi;
    :: recv_timeout > 0 -> recv_timeout = recv_timeout - 1;
    :: recv_timeout == 0 -> filtered_out = 0; valid_out = 0; // Timeout
    fi;
}



inline matches_filters(m, result) {
    // Placeholder for filter logic, modify result variable non-deterministically
    if
    :: result = 1;
    :: result = 0;
    fi;
}

active proctype Receiver() {
    message m;
    bit already_filtered;
    bit is_valid;
    bit filter_result; // Used to receive the result from matches_filters
    int time_left = MAX_TIMEOUT;

    do
    :: time_left > 0 ->
        recv_internal(time_left, m, already_filtered, is_valid);
        matches_filters(m, filter_result); // Call matches_filters inline function
        if
        :: is_valid && (already_filtered || filter_result) -> 
            // Message received and is valid
            assert( already_filtered || filter_result);
            assert(time_left > 0);
            break;
        :: else -> 
            // Message is invalid or no message, update time_left and loop
            time_left = time_left - 1; // Simulate time passing
        fi;
    :: else -> 
        // Timeout
        break;
    od;
}



// Generator that produces specific scenarios for testing
active proctype MessageGenerator() {
    int count = 0;
    message msg;

    // Scenario 1: All messages are filtered
    do
    :: count < 5 ->
        msg.valid = 1; // All messages are valid
        msg.filtered = 1; // All messages are pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break; // Without this, the loop blocks and later scenarios never run
    od;

    // Scenario 2: All messages are invalid
    do
    :: count < 10 ->
        msg.valid = 0; // All messages are invalid
        msg.filtered = 0; // No message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break;
    od;

    // Scenario 3: Mixed messages
    do
    :: count < 15 -> // Let's generate a finite number of messages
        msg.valid = (count % 2); // Alternate between valid and invalid messages
        msg.filtered = (count % 3 == 0); // Every third message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break;
    od;
}

//ltl liveness {
//    [](MessageGenerator:msg.valid == 1 -> <>Receiver:is_valid == 1)
//}

ltl p_timeout {
    [](Receiver:time_left == 0 -> (Receiver:is_valid == 0))
}

Run with Spin, e.g. using https://www.cse.chalmers.se/edu/year/2016/course/TDA383_LP1/spin/


     def _recv_internal(
         self, timeout: Optional[float]
52 changes: 52 additions & 0 deletions test/test_message_filtering.py
@@ -5,6 +5,7 @@
 """

 import unittest
+from unittest.mock import MagicMock, patch

 from can import Bus, Message

@@ -46,6 +47,57 @@ def test_match_example_message(self):
         self.assertFalse(self.bus._matches_filters(EXAMPLE_MSG))
         self.assertTrue(self.bus._matches_filters(HIGHEST_MSG))

+    def test_empty_queue_up_to_match(self):
+        bus2 = Bus(interface="virtual", channel="testy")
+        self.bus.set_filters(MATCH_EXAMPLE)
+        bus2.send(HIGHEST_MSG)
+        bus2.send(EXAMPLE_MSG)
+        actual = self.bus.recv(timeout=0)
+        bus2.shutdown()
+        self.assertTrue(
+            EXAMPLE_MSG.equals(
+                actual, timestamp_delta=None, check_direction=False, check_channel=False
+            )
+        )
+
+
+@patch("can.bus.time")
+class TestMessageFilterRetryTiming(unittest.TestCase):
+    def setUp(self):
+        self.bus = Bus(interface="virtual", channel="testy")
+        self.bus._recv_internal = MagicMock(name="_recv_internal")
+
+    def tearDown(self):
+        self.bus.shutdown()
+
+    def test_propagate_recv_internal_timeout(self, time: MagicMock) -> None:
+        self.bus._recv_internal.side_effect = [
+            (None, False),
+        ]
+        time.side_effect = [0]
+        self.bus.set_filters(MATCH_EXAMPLE)
+        self.assertIsNone(self.bus.recv(timeout=3))
+
+    def test_retry_with_adjusted_timeout(self, time: MagicMock) -> None:
+        self.bus._recv_internal.side_effect = [
+            (HIGHEST_MSG, False),
+            (EXAMPLE_MSG, False),
+        ]
+        time.side_effect = [0, 1]
+        self.bus.set_filters(MATCH_EXAMPLE)
+        self.bus.recv(timeout=3)
+        self.bus._recv_internal.assert_called_with(timeout=2)
+
+    def test_keep_timeout_non_negative(self, time: MagicMock) -> None:
+        self.bus._recv_internal.side_effect = [
+            (HIGHEST_MSG, False),
+            (EXAMPLE_MSG, False),
+        ]
+        time.side_effect = [0, 1]
+        self.bus.set_filters(MATCH_EXAMPLE)
+        self.bus.recv(timeout=0.5)
+        self.bus._recv_internal.assert_called_with(timeout=0)
+

 if __name__ == "__main__":
     unittest.main()