BusABC.recv: keep calling _recv_internal until it returns None #1686
base: main
Conversation
Force-pushed fc7e8c2 to 5f174f5
Sorry, this PR is still buggy. I'm going to build out the unit tests a little bit to test it.
Even if recv() is called with timeout=0, the caller's intention is probably for recv() to check all of the messages that have already arrived at the interface until one of them matches the filters. This is already the way recv() behaves for interface drivers that take advantage of hardware or OS-level filtering, but those that use BusABC's default software-based filtering might return None even if a matching message has already arrived.
Force-pushed 5f174f5 to 548a200
This PR should be good to go now. It simplifies the logic in `recv()`.
    return None

# try again with reduced timeout
if timeout is not None:
    time_left = max(0, timeout - (time() - start))
If you remove the timeout check here, it could happen that you never leave this loop as long as you receive messages that do not pass the filter.
Maybe I'm missing a corner case, but here's why I thought it was alright to remove the check: `time_left` decreases every pass through this loop. Eventually `time_left` will be 0. At that point, `_recv_internal()` will not block waiting for new messages, and will only return ones that have already queued up at either the OS or hardware level, which should be a finite queue that clears pretty quickly.
Are you worried about CAN messages arriving at the interface faster than this loop can complete an iteration? I suppose that could theoretically happen, if:
- A 1Mbps bus were 100% saturated with only the smallest possible CAN 2.0a frames, meaning a new frame arrived every 47μs
- One iteration of this loop consistently took >= 47μs to complete
- No received message ever matched any installed filter
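As a back-of-envelope check of that 47 μs figure (my own sketch, not part of the PR), the field widths of a CAN 2.0A data frame with an empty payload sum to 44 bits, plus a 3-bit minimum intermission, ignoring bit stuffing, which can only make frames longer:

```python
# Field widths of a CAN 2.0A data frame with a 0-byte payload,
# ignoring bit stuffing (stuffing only lengthens frames).
FRAME_BITS = sum([
    1,   # SOF
    11,  # identifier
    1,   # RTR
    1,   # IDE
    1,   # r0
    4,   # DLC
    0,   # data (empty payload)
    15,  # CRC
    1,   # CRC delimiter
    1,   # ACK slot
    1,   # ACK delimiter
    7,   # EOF
])
INTERFRAME_BITS = 3  # minimum intermission between frames

def min_frame_period_us(bitrate_bps: float) -> float:
    """Shortest possible interval between frame starts at the given bitrate."""
    return (FRAME_BITS + INTERFRAME_BITS) / bitrate_bps * 1e6

print(min_frame_period_us(1_000_000))  # 47.0
```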
I wasn't thinking of that as a realistic concern, because to my mind most applications as a whole would be doomed regardless of how `recv()` behaves if they don't have enough CPU power to keep up with the incoming flow of messages.
If that's your concern, I actually don't know how to reconcile it with the problem I ran into that led me to submit this PR.
This PR was prompted by finding that python-can-isotp fails to send flow-control frames in a timely fashion when filters are installed and some high-frequency messages are being received and filtered out. Its `sleep_time()` instigates a 50 ms sleep between every call to `recv()`, which interacts poorly with this method as currently written.
My concern in general is about scenarios like this one:
- Message A, which would not pass the filters, arrives
- Message B, which would pass the filters, arrives
- `recv(timeout=0)` is called

If the hardware or OS are handling the filtering, then `recv()` would return Message B. However, if `recv()` is handling the filtering, then `recv()` would return `None` despite the fact that a desired message was available from the interface at the time of the call.
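That discrepancy can be reproduced with a toy model of the current loop shape (my own simplified reconstruction, with a hypothetical stand-in driver, not python-can's actual code):

```python
import time
from collections import deque

queued = deque(["A", "B"])          # A fails the filter, B would pass
passes_filter = lambda m: m == "B"  # stand-in software filter

def _recv_internal(timeout):
    # Stand-in driver: hands over at most one already-queued frame per call
    return queued.popleft() if queued else None

def recv(timeout=0):
    # Simplified shape of a recv() loop that gives up once the deadline passes
    start = time.monotonic()
    time_left = timeout
    while True:
        msg = _recv_internal(time_left)
        if msg is not None and passes_filter(msg):
            return msg
        if timeout is not None:
            time_left = max(0, timeout - (time.monotonic() - start))
            if time_left == 0:
                return None          # gives up with B still queued

print(recv(timeout=0))               # None, although B had already arrived
print(queued)                        # deque(['B'])
```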
Are you interested in me continuing to pursue fixes for this discrepancy in behavior?
I think this is well worth pursuing! Thanks @malsyned
A bit off topic, but when I first worked on this (many a year ago) I made some attempt at model checking the recv flow using Promela or TLA+ to try to satisfy myself about the many corner cases.
If you're curious here is a starting point in promela for a simplified model of this problem:
#define MAX_TIMEOUT 10

typedef message {
    bit valid;
    bit filtered;
}

// We set the buffer size to 1 for demonstration purposes
chan recv_channel = [1] of { message };

inline recv_internal(recv_timeout, msg_out, filtered_out, valid_out) {
    if
    :: recv_channel?msg_out ->
        if
        :: msg_out.filtered -> filtered_out = 1; valid_out = 1;
        :: else -> filtered_out = 0; valid_out = msg_out.valid;
        fi;
    :: recv_timeout > 0 -> recv_timeout = recv_timeout - 1;
    :: recv_timeout == 0 -> filtered_out = 0; valid_out = 0; // Timeout
    fi;
}

inline matches_filters(m, result) {
    // Placeholder for filter logic, modify result variable non-deterministically
    if
    :: result = 1;
    :: result = 0;
    fi;
}

active proctype Receiver() {
    message m;
    bit already_filtered;
    bit is_valid;
    bit filter_result; // Used to receive the result from matches_filters
    int time_left = MAX_TIMEOUT;
    do
    :: time_left > 0 ->
        recv_internal(time_left, m, already_filtered, is_valid);
        matches_filters(m, filter_result); // Call matches_filters inline function
        if
        :: is_valid && (already_filtered || filter_result) ->
            // Message received and is valid
            assert(already_filtered || filter_result);
            assert(time_left > 0);
            break;
        :: else ->
            // Message is invalid or no message, update time_left and loop
            time_left = time_left - 1; // Simulate time passing
        fi;
    :: else ->
        // Timeout
        break;
    od;
}

// Generator that produces specific scenarios for testing
active proctype MessageGenerator() {
    int count = 0;
    message msg;
    // Scenario 1: All messages are filtered
    do
    :: count < 5 ->
        msg.valid = 1;    // All messages are valid
        msg.filtered = 1; // All messages are pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break;     // Move on once this scenario has run
    od;
    // Scenario 2: All messages are invalid
    do
    :: count < 10 ->
        msg.valid = 0;    // All messages are invalid
        msg.filtered = 0; // No message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break;     // Move on to the final scenario
    od;
    // Scenario 3: A mix of valid, invalid, and pre-filtered messages
    do
    :: count < 15 ->      // Let's generate a finite number of messages
        msg.valid = (count % 2);         // Alternate between valid and invalid messages
        msg.filtered = (count % 3 == 0); // Every third message is pre-filtered
        recv_channel!msg; // Send the message
        count++;
    :: else -> break;
    od;
}

//ltl liveness {
//    [](MessageGenerator:msg.valid == 1 -> <>Receiver:is_valid == 1)
//}

ltl p_timeout {
    [](Receiver:time_left == 0 -> (Receiver:is_valid == 0))
}
Run with Spin, e.g. using https://www.cse.chalmers.se/edu/year/2016/course/TDA383_LP1/spin/
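For a local run instead of the web frontend, the standard Spin workflow applies (assuming the model is saved as `recv.pml`; the filename is my own choice):

```shell
spin -a recv.pml     # generate the pan.c verifier from the model
cc -O2 -o pan pan.c  # compile the verifier
./pan -a             # run it with LTL/acceptance checking enabled
```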
The only way I can think of to guarantee both is to call `_recv_internal()` from a background thread and do the filtering there. Is this a strategy you'd be open to merging if I coded it up?
Something like this (just a sketch, not ready to call this a complete implementation yet):

diff --git a/can/bus.py b/can/bus.py
index af517e9d..55d8abac 100644
--- a/can/bus.py
+++ b/can/bus.py
@@ -4,6 +4,7 @@
import contextlib
import logging
+import queue
import threading
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum, auto
@@ -97,10 +98,33 @@ def __init__(
"""
self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
self.set_filters(can_filters)
+ self._recv_queue = queue.Queue(maxsize=100)
+ self._recv_thread = threading.Thread(target=self._recv_task,
+ name='CAN rx filtering',
+ daemon=True)
+ self._recv_thread.start()
def __str__(self) -> str:
return self.channel_info
+ def _recv_task(self):
+ while not self._is_shutdown:
+ try:
+ msg, already_filtered = self._recv_internal(timeout=0.1)
+ if msg and (already_filtered or self._matches_filters(msg)):
+ self._recv_put(msg)
+ except Exception as ex:
+ self._recv_put(ex)
+
+ def _recv_put(self, *args, **kwargs):
+ while not self._is_shutdown:
+ try:
+ self._recv_queue.put(timeout=0.1, *args, **kwargs)
+ except queue.Full:
+ continue
+ else:
+ break
+
def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
"""Block waiting for a message from the Bus.
@@ -113,25 +137,14 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
:raises ~can.exceptions.CanOperationError:
If an error occurred while reading
"""
- start = time()
- time_left = timeout
-
- while True:
- # try to get a message
- msg, already_filtered = self._recv_internal(timeout=time_left)
-
- # propagate timeouts from _recv_internal()
- if not msg:
- return None
-
- # return it, if it matches
- if already_filtered or self._matches_filters(msg):
- LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
- return msg
-
- # try again with reduced timeout
- if timeout is not None:
- time_left = max(0, timeout - (time() - start))
+ try:
+ msg = self._recv_queue.get(timeout=timeout)
+ except queue.Empty:
+ return None
+ if isinstance(msg, Exception):
+ raise msg
+ LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
+ return msg
def _recv_internal(
self, timeout: Optional[float]
@@ -457,6 +470,7 @@ def shutdown(self) -> None:
self._is_shutdown = True
self.stop_all_periodic_tasks()
+ self._recv_thread.join()
def __enter__(self) -> Self:
return self
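The error-propagation trick the sketch relies on (pushing exceptions through the same queue the messages travel on, then re-raising them in the consumer) works standalone. A minimal illustration with made-up names, not taken from the PR:

```python
import queue
import threading

q = queue.Queue(maxsize=100)

def worker():
    # Results and exceptions travel the same path, in order
    for item in ["ok", ValueError("bus error"), "done"]:
        q.put(item)

threading.Thread(target=worker, daemon=True).start()

def get(timeout=1.0):
    try:
        item = q.get(timeout=timeout)
    except queue.Empty:
        return None
    if isinstance(item, Exception):
        raise item  # surface worker errors in the caller's thread
    return item

print(get())  # "ok"
```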