-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
256 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
|
||
module RSMP | ||
class Sequencer < Listener | ||
|
||
def initialize notifier:, component:, filter:, task:, timeout: | ||
@notifier = notifier | ||
@filter = filter | ||
@task = task || notifier.task | ||
@condition = Async::Notification.new | ||
@timeout = timeout | ||
@state = nil | ||
reset | ||
end | ||
|
||
def reset | ||
@next = nil | ||
@messages = [] | ||
@done = false | ||
end | ||
|
||
# Waits for message and passes each message accept by the filter | ||
# to the block. | ||
# The block can do any validation of the | ||
# | ||
# | ||
def validate &block | ||
reset | ||
@notifier.add_listener self | ||
until @done | ||
yield wait_for_message | ||
end | ||
ensure | ||
@notifier.remove_listener self | ||
end | ||
|
||
def wait_for_message | ||
if @messages.empty? | ||
@task.with_timeout(@timeout) { @condition.wait } | ||
end | ||
@messages.shift | ||
rescue Async::TimeoutError | ||
@error = RSMP::TimeoutError | ||
end | ||
|
||
def notify message | ||
if @filter.accept? messages | ||
@messages.push messages | ||
@condition.notifify | ||
end | ||
end | ||
end | ||
|
||
def done | ||
@done = true | ||
end | ||
end | ||
|
||
|
||
module Validator::StatusHelpers | ||
class SignalPriorityHelper < RSMP::Listener | ||
|
||
def request | ||
end | ||
|
||
def request_unrelated | ||
end | ||
|
||
def next state | ||
@next = state | ||
end | ||
|
||
def done | ||
@done = true | ||
end | ||
|
||
private | ||
|
||
def send_priority_request id: nil, type: 'new' | ||
command_list = build_command_list :M0022, :requestPriority, { | ||
requestId: (id || SecureRandom.uuid()[0..3]), | ||
signalGroupId: signal_group, | ||
type: type, | ||
level: 7, | ||
eta: 2, | ||
vehicleType: 'car' | ||
} | ||
site.send_command component, command_list | ||
end | ||
|
||
# look through a status message to find state | ||
# updates for a specific priority request | ||
def search_for_request_state request_id, message, states | ||
message.attribute('sS').each do |status| | ||
return nil unless status['sCI'] == 'S0033' && status['n'] == 'status' | ||
status['s'].each do |priority| | ||
next unless priority['r'] == request_id # is this our request | ||
state = priority['s'] | ||
next unless state != states.last # did the state change? | ||
log "Priority request reached state '#{state}'" | ||
return state | ||
end | ||
end | ||
nil | ||
end | ||
end | ||
end | ||
|
||
|
||
|
||
|
||
# @request_id = SecureRandom.uuid()[0..3] | ||
|
||
|
||
status_list = [{'sCI'=>'S0033','n'=>'status','uRt'=>'0'}] | ||
status_list.map! { |item| item.merge!('sOc' => true) } if use_sOc?(site) | ||
site.subscribe_to_status component, status_list | ||
|
||
# start collector | ||
request_id = SecureRandom.uuid()[0..3] # make a message id | ||
num = sequence.length | ||
states = [] | ||
result = nil | ||
collector = nil | ||
collect_task = task.async do | ||
collector = RSMP::Collector.new( | ||
site, | ||
type: "StatusUpdate", | ||
num: num, | ||
timeout: Validator.get_config('timeouts','priority_completion'), | ||
component: component | ||
) | ||
|
||
|
||
result = collector.collect do |message| | ||
state = search_for_request_state request_id, message, states | ||
next unless state | ||
states << state | ||
:keep | ||
end | ||
end | ||
|
||
# helper to send priority request | ||
|
||
|
||
#send unrelated requests before and our request, to check that it does not interfere | ||
send_priority_request "Send an unrelated signal priority request before", | ||
site: site, component: component | ||
send_priority_request "Send our signal priority request", | ||
site: site, component: component, id: request_id | ||
send_priority_request "Send an unrelated signal priority request after", | ||
site: site, component: component | ||
|
||
# wait for collector to complete and check result | ||
collect_task.wait | ||
expect(result).to eq(:ok) | ||
expect(collector.messages).to be_an(Array) | ||
expect(collector.messages.size).to eq(num) | ||
expect(states).to eq(sequence), "Expected state sequence #{sequence}, got #{states}" | ||
ensure | ||
# unsubcribe | ||
unsubscribe_list = status_list.map { |item| item.slice('sCI','n') } | ||
site.unsubscribe_to_status component, unsubscribe_list | ||
end |