Skip to content

Commit

Permalink
Remove deprecated usage of IO.Engine.new_tcp_connect!.
Browse files Browse the repository at this point in the history
This function was unsafe for `IO` to expose, so it needs to be
removed from there.

This commit copies the TCP-specific logic from `IO.Engine`
into the `TCP.Engine` where it belongs.
  • Loading branch information
jemc committed Feb 15, 2023
1 parent d30c3ca commit a29ec1c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 35 deletions.
11 changes: 1 addition & 10 deletions spec/TCP.Spec.savi
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@
TCP.auth(@env.root).connect.to("localhost", port)
)

// TODO: Can we make this trigger io_react with IO.Action.OpenFailed
// automatically via the same mechanism we will use for queuing later
// pending reads, instead of checking for this error case here?
if (@io.connect_error != OSError.None) (
@env.err.print("[EchoClient] Failed to connect:")
@env.err.print(@io.connect_error.name)
)

:fun ref io_react(action IO.Action)
case action == (
| IO.Action.Opened |
Expand All @@ -101,8 +93,7 @@
try @io.flush!

| IO.Action.OpenFailed |
@env.err.print("[EchoClient] Failed to connect:")
@env.err.print(@io.connect_error.name)
@env.err.print("[EchoClient] Failed to connect.")

| IO.Action.Read |
if (@io.read_stream.bytes_ahead_of_marker >= b"Hello, World!".size) (
Expand Down
81 changes: 56 additions & 25 deletions src/TCP.Engine.savi
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,77 @@
:is IO.Engine(IO.Action)
:var io IO.CoreEngine
:var _listener (IO.Actor(IO.Action) | None): None
:var connect_error OSError: OSError.None
:var _pending_connect_count I32: 0
:let read_stream: ByteStream.Reader.new
:let write_stream ByteStream.Writer

:new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket)
@io = try (
// TODO: The IO package shouldn't expose this unsafe interface that
// could be used to circumvent the capability security of the TCP package.
// Instead, the relevant code should be carefully moved to this package.
IO.CoreEngine.new_tcp_connect!(
actor
ticket.host
ticket.port
ticket.from_port
)
:fun non _asio_flags
if Platform.is_windows (
AsioEvent.Flags.read_write
|
@connect_error = OSError.EINVAL
IO.CoreEngine.new(AsioEvent.ID.null) // an invalid one
AsioEvent.Flags.read_write_oneshot
)

:: Create a new TCP engine based on an outbound connection.
::
:: The given `ticket` specifies the connection details, and also proves
:: (via capability security) that the caller has authority to connect.
:new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket)
// Begin with an "empty" IO core engine - we'll fill it later
// after one of the attempted TCP connections succeeds.
@io = IO.CoreEngine.new(AsioEvent.ID.null)
@write_stream = ByteStream.Writer.new(@io)

:new accept(
actor IO.Actor(IO.Action)
ticket TCP.Accept.Ticket
)
// If IPv4 and IPv6 resolutions are both possible, the runtime will try to
// connect with both parallel; we'll later adopt whichever succeeds first.
@_pending_connect_count = _FFI.pony_os_connect_tcp(
actor
ticket.host.cstring, ticket.port.cstring, ticket.from_port.cstring
@_asio_flags
)

// If we failed to resolve any valid connection attempts, send the actor
// a later IO action that will let it know that connection has failed.
if (@_pending_connect_count == 0) (
actor.io_deferred_action(IO.Action.OpenFailed)
)

:: Create a new TCP engine based on an accepting an inbound connection.
::
:: The given `ticket` is a single-use capability that originated in a
:: `TCP.Listen.Engine` that had an incoming connection available to accept.
:new accept(actor IO.Actor(IO.Action), ticket TCP.Accept.Ticket)
actor.io_deferred_action(IO.Action.Opened)
@io = IO.CoreEngine.new(
_FFI.pony_asio_event_create(actor, ticket._fd, @_asio_flags, 0, True)
)
@write_stream = ByteStream.Writer.new(@io)
@_listener = ticket._listener

:fun non _asio_flags
if Platform.is_windows (
AsioEvent.Flags.read_write
|
AsioEvent.Flags.read_write_oneshot
)

:fun ref react(event AsioEvent) @
:yields IO.Action
// If we haven't adopted an event yet, and this one is ready to be adopted,
// try to adopt it now, as we expect it is one of our pending connections.
if (@io.is_waiting_to_open && event.is_writable) (
try (
@_pending_connect_count -= 1
@io.adopt_event!(event)
yield IO.Action.Opened
|
// We failed to adopt it because it was a failed connection attempt.
// If there are no more pending connection attempts, our last one has
// failed and we have no choice but to admit final failure.
if (@_pending_connect_count == 0) (
yield IO.Action.OpenFailed
)

// Return early because we don't want to do anything with this event
// after having failed to adopt it already.
return @
)
)

// Now, pass the event to the inner engine and react to its yielded actions.
@io.react(event) -> (action |
case action == (
| IO.Action.Closed |
Expand All @@ -67,6 +97,7 @@
yield action
)
)

@

:fun ref close
Expand Down
1 change: 1 addition & 0 deletions src/_FFI.savi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:ffi pony_asio_event_unsubscribe(event AsioEvent.ID) None
:ffi pony_asio_event_destroy(event AsioEvent.ID) None

:ffi pony_os_connect_tcp(owner AsioEvent.Actor, host CPointer(U8), service CPointer(U8), from CPointer(U8), asio_flags U32) I32
:ffi pony_os_listen_tcp(owner AsioEvent.Actor, host CPointer(U8), service CPointer(U8)) AsioEvent.ID
:ffi pony_os_accept(event AsioEvent.ID) U32
:ffi pony_os_socket_close(fd U32) None
Expand Down

0 comments on commit a29ec1c

Please sign in to comment.