Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Hearth unit test coverge for event streams. #214

Merged
merged 13 commits into from
Aug 14, 2024

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def start_mirror_event_server(port)

server = TCPServer.new(port)

Logger.new($stdout)
logger = Logger.new($stdout)
$stdout.sync = true

server_thread = Thread.new do
Expand All @@ -28,6 +28,7 @@ def start_mirror_event_server(port)
conn = HTTP2::Server.new

conn.on(:frame) do |bytes|
logger.info("SERVER -> #{bytes.inspect}")
sock.write(bytes) unless sock.closed?
end

Expand Down Expand Up @@ -76,6 +77,8 @@ def start_mirror_event_server(port)
end

stream.on(:half_close) do
logger.info('SERVER HALF CLOSE')
stream.data('', end_stream: true)
stream.close
end
end
Expand All @@ -88,6 +91,7 @@ def start_mirror_event_server(port)
rescue StandardError => e
puts "#{e.class} exception: #{e.message} - closing socket."
puts e.backtrace
logger.error("SERVER exception: #{e.inspect}")
sock.close
end
end
Expand All @@ -113,9 +117,16 @@ def start_mirror_event_server(port)
# end

server, server_thread = start_mirror_event_server(port)
logger = Logger.new($stdout)
$stdout.sync = true

Timeout.timeout(5) do
client = WhiteLabel::Client.new(
endpoint: "http://localhost:#{port}"
endpoint: "http://localhost:#{port}",
http2_client: Hearth::HTTP2::Client.new(
debug_output: true,
logger: logger
)
)

handler = WhiteLabel::EventStream::StartEventStreamHandler.new
Expand Down
21 changes: 2 additions & 19 deletions hearth/lib/hearth/event_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,8 @@
module Hearth
# Module for EventStreams.
module EventStream
# Raised when reading bytes exceed buffer total bytes
class ReadBytesExceedLengthError < RuntimeError; end
class MessageDecodeError < StandardError; end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change


# Raised when insufficient bytes of a message is received
class IncompleteMessageError < RuntimeError; end

# Raised when there is a prelude checksum mismatch.
class PreludeChecksumError < RuntimeError; end

# Raised when there is a message checksum mismatch.
class MessageChecksumError < RuntimeError; end

# Raised when an event payload exceeds the maximum allowed length.
class EventPayloadLengthExceedError < RuntimeError; end

# Raised when event headers exceed maximum allowed length.
class EventHeadersLengthExceedError < RuntimeError; end

# Raised when event streams parsers encounter are unable to parse a message.
class EventStreamParserError < RuntimeError; end
class MessageEncodeError < StandardError; end
end
end
22 changes: 4 additions & 18 deletions hearth/lib/hearth/event_stream/binary/message_decoder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def decode_message(raw_message)
def validate_checksum!(prelude, content, checksum)
return if Zlib.crc32([prelude, content].pack('a*a*'), 0) == checksum

raise MessageChecksumError
raise MessageDecodeError, 'Message checksum mismatch'
end

def decode_prelude(prelude)
Expand All @@ -80,7 +80,9 @@ def decode_prelude(prelude)
content, checksum = prelude.unpack(
"a#{PRELUDE_LENGTH - CRC32_LENGTH}N"
)
raise PreludeChecksumError unless Zlib.crc32(content, 0) == checksum
unless Zlib.crc32(content, 0) == checksum
raise MessageDecodeError, 'Prelude checksum mismatch'
end

content.unpack('N*')
end
Expand Down Expand Up @@ -137,24 +139,8 @@ def extract_header_value(scanner, unpack_pattern, value_length)
# rubocop:enable Metrics

def extract_payload(encoded)
if encoded.bytesize <= ONE_MEGABYTE
payload_stringio(encoded)
else
payload_tempfile(encoded)
end
end

def payload_stringio(encoded)
StringIO.new(encoded)
end

def payload_tempfile(encoded)
payload = Tempfile.new
payload.binmode
payload.write(encoded)
payload.rewind
payload
end
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions hearth/lib/hearth/event_stream/binary/message_encoder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def encode_headers(headers)

def encode_content(message, encoded_header)
if message.payload.length > MAX_PAYLOAD_LENGTH
raise EventPayloadLengthExceedError
raise MessageEncodeError, 'Payload exceeds maximum payload length'
end

header_length = encoded_header.bytesize
Expand Down Expand Up @@ -102,7 +102,7 @@ def validate_and_join!(header_entries)
break encoded_header
end

raise EventHeadersLengthExceedError
raise MessageEncodeError, 'Encoded headers exceed maximum length'
end
end

Expand Down
4 changes: 3 additions & 1 deletion hearth/lib/hearth/event_stream/binary/types.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ module Types

def self.encode_info(type)
pattern = PATTERN[type]
raise EventStreamParserError unless pattern
unless pattern
raise MessageEncodeError, "Unexpected header type: #{type}"
end

pattern
end
Expand Down
19 changes: 1 addition & 18 deletions hearth/lib/hearth/event_stream/handler_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ def initialize
@handlers = Hash.new { |h, k| h[k] = [] }
@error_handlers = []
@error_event_handlers = []
@exception_event_handlers = []
@raw_event_handlers = []
@headers_handlers = []
end
Expand All @@ -26,11 +25,6 @@ def on_error_event(&block)
@error_event_handlers << block
end

# Modeled errors with message-type exception
def on_exception_event(&block)
@exception_event_handlers << block
end

def on_headers(&block)
@headers_handlers << block
end
Expand Down Expand Up @@ -75,11 +69,7 @@ def emit_raw_event(message)
def parse_and_emit_exception(message)
type = message.headers.delete(':exception-type')&.value
event = parse_event(type, message)
if event
emit_exception_event(type, event)
else
emit_exception_event(:unknown, message)
end
emit_event(event.class, event)
end

def on(type, callback)
Expand All @@ -98,13 +88,6 @@ def emit_event(type, event)
end
end

def emit_exception_event(type, event)
emit_event(type, event)
@exception_event_handlers.each do |handler|
handler.call(type, event)
end
end

def emit_error_event(message)
error_code = message.headers.delete(':error-code')
error_message = message.headers.delete(':error-message')
Expand Down
22 changes: 17 additions & 5 deletions hearth/lib/hearth/http2/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,28 @@ def transmit(request:, response:, logger: nil)

def setup_stream_handlers(response, stream)
stream.on(:headers) do |headers|
headers.each { |k, v| response.headers[k] = v }
if response.body.is_a?(EventStream::Decoder)
# allow async events based on headers
response.body.emit_headers(response.headers)
end
handle_response_headers(headers, response)
end

stream.on(:data) do |data|
response.body.write(data)
end

stream.on(:close) do
log_debug('Stream closed, sending stream-closed to ' \
"sync_queue. Stream: #{stream.inspect}")
response.sync_queue << 'stream-closed'
end
end

def handle_response_headers(headers, response)
headers.each { |k, v| response.headers[k] = v }
return unless response.body.is_a?(EventStream::Decoder)

# allow async events based on headers
response.body.emit_headers(headers)
end

# H2 pseudo headers
# https://http2.github.io/http2-spec/#rfc.section.8.1.2.3
def h2_headers(request)
Expand Down Expand Up @@ -193,6 +199,12 @@ def pool_config
options['http_version'] = 'http2'
options
end

def log_debug(msg)
return unless @logger && @debug_output

@logger.debug(msg)
end
end
# rubocop:enable Metrics/ClassLength
end
Expand Down
Loading
Loading