RFC: Streaming response bodies #158

Open
julik opened this issue Feb 2, 2018 · 8 comments

Comments

@julik
Collaborator

julik commented Feb 2, 2018

I started work on streaming response bodies in https://github.com/toland/patron/tree/response-body-callback, using a Ruby callback to receive the body data as it arrives. However, Patron's current API is not a good fit for this, because the Response object is only returned once curl_easy_perform has returned. In practice, this leads to a pattern like this:

sess = Patron::Session.new
sess.on_body do |body_chunk_binary_str|
  ...
  # This block will execute first
end
resp = sess.get('/endpoint') # This will execute second
resp.status #=> 401, we were actually unauthorized 😭

This in turn means that implementing streaming response bodies this way is very awkward, because we only get the status code of the response after the body callback has already run. I can envision the following API instead:

sess = Patron::Session.new
resp = sess.streaming_get('/get') # This must return a StreamingResponse or similar
begin
  if resp.status != 200
     error_body = resp.receive_body_and_close
     raise "Invalid status #{resp.status} - #{error_body}"
  end
  resp.each do |body_chunk|
    # do something with a body chunk
  end
ensure
  resp.close # because we are still inside `curl_easy_perform` at this stage
end

We could stretch that paradigm even further and follow the usage pattern of https://github.com/Tonkpils/celluloid-eventsource, which is very similar to EventMachine and is also known as "callback spaghetti":

sess = Patron::AsyncSession.new
sess.on_status_and_headers do |status, header_hash|
  if status != 200
    sess.abort
  end
end
sess.on_body_chunk do |chunk|
  @event_bus.deliver(chunk)
end
# This will actually block the current thread until `curl_easy_perform` returns
sess.perform!

Yet another option is going for a minimalist API akin to what Excon uses, which would reduce the calling code to this:

sess = Patron::Session.new
sess.streaming_get('/url') do |status, header_hash, body_chunk|
  # here you can abort or continue, same status and header_hash
  # get yielded every time, and we need to implement something like
  # a...
  throw :patron_abort
  # or alternatively just a
  break
  # which would force the block to return `nil` to the calling C code.
end

That is only possible if we remove the Response objects from the picture, but that is probably not a big loss, because code calling long-polled endpoints is going to look very different from code that reads eagerly.
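To make the abort semantics of this block-based variant concrete, here is a minimal sketch that does not use Patron at all. `FakeStreamingSession` and its constructor arguments are hypothetical stand-ins, and the return value (a count of delivered chunks) is an assumption made only so the result can be inspected. It shows one way `throw :patron_abort` could be handled with a plain `catch` around the yield loop:

```ruby
# Hypothetical stand-in for a streaming session; not part of Patron.
class FakeStreamingSession
  def initialize(status, headers, chunks)
    @status = status
    @headers = headers
    @chunks = chunks
  end

  # Yields (status, headers, chunk) for every chunk. The caller may
  # `throw :patron_abort` to stop early. Returns the number of chunks
  # that the block processed to completion.
  def streaming_get(_url)
    delivered = 0
    catch(:patron_abort) do
      @chunks.each do |chunk|
        yield @status, @headers, chunk
        delivered += 1
      end
    end
    delivered
  end
end

sess = FakeStreamingSession.new(200, { "Content-Type" => "text/plain" }, %w[a b c d])
received = []
delivered = sess.streaming_get("/url") do |status, _headers, chunk|
  # Abort on an unexpected status or on a sentinel chunk.
  throw :patron_abort if status != 200 || chunk == "c"
  received << chunk
end
```

With `throw`, the method still gets to run its own code after the `catch` block. A bare `break` inside the block would instead terminate the `streaming_get` call itself, so any cleanup would have to live in an `ensure`.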

When you are using a streaming response you don't want to buffer on the Ruby side. You probably want the opposite: no buffering at all, with the Curl buffer size set to a minimum. So we cannot buffer until curl_easy_perform returns. Besides, we might be dealing with long-running responses here, in which case curl_easy_perform keeps running until the process or thread quits.

Aside from the fact that this needs a lot of rearchitecting of the C code, what are people's thoughts on these APIs, and which would be the better fit for Patron? I am at a loss. What I do know is that all of these are possible, but I would rather double-check that we all like the direction before I make a change that is going to span hundreds of lines of C.

@julik
Collaborator Author

julik commented Feb 2, 2018

@toland and anyone else - please do comment.

@toland
Owner

toland commented Feb 5, 2018

Patron was originally intended to have a simple API. I wrote it because I really didn't like curb's rather arcane API. Admittedly, curb is a faithful representation of the libcurl API, but that didn't feel very "Rubyish" to me.

The callback-based API just feels wrong. It directly messes with how we read and understand code. The term "callback spaghetti" isn't meant as a compliment 😄

The first and third examples both look good, but I would say that the first is more in keeping with the spirit with which I created Patron.

This is just my opinion. Take it with an appropriately sized grain of salt.

@toland
Owner

toland commented Feb 5, 2018

Actually, the need for that resp.close at the end of the first example bothers me. Folks are going to forget that and it will cause all kinds of badness. I think I would combine the first and third into something like this:

sess = Patron::Session.new
sess.streaming_get('/get') do |stream|
  if stream.status != 200
     error_body = stream.receive_body_and_close
     raise "Invalid status #{stream.status} - #{error_body}"
  end
  stream.each do |body_chunk|
    # do something with a body chunk
  end
end

I like this approach because the stream object is extensible; you aren't bound to the three parameters passed in the third example. Also, you can't forget to close the session like in the first example. Win, win!

It seems like you might also want methods like stream.to_file and stream.to_string in addition to stream.each.
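As a rough illustration of how such helpers could be layered on top of `stream.each`, here is a self-contained sketch. `FakeStream` is a hypothetical stand-in (nothing here is Patron's API), and passing an IO object to `to_file` is an assumption about the signature:

```ruby
require "tempfile"

# Hypothetical stream object; only #each is assumed, the helpers build on it.
class FakeStream
  def initialize(chunks)
    @chunks = chunks
  end

  def each(&block)
    @chunks.each(&block)
  end

  # Concatenates all chunks into one binary String (buffers the whole body).
  def to_string
    out = String.new(encoding: Encoding::BINARY)
    each { |chunk| out << chunk }
    out
  end

  # Writes chunks to an IO as they arrive and returns the bytes written.
  def to_file(io)
    written = 0
    each { |chunk| written += io.write(chunk) }
    written
  end
end

body = FakeStream.new(["hello ", "world"]).to_string

file = Tempfile.new("stream")
bytes = FakeStream.new(["hello ", "world"]).to_file(file)
file.rewind
file_contents = file.read
file.close!
```

Only `to_string` holds the full body in memory; `to_file` keeps at most one chunk alive at a time, which matters for long-running responses.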

@WJWH

WJWH commented Mar 3, 2018

And stream.to_socket!

@julik
Collaborator Author

julik commented Mar 11, 2018

Ok, imagine we do this. How do we ensure the stream gets closed correctly?

  sess.streaming_get('/get') do |stream|
    begin
      if stream.status != 200
         error_body = stream.receive_body
         raise "Invalid status #{stream.status} - #{error_body}"
      end
      stream.each do |body_chunk|
        # the fourth iteration of this will raise something
      end
    ensure
      stream.close
    end
  end

@toland
Owner

toland commented Mar 12, 2018

I am not sure I understand the question, so if what I am about to say seems out in left field, it probably is.

The stream is closed within #streaming_get, whose implementation looks something like this:

def streaming_get(url)
  stream = setup_stream(url)
  begin
    yield stream
  ensure
    # Runs whether the block returns normally or raises
    stream.close
  end
end

(Please note that my Ruby is rusty, and this was done off the top of my head. Be kind 😄 )

Your example could then be simplified to:

  sess.streaming_get('/get') do |stream|
    if stream.status != 200
       error_body = stream.receive_body
       raise "Invalid status #{stream.status} - #{error_body}"
    end
    stream.each do |body_chunk|
      # the fourth iteration of this will raise something
    end
  end

We will need to make sure that #streaming_get returns an appropriate value when an error is raised. Or, since this function exists solely for its side-effects, maybe nil is a reasonable return value. And nothing would stop a caller from using ensure within the block if they want to handle certain exceptions themselves. We just want to make sure that the stream is closed when all is said and done.
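A small runnable sketch of that guarantee, with a hypothetical `RecordingStream` in place of a real stream, and with `streaming_get` taking the stream directly instead of a URL (both are assumptions made for illustration only). It checks that `close` runs on both the normal and the raising path, and that the method returns `nil`:

```ruby
# Hypothetical stream that only records whether it was closed.
class RecordingStream
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Sketch of the wrapper: the ensure clause runs on normal return and on raise.
def streaming_get(stream)
  yield stream
  nil # the method exists for its side effects
ensure
  stream.close
end

# Normal path: the block finishes, the stream is closed, nil is returned.
ok = RecordingStream.new
result = streaming_get(ok) { |_s| :ignored }

# Error path: the block raises, the stream is still closed, the error propagates.
failed = RecordingStream.new
error = begin
  streaming_get(failed) { |_s| raise "Invalid status 401" }
  nil
rescue RuntimeError => e
  e
end
```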

@WJWH

WJWH commented Mar 12, 2018

I came here to say the same thing: the error handling should be inside the method we provide IMO (in this case streaming_get). I do think we should at least allow an error callback in case the stream raises an error somewhere halfway through that has nothing to do with the other code, like so:

  sess.streaming_get('/get', on_error: my_error_proc) do |stream|
    if stream.status != 200
       error_body = stream.receive_body
       raise "Invalid status #{stream.status} - #{error_body}"
    end
    stream.each do |body_chunk|
      # the fourth iteration of this will raise something
    end
  end

Where my_error_proc is some user-definable proc that could do logging for example.

EDIT: Possibly we could also already put the error handling for non-200 responses in the streaming_get method.

@julik
Collaborator Author

julik commented Sep 17, 2021

This has (not unexpectedly) become relevant again 😆 so I think I'll have a go at the following semantics:

  1. We get a method called streaming_request with a set of arguments, maybe similar to the existing #request, maybe slightly different. This is an opportunity to rethink the API surface a little bit.
  2. Since modern Rubies have block-level rescue I don't think having an on_error callback is justified, unless we want to consider such a callback recoverable. Since most libCURL-originating errors are "terminal" for the running request (actually, I think all are), raising from either the #streaming_request method or the #each method is appropriate.
  3. For restart/recovery the caller should keep track of how much data they have already processed / received, if they want to.
  4. To avoid memory bloat I am going to try an approach whereby body_chunk is a String allocated once and then reused. I believe the libCURL callback accepts the size of the buffer as an argument and will fill the buffer to the brim and no more, so this should be achievable.
  5. At the end of the #streaming_request method the libCURL request should be forcibly closed/terminated via an ensure, regardless of whether the stream has been read out in full or not.
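Points 3 and 4 have a consequence for callers that is easy to miss, so here is a pure-Ruby sketch of it. `ReusingStream` is a hypothetical producer standing in for the C write callback, and the 16 KiB capacity is an arbitrary assumption. If the same String object is yielded every time, the caller has to copy any chunk it wants to keep:

```ruby
# Hypothetical producer standing in for the libCURL write callback.
class ReusingStream
  def initialize(chunks)
    @chunks = chunks
    @buffer = String.new(capacity: 16 * 1024, encoding: Encoding::BINARY)
  end

  # Yields the same String object every time, with its contents replaced.
  def each
    @chunks.each do |data|
      @buffer.replace(data)
      yield @buffer
    end
  end
end

retained = []   # references to the shared buffer
copies = []     # independent copies of each chunk
object_ids = []
total_bytes = 0 # point 3: the caller tracks progress for a possible restart

ReusingStream.new(%w[aaa bb c]).each do |chunk|
  object_ids << chunk.object_id
  retained << chunk
  copies << chunk.dup
  total_bytes += chunk.bytesize
end
```

After the loop every entry in `retained` is the same object and shows only the last chunk, while `copies` holds the three chunks as they were received. If this approach is taken, the documentation for `#each` would need to state that chunks are only valid for the duration of one iteration.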

@WJWH @toland I will mention you from a PR once I have something
