BusABC.recv: keep calling _recv_internal until it returns None #1686

Open · malsyned wants to merge 1 commit into main from recv-always-keep-reading-on-filter-match-failure

Conversation

malsyned

Even if recv() is called with timeout=0, the caller's intention is probably for recv() to check all of the messages that have already arrived at the interface until one of them matches the filters.

This is already the way recv() behaves for interface drivers that take advantage of hardware or OS-level filtering, but those that use BusABC's default software-based filtering might return None even if a matching message has already arrived.
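
For illustration, the software-filtered path on main currently goes roughly like this (a paraphrase, not the exact source):

    # Sketch of BusABC.recv() software filtering as on main (paraphrased)
    while True:
        msg, already_filtered = self._recv_internal(timeout=time_left)
        if msg and (already_filtered or self._matches_filters(msg)):
            return msg
        if timeout is None:
            continue  # no deadline: keep polling forever
        time_left = timeout - (time() - start)
        if time_left > 0:
            continue  # deadline not reached yet: poll again
        return None   # deadline passed, even if more frames are already queued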

@malsyned malsyned marked this pull request as draft October 27, 2023 16:19
@malsyned malsyned force-pushed the recv-always-keep-reading-on-filter-match-failure branch from fc7e8c2 to 5f174f5 Compare October 27, 2023 16:33
@malsyned (Author)

Sorry, this PR is still buggy. I'm going to build out the unit tests a little bit to test recv() more thoroughly and push an updated version in a little while.

@malsyned malsyned force-pushed the recv-always-keep-reading-on-filter-match-failure branch from 5f174f5 to 548a200 Compare October 27, 2023 17:55
@malsyned (Author)

This PR should be good to go now. It simplifies the logic in recv() by leveraging the fact that _recv_internal() already takes care of determining whether an rx timeout has occurred and returns None if it has. recv() therefore never needs to check whether time_left > 0, only whether _recv_internal() returned None. Eventually _recv_internal() will get called with timeout=0, and if there are no messages already in the underlying interface's receive buffer, recv() will return None.
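
Concretely, recv() now reduces to this loop (the same lines later appear as the removed context in the follow-up diff below):

    start = time()
    time_left = timeout

    while True:
        # try to get a message
        msg, already_filtered = self._recv_internal(timeout=time_left)

        # propagate timeouts from _recv_internal()
        if not msg:
            return None

        # return it, if it matches
        if already_filtered or self._matches_filters(msg):
            LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
            return msg

        # try again with reduced timeout
        if timeout is not None:
            time_left = max(0, timeout - (time() - start))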

@malsyned malsyned marked this pull request as ready for review October 27, 2023 18:14
                return None

            # try again with reduced timeout
            if timeout is not None:
                time_left = max(0, timeout - (time() - start))
Collaborator

If you remove the timeout check here, you might never leave this loop as long as you keep receiving messages that do not pass the filter.

Author

Maybe I'm missing a corner case, but here's why I thought it was alright to remove the check:

time_left decreases every pass through this loop. Eventually time_left will be 0. At that point, _recv_internal() will not block waiting for new messages, and will only return ones that have already queued up at either the OS or hardware level, which should be a finite queue that clears pretty quickly.

Are you worried about CAN messages arriving at the interface faster than this loop can complete an iteration? I suppose that could theoretically happen, if:

  1. A 1Mbps bus were 100% saturated with only the smallest possible CAN 2.0A frames, meaning a new frame arrived every 47μs (see the note after this list)
  2. One iteration of this loop consistently took >= 47μs to complete
  3. No received message ever matched any installed filter
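
(For reference: a CAN 2.0A data frame with zero data bytes is 44 bits, plus the 3-bit interframe space, so 47 bit times before any stuff bits; at 1 Mbit/s that works out to 47μs per frame.)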

I wasn't thinking of that as a realistic concern, because to my mind most applications as a whole would be doomed regardless of how recv() behaves if they don't have enough CPU power to keep up with the incoming flow of messages.

If that's your concern, I actually don't know how to reconcile it with the problem I ran into that led me to submit this PR.

This PR was prompted by finding that python-can-isotp fails to send flow-control frames in a timely fashion when filters are installed and some high-frequency messages are being received and filtered out. Its sleep_time() imposes a 50ms sleep between every call to recv(), which interacts poorly with this method as currently written.

Author

My concern in general is about scenarios like this one:

  1. Message A, which would not pass the filters, arrives
  2. Message B, which would pass the filters, arrives
  3. recv(timeout=0) is called

If the hardware or OS are handling the filtering, then recv() would return Message B. However, if recv() is handling the filtering, then recv() would return None despite the fact that a desired message was available from the interface at the time of the call.
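
A quick sketch of this scenario using the virtual interface (which goes through BusABC's software filtering); the channel name and IDs here are arbitrary:

    import can

    tx = can.Bus("demo", interface="virtual")
    rx = can.Bus("demo", interface="virtual",
                 can_filters=[{"can_id": 0x2, "can_mask": 0x7FF}])

    tx.send(can.Message(arbitration_id=0x1, is_extended_id=False))  # Message A, filtered out
    tx.send(can.Message(arbitration_id=0x2, is_extended_id=False))  # Message B, passes

    print(rx.recv(timeout=0))  # None today, even though B is already queued

    tx.shutdown()
    rx.shutdown()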

Are you interested in me continuing to pursue fixes for this discrepancy in behavior?

Owner

I think this is well worth pursuing! Thanks @malsyned

A bit off topic, but when I first worked on this (many a year ago) I made some attempt at model checking the recv flow using Promela or TLA+ to try to satisfy myself around the many corner cases.

If you're curious, here is a starting point in Promela for a simplified model of this problem:

#define MAX_TIMEOUT 10

typedef message {
    bit valid;
    bit filtered;
}

// We set the buffer size to 1 for demonstration purposes
chan recv_channel = [1] of { message };

inline recv_internal(recv_timeout, msg_out, filtered_out, valid_out) {
    if
    :: recv_channel?msg_out ->
        if
        :: msg_out.filtered -> filtered_out = 1; valid_out = 1;
        :: else -> filtered_out = 0; valid_out = msg_out.valid;
        fi;
    :: recv_timeout > 0 -> recv_timeout = recv_timeout - 1;
    :: recv_timeout == 0 -> filtered_out = 0; valid_out = 0; // Timeout
    fi;
}



inline matches_filters(m, result) {
    // Placeholder for filter logic, modify result variable non-deterministically
    if
    :: result = 1;
    :: result = 0;
    fi;
}

active proctype Receiver() {
    message m;
    bit already_filtered;
    bit is_valid;
    bit filter_result; // Used to receive the result from matches_filters
    int time_left = MAX_TIMEOUT;

    do
    :: time_left > 0 ->
        recv_internal(time_left, m, already_filtered, is_valid);
        matches_filters(m, filter_result); // Call matches_filters inline function
        if
        :: is_valid && (already_filtered || filter_result) -> 
            // Message received and is valid
            assert( already_filtered || filter_result);
            assert(time_left > 0);
            break;
        :: else -> 
            // Message is invalid or no message, update time_left and loop
            time_left = time_left - 1; // Simulate time passing
        fi;
    :: else -> 
        // Timeout
        break;
    od;
}



// Generator that produces specific scenarios for testing
active proctype MessageGenerator() {
    int count = 0;
    message msg;

    // Scenario 1: All messages are filtered
    do :: count < 5 ->
        msg.valid = 1; // All messages are valid
        msg.filtered = 1; // All messages are pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break; // without this the loop blocks forever at count == 5
    od;

    // Scenario 2: All messages are invalid
    do :: count < 10 ->
        msg.valid = 0; // All messages are invalid
        msg.filtered = 0; // No message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break; // move on to scenario 3
    od;
    
    // Scenario 3: Alternate valid/invalid, every third message pre-filtered
    do
    :: count < 15 -> // Let's generate a finite number of messages
        msg.valid = (count % 2); // Alternate between valid and invalid messages
        msg.filtered = (count % 3 == 0); // Every third message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break; // done generating
    od;
}

//ltl liveness {
//    [](MessageGenerator:msg.valid == 1 -> <>Receiver:is_valid == 1)
//}

ltl p_timeout {
    [](Receiver:time_left == 0 -> (Receiver:is_valid == 0))
}

Run with Spin, e.g. using https://www.cse.chalmers.se/edu/year/2016/course/TDA383_LP1/spin/
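
With a local Spin install, something along these lines should work (model.pml is a placeholder filename):

    spin -search -ltl p_timeout model.pml

or run the classic pipeline: spin -a to generate pan.c, compile it, and run the resulting pan verifier with the p_timeout claim selected.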

malsyned commented Nov 2, 2023

The only way I can think of to guarantee both

  1. this function terminates regardless of bus or CPU speed, and
  2. that it will return messages that have been received and processed by the system if they are available

is to call _recv_internal() and do the filtering in a separate thread, and for recv() to be a wrapper around Queue.get() instead of _recv_internal(). I don't know what that would do to the performance of recv() though -- I could see it going either way.

Is this a strategy you'd be open to merging if I coded it up?

@malsyned (Author)

Something like this (just a sketch, not ready to call this a complete implementation yet):

diff --git a/can/bus.py b/can/bus.py
index af517e9d..55d8abac 100644
--- a/can/bus.py
+++ b/can/bus.py
@@ -4,6 +4,7 @@

 import contextlib
 import logging
+import queue
 import threading
 from abc import ABC, ABCMeta, abstractmethod
 from enum import Enum, auto
@@ -97,10 +98,33 @@ def __init__(
         """
         self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
         self.set_filters(can_filters)
+        self._recv_queue = queue.Queue(maxsize=100)
+        self._recv_thread = threading.Thread(target=self._recv_task,
+                                             name='CAN rx filtering',
+                                             daemon=True)
+        self._recv_thread.start()

     def __str__(self) -> str:
         return self.channel_info

+    def _recv_task(self):
+        while not self._is_shutdown:
+            try:
+                msg, already_filtered = self._recv_internal(timeout=0.1)
+                if msg and (already_filtered or self._matches_filters(msg)):
+                    self._recv_put(msg)
+            except Exception as ex:
+                self._recv_put(ex)
+
+    def _recv_put(self, *args, **kwargs):
+        while not self._is_shutdown:
+            try:
+                self._recv_queue.put(timeout=0.1, *args, **kwargs)
+            except queue.Full:
+                continue
+            else:
+                break
+
     def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
         """Block waiting for a message from the Bus.

@@ -113,25 +137,14 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
         :raises ~can.exceptions.CanOperationError:
             If an error occurred while reading
         """
-        start = time()
-        time_left = timeout
-
-        while True:
-            # try to get a message
-            msg, already_filtered = self._recv_internal(timeout=time_left)
-
-            # propagate timeouts from _recv_internal()
-            if not msg:
-                return None
-
-            # return it, if it matches
-            if already_filtered or self._matches_filters(msg):
-                LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
-                return msg
-
-            # try again with reduced timeout
-            if timeout is not None:
-                time_left = max(0, timeout - (time() - start))
+        try:
+            msg = self._recv_queue.get(timeout=timeout)
+        except queue.Empty:
+            return None
+        if isinstance(msg, Exception):
+            raise msg
+        LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
+        return msg

     def _recv_internal(
         self, timeout: Optional[float]
@@ -457,6 +470,7 @@ def shutdown(self) -> None:

         self._is_shutdown = True
         self.stop_all_periodic_tasks()
+        self._recv_thread.join()

     def __enter__(self) -> Self:
         return self
