From b5778b1651b0332a2c94a07d721918c06e19432d Mon Sep 17 00:00:00 2001 From: Quinn Damerell Date: Mon, 25 Nov 2024 13:12:48 -0800 Subject: [PATCH] Adding better logic for unknown http body size reads. --- octoeverywhere/WebStream/octowebstream.py | 4 +- .../WebStream/octowebstreamhttphelper.py | 257 +++++++++++++----- octoeverywhere/octohttprequest.py | 2 + setup.py | 2 +- 4 files changed, 191 insertions(+), 74 deletions(-) diff --git a/octoeverywhere/WebStream/octowebstream.py b/octoeverywhere/WebStream/octowebstream.py index 40da2a7..873f890 100644 --- a/octoeverywhere/WebStream/octowebstream.py +++ b/octoeverywhere/WebStream/octowebstream.py @@ -291,9 +291,9 @@ def ensureCloseMessageSent(self): WebStreamMsg.AddIsCloseMsg(builder, True) WebStreamMsg.AddCloseDueToRequestConnectionFailure(builder, self.ClosedDueToRequestConnectionError) webStreamMsgOffset = WebStreamMsg.End(builder) - outputBuf = OctoStreamMsgBuilder.CreateOctoStreamMsgAndFinalize(builder, MessageContext.MessageContext.WebStreamMsg, webStreamMsgOffset) + buffer, msgStartOffsetBytes, msgSizeBytes = OctoStreamMsgBuilder.CreateOctoStreamMsgAndFinalize(builder, MessageContext.MessageContext.WebStreamMsg, webStreamMsgOffset) # Set the flag to silently fail, since the message might have already been sent by the helper. - self.SendToOctoStream(outputBuf, True, True) + self.SendToOctoStream(buffer, msgStartOffsetBytes, msgSizeBytes, True, True) except Exception as e: # This is bad, log it and kill the stream. Sentry.Exception("Exception thrown while trying to send close message for web stream "+str(self.Id), e) diff --git a/octoeverywhere/WebStream/octowebstreamhttphelper.py b/octoeverywhere/WebStream/octowebstreamhttphelper.py index 3067bcf..5ff521c 100644 --- a/octoeverywhere/WebStream/octowebstreamhttphelper.py +++ b/octoeverywhere/WebStream/octowebstreamhttphelper.py @@ -1,7 +1,6 @@ -# namespace: WebStream - import time import logging +import threading import requests import urllib3 @@ -72,9 +71,9 @@ def __init__(self, streamId, logger:logging.Logger, webStream, webStreamOpenMsg: self.UploadBytesReceivedSoFar = 0 self.UploadBuffer = None - # Micro body read stuff. - self.IsDoingMicroBodyReads = False - self.IsFirstMicroBodyRead = True + # Unknown body size chunk reader + # If this is not None, we are doing the unknown body read. Then the rest of the body reads must use this same system. + self.UnknownBodyChunkReadContext:UnknownBodyChunkReadContext = None # Perf stats self.BodyReadTimeSec = 0.0 @@ -100,8 +99,17 @@ def __init__(self, streamId, logger:logging.Logger, webStream, webStreamOpenMsg: # When close is called, all http operations should be shutdown. # Called by the main socket thread so this should be quick! def Close(self): + # Set the flag so all of the looping http operations will stop. self.IsClosed = True + # Important! If we are doing a unknown chunk read, we need to set the wait event to unblock the stream read thread. + # This will cause the thread to wake up, it will see the IsClosed flag, return, and then allow the web request to close, + # which will end the unknown body read thread. + if self.UnknownBodyChunkReadContext is not None: + # Call set under lock, to ensure the other thread doesn't clear it without us seeing it. + with self.UnknownBodyChunkReadContext.BufferLock: + self.UnknownBodyChunkReadContext.BufferDataReadyEvent.set() + # Called when a new message has arrived for this stream from the server. # This function should throw on critical errors, that will reset the connection. @@ -272,12 +280,12 @@ def executeHttpRequest(self): # Look at the headers to see what kind of response we are dealing with. # See if we find a content length, for http request that are streams, there is no content length. - contentLength = None + contentLength:int = None # We will also look for the content type, and look for a boundary string if there is one # The boundary stream is used for webcam streams, and it's an ideal place to package and send each frame - boundaryStr = None + boundaryStr:str = None # Pull out the content type value, so we can use it to figure out if we want to compress this data or not - contentTypeLower =None + contentTypeLower:str =None headers = octoHttpResult.Headers for name, value in headers.items(): nameLower = name.lower() @@ -500,7 +508,7 @@ def executeHttpRequest(self): # Log about it - only if debug is enabled. Otherwise, we don't want to waste time making the log string. responseWriteDone = time.time() if self.Logger.isEnabledFor(logging.DEBUG): - self.Logger.debug(self.getLogMsgPrefix() + method+" [upload:"+str(format(requestExecutionStart - self.OpenedTime, '.3f'))+"s; request_exe:"+str(format(requestExecutionEnd - requestExecutionStart, '.3f'))+"s; send:"+str(format(responseWriteDone - requestExecutionEnd, '.3f'))+"s; body_read:"+str(format(self.BodyReadTimeSec, '.3f'))+"s; compress:"+str(format(self.CompressionTimeSec, '.3f'))+"s; octo_stream_upload:"+str(format(self.ServiceUploadTimeSec, '.3f'))+"s] size:("+str(nonCompressedContentReadSizeBytes)+"->"+str(contentReadBytes)+") compressed:"+str(compressBody)+" msgcount:"+str(messageCount)+" microreads:"+str(self.IsDoingMicroBodyReads)+" type:"+str(contentTypeLower)+" status:"+str(octoHttpResult.StatusCode)+" cached:"+str(isFromCache)+" for " + uri) + self.Logger.debug(self.getLogMsgPrefix() + method+" [upload:"+str(format(requestExecutionStart - self.OpenedTime, '.3f'))+"s; request_exe:"+str(format(requestExecutionEnd - requestExecutionStart, '.3f'))+"s; send:"+str(format(responseWriteDone - requestExecutionEnd, '.3f'))+"s; body_read:"+str(format(self.BodyReadTimeSec, '.3f'))+"s; compress:"+str(format(self.CompressionTimeSec, '.3f'))+"s; octo_stream_upload:"+str(format(self.ServiceUploadTimeSec, '.3f'))+"s] size:("+str(nonCompressedContentReadSizeBytes)+"->"+str(contentReadBytes)+") compressed:"+str(compressBody)+" msgcount:"+str(messageCount)+" microreads:"+str(self.UnknownBodyChunkReadContext is not None)+" type:"+str(contentTypeLower)+" status:"+str(octoHttpResult.StatusCode)+" cached:"+str(isFromCache)+" for " + uri) def buildHeaderVector(self, builder, octoHttpResult:OctoHttpRequest.Result): @@ -747,7 +755,7 @@ def shouldCompressBody(self, contentTypeLower:str, octoHttpResult:OctoHttpReques # Reads data from the response body, puts it in a data vector, and returns the offset. # If the body has been fully read, this should return ogLen == 0, len = 0, and offset == None # The read style depends on the presence of the boundary string existing. - def readContentFromBodyAndMakeDataVector(self, builderContext:MsgBuilderContext, octoHttpResult:OctoHttpRequest.Result, boundaryStr_opt, shouldCompress, contentTypeLower_NoneIfNotKnown, contentLength_NoneIfNotKnown, responseHandlerContext): + def readContentFromBodyAndMakeDataVector(self, builderContext:MsgBuilderContext, octoHttpResult:OctoHttpRequest.Result, boundaryStr_opt, shouldCompress, contentTypeLower_NoneIfNotKnown:str, contentLength_NoneIfNotKnown:int, responseHandlerContext): # This is the max size each body read will be. Since we are making local calls, most of the time we will always get this full amount as long as theres more body to read. # This size is a little under the max read buffer on the server, allowing the server to handle the buffers with no copies. # @@ -798,13 +806,10 @@ def readContentFromBodyAndMakeDataVector(self, builderContext:MsgBuilderContext, finalDataBufferMv_CanBeNone = memoryview(self.BodyReadTempBuffer) finalDataBuffer = finalDataBufferMv_CanBeNone[0:readLength] else: - if responseHandlerContext is None and self.shouldDoUnknownBodySizeRead(contentTypeLower_NoneIfNotKnown, contentLength_NoneIfNotKnown): - # If we don't know the content length AND there is no boundary string, this request is probably a event stream of some sort. - # We have to use this special read function, because doBodyRead will block until the full buffer is filled, which might take a long time - # for a number of streamed messages to fill it up. This special function does micro reads on the socket until a time limit is hit, and then - # returns what was received. - self.IsDoingMicroBodyReads = True - finalDataBuffer = self.doUnknownBodySizeRead(octoHttpResult) + if self.UnknownBodyChunkReadContext is not None or (responseHandlerContext is None and self.shouldDoUnknownBodyChunkRead(contentTypeLower_NoneIfNotKnown, contentLength_NoneIfNotKnown)): + # According to the HTTP 1.1 spec, if there's no content length and no boundary string, then the body is chunk based transfer encoding. + # Note that once we do on read as an unknown body size chunk read, we need to always do it, since there's a thread reading the body. + finalDataBuffer = self.doUnknownBodyChunkRead(octoHttpResult) else: # If there is no boundary string, but we know the content length, it's safe to just read. # This will block until either the full defaultBodyReadSizeBytes is read or the full request has been received. @@ -1099,73 +1104,161 @@ def doBodyRead(self, octoHttpResult:OctoHttpRequest.Result, readSize:int): Sentry.Exception(self.getLogMsgPrefix()+ " exception thrown in doBodyRead. Ending body read.", e) return None - # This is similar to doBodyRead, but it allows us to send chunks of the body over time. - # The problem is for requests where the content length isn't known AND there's no boundary string, response.raw.read(size) will block - # until the full amount of data requested is read. That doesn't work for things like event streams, because there's no boundary string and the full length is unknown, - # but we want to stream the data as it arrives to us. To make doBodyRead efficient, we request a large read buffer, so if the event stream contains many small messages, - # doBodyRead will block until it accumulates enough messages to fill the full buffer and send it. - # - # For normal requests with known content lengths, response.raw.read will read full buffers until the full content is known to be done, and then will return the final subset buffer, - # so they can't get blocked like streaming event can. So if the event stream isn't sending data often, this can get stuck while waiting for the final bytes of a message. + + def doUnknownBodyChunkReadThread(self): + try: + if self.Logger.isEnabledFor(logging.DEBUG): + self.Logger.debug(f"{self.getLogMsgPrefix()}Starting chunk read thread.") + + # Get the Request response object. + response = self.UnknownBodyChunkReadContext.HttpResult.ResponseForBodyRead + if response is None: + raise Exception("doUnknownBodyChunkReadThread was called with a result that has not Response object to read from.") + + # Loop until the stream is closed. + # Remember we use the raw stream read, because it will read and entire chunk and return it as soon as it's ready. + # BUG - response.raw.stream doesn't close when we close the http request from our side. (but it says it should?) + # If server we are calling closes it shutdown correctly, but if our server drops the connection it will not close. + # So what happens is that the stream will timeout from the httprequest.MakeHttpCallAttempt timeout, or when it gets a new chunk it will end. + # That's not great, but it's not super common, so it's fine. + gen = response.raw.stream(amt=None) + for i in gen: + # When we have a new buffer, add it to the list under lock. + with self.UnknownBodyChunkReadContext.BufferLock: + self.UnknownBodyChunkReadContext.BufferList.append(i) + # Call set under lock, to ensure the other thread doesn't clear it without us seeing it. + self.UnknownBodyChunkReadContext.BufferDataReadyEvent.set() + + # When the loop exits, the body read is complete and the stream is closed. + + except Exception as e: + # If the web stream is already closed, don't bother logging the exception. + # These exceptions happen for use cases as above, where stream() doesn't close in time and such. + # Note the exception can be a timeout, but it can also be a "doesn't have a read" function error bc if the socket gets data the lib will try to call read on a fp that's closed and set to None. :/ + if self.IsClosed is False: + Sentry.Exception(self.getLogMsgPrefix()+ " exception thrown in doUnknownBodyChunkReadThread", e) + finally: + # Ensure we always set this flag, so the web stream will know the body read is done. + self.UnknownBodyChunkReadContext.ReadComplete = True + + # Set the event to break the stream read wait, so it will shutdown. + # Call set under lock, to ensure the other thread doesn't clear it without us seeing it. + with self.UnknownBodyChunkReadContext.BufferLock: + self.UnknownBodyChunkReadContext.BufferDataReadyEvent.set() + + try: + if self.Logger.isEnabledFor(logging.DEBUG): + self.Logger.debug(f"{self.getLogMsgPrefix()}Exiting chunk read thread.") + except Exception: + pass + + + # This function should be used if there's no content length and there's no boundary string. + # In that case, the HTTP1.1 standard says the body content must be chunk based transfer encoded, which is what this function does. + # Most of the time these HTTP calls are for streams, like an event stream, log stream, etc. # - # Event streams are an important fallback for OctoPrint, and is also what OctoFarm uses to stream instead of websockets. + # In the past we had two problems with this: + # 1) A body read() will block until the size requested is full, which means we can't stream chunks as they come in. + # 2) We then tired to do micro reads to build a buffer but it allowed us to return if there was enough. But this still failed because read still needs to fill the buffer, + # so if we wanted to read the last 5 bytes of the buffer but set our size to 10, it would block until the final 5 bytes were read. # - # Thus, this function does many small reads (which isn't as efficient) but builds them into a larger buffer that's time limited. In this way, we ensure we are still streaming - # messages every x amount of time, but we also don't stream super small data packets. + # So, the right way to do this is with response.raw.stream(). + # This will read each chunk as it comes in, and return each complete chunk. This is the same way a web browser will handle the data, where it won't handle the data until the entire + # chunk is read. So there's no need to stream sub-chunks. # - # Note this logic will always have a "one message" latency due to the way we get blocked on the socket. Even though we read small amounts, there's no way to know the full message - # length, and thus there's no way to ask for only the remainder of the message. Thus, the end of the message will usually always get put into a pending read, but that read will block - # until the buffer is filled the rest of the way with the n+1 message. Unfortunately that means we are always a message or so behind in the stream. Without being able to do a non-blocking request read, - # there's no way to work around this. + # There's still a problem with stream, which is it will block until the next chunk is ready. But for us, once we have data, we want to send it. Thus we must spin off a thread to do + # the stream reading, and then transfer the buffer. Also stream() is a generator, so it can't be re-entered. # - # Ideally if we could just peak at the pending data length without blocking, we could do this much more efficiently. - def doUnknownBodySizeRead(self, octoHttpResult:OctoHttpRequest.Result): - - # How much we will micro read, this needs to be quite small, to prevent getting "stuck" between messages. - microReadSizeBytes = 300 + def doUnknownBodyChunkRead(self, httpResult:OctoHttpRequest.Result): + + # Even though we read complete chunks as they come in, we might want to buffer smaller chunks up + # before sending them so the compression and stream is more efficient. + # This does need to be small, because we wan't reading this min time period back to back, + # we are reading a chunk, doing all of the send logic, and then spinning back to here. + # So if we set this at exactly 16.6 for a 60fps stream, for example, we will fall behind. + minBufferBuildTimeSec = 0.010 # 10ms + + # Just as a sanity check, we will define the max amount of time we will wait for one chunk. + # This will make sure we don't get stuck in a loop if there are any bugs. + maxChunkReadTimeSec = 20 * 60 * 60 # 20 hours + + # If this is the first time, setup the unknown body read info. + # Once this is defined, this body read method must be used for the rest of the request. + if self.UnknownBodyChunkReadContext is None: + context = UnknownBodyChunkReadContext(httpResult) + context.Thread = threading.Thread(target=self.doUnknownBodyChunkReadThread) + context.Thread.start() + self.UnknownBodyChunkReadContext = context - # How long we will build one big buffer before returning, in seconds. - # Note the first read will get double this time. - maxBufferBuildTimeSec = 0.050 #(50ms) - - # Vars - buildReads = 0 - buffer = bytearray(2 * 1024) - bufferSize = 0 try: startSec = time.time() - while True: - # Do a small read, which will block until the full (small) size is read. - # If nothing shows up to be read, this will wait until the http request read timeout expires, and then will return None. - currentReadBuffer = self.doBodyRead(octoHttpResult, microReadSizeBytes) + chunkBufferList = None + + # Since we will always sleep for at least the min time, there's no need to do work until the min time is meet. + # If we did do the loop, we would just end up spinning and sleeping again. + time.sleep(minBufferBuildTimeSec) + + # Try to read a chunk or wait for the read to be done. + # Only try to read while the stream is open. + while self.IsClosed is False: + + # First, sanity check we haven't been running forever. + now = time.time() + if now - startSec > maxChunkReadTimeSec: + raise Exception(f"doUnknownBodyChunkRead has been waiting for a chunk for {maxChunkReadTimeSec} seconds. This is an error.") + + # Next, check if there are any new buffers to read. + with self.UnknownBodyChunkReadContext.BufferLock: + if len(self.UnknownBodyChunkReadContext.BufferList) > 0: + # If there's new chunks, grab them all and reset the buffer list. + if chunkBufferList is None: + chunkBufferList = self.UnknownBodyChunkReadContext.BufferList + else: + chunkBufferList += self.UnknownBodyChunkReadContext.BufferList + self.UnknownBodyChunkReadContext.BufferList = [] + # Clear the event under lock, so we don't miss a new set. + self.UnknownBodyChunkReadContext.BufferDataReadyEvent.clear() + + # If we got some chunks, see if we are past the min chunk read time or if the chunk stream is complete. + if chunkBufferList is not None and now - startSec > minBufferBuildTimeSec: + break - # If None is returned, we are done. Return the current buffer or None. - if currentReadBuffer is None: + # Finally, AFTER we checked if we have new buffers, check is the read is done. + # Note we have to do this after we grab any new buffers in the list, because we can have pending chunks from before the stream is closed. + if self.UnknownBodyChunkReadContext.ReadComplete: break - # Copy into the existing buffer. - buffer[bufferSize:bufferSize+len(currentReadBuffer)] = currentReadBuffer - bufferSize += len(currentReadBuffer) - buildReads += 1 + # If we don't have a chunk, wait on the event until we have something. + # This will return when there's new chunks ready, ReadComplete is set, or it hits a timeout. + self.UnknownBodyChunkReadContext.BufferDataReadyEvent.wait(maxChunkReadTimeSec) - # We have noticed for some systems (like OctoFarm) the first read takes a while for the event stream to get - # going, and then it gets data. The problem here is we unblock with the first chunk of data and then we are at our time - # limit and return. Instead, it's more ideal to allow one more time limit so we can read the full message and then return it. - if self.IsFirstMicroBodyRead: - self.IsFirstMicroBodyRead = False - startSec = time.time() + # If we broke out of the loop and we have no chunks to send, we are done. + if chunkBufferList is None: + return None - # Check if it's time to be done. - if time.time() - startSec > maxBufferBuildTimeSec: - break + # Append all of the chunks together and return the buffer! + # Optimize for the single chunk scenario. + if len(chunkBufferList) == 1: + return chunkBufferList[0] - # If we broke out, it's time to return what we have. - # If we didn't read anything, we want to return none, to indicate we are done or there was a read timeout. - if bufferSize == 0: - return None + # Find the final buffer length. + totalLength = sum(len(b) for b in chunkBufferList) + + # Allocate a buffer to hold all of the chunks. + finalBuffer = bytearray(totalLength) + offset = 0 + for buffer in chunkBufferList: + view = memoryview(buffer) + with view: + finalBuffer[offset:offset + len(view)] = view + offset += len(view) - # Return the subset of the buffer we filled. - return buffer[0:bufferSize] + # Sanity check + if len(finalBuffer) != totalLength: + raise Exception(f"Final appended buffer was {len(finalBuffer)} but it should have been {totalLength}") + + # Return! + return finalBuffer except Exception as e: Sentry.Exception(self.getLogMsgPrefix()+ " exception thrown in doUnknownBodySizeRead. Ending body read.", e) @@ -1174,7 +1267,12 @@ def doUnknownBodySizeRead(self, octoHttpResult:OctoHttpRequest.Result): # Based on the content length and the content type, determine if we should do a doUnknownBodySizeRead read. # Read doUnknownBodySizeRead about why we need to use it, but since it's not efficient, we only want to use it when we know we should. - def shouldDoUnknownBodySizeRead(self, contentTypeLower_CanBeNone, contentLengthLower_CanBeNone): + def shouldDoUnknownBodyChunkRead(self, contentTypeLower_CanBeNone:str, contentLengthLower_CanBeNone:int): + + # If this is set, we are already doing a unknown body chunk read, so we must keep doing it. + if self.UnknownBodyChunkReadContext is not None: + return True + # If there's a known content length, there's no need to do this, because the normal read will fill the requested buffer size # but return the remainder subset immediately when the full buffer is read. if contentLengthLower_CanBeNone is not None: @@ -1207,3 +1305,20 @@ def checkForDelayIfNotHighPri(self): # Formatting helper. def _FormatFloat(self, value:float) -> str: return str(format(value, '.3f')) + + +# Used to capture the context of the unknown body read thread. +class UnknownBodyChunkReadContext: + + def __init__(self, httpResult:OctoHttpRequest.Result) -> None: + self.HttpResult = httpResult + self.Thread:threading.Thread = None + self.BufferLock = threading.Lock() + self.BufferDataReadyEvent = threading.Event() + + # We use a list so we can efficiently append all of the pending buffers at once when they are being sent. + self.BufferList = [] + + # Set to true when the read is done either from the end of the body or an error. + # Once true, it will never read again, but we do need to process the BufferList + self.ReadComplete = False diff --git a/octoeverywhere/octohttprequest.py b/octoeverywhere/octohttprequest.py index 6c3bc9c..6ef3685 100644 --- a/octoeverywhere/octohttprequest.py +++ b/octoeverywhere/octohttprequest.py @@ -433,6 +433,8 @@ def MakeHttpCallAttempt(logger, attemptName, method, url, headers, data, mainRes # # Note we use a long timeout because some api calls can hang for a while. # For example when plugins are installed, some have to compile which can take some time. + # timeout note! This value also effects how long a body read can be. This can effect unknown body chunk stream reads can hang while waiting on a chunk. + # But whatever this timeout value is will be the max time a body read can take, and then the chunk will fail and the stream will close. # # See the note about allowRedirects above MakeHttpCall. # diff --git a/setup.py b/setup.py index 57b6356..697c68f 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module # Note that this single version string is used by all of the plugins in OctoEverywhere! -plugin_version = "3.6.3" +plugin_version = "3.6.4" # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # module