Skip to content

Commit

Permalink
Add TCP library code.
Browse files Browse the repository at this point in the history
  • Loading branch information
jemc committed Mar 4, 2022
1 parent c95c334 commit 0e21251
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 3 deletions.
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2018 Joe Eli McIlvain
Copyright 2021 Joe Eli McIlvain

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
A base repository for Savi language libraries, with common CI actions configured.
# TCP

See the [Guide](https://github.com/savi-lang/base-standard-library/wiki/Guide) for details on how it works and how to use it for your own libraries.
TCP networking implementation for the Savi standard library.
23 changes: 23 additions & 0 deletions manifest.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
:manifest lib TCP
:sources "src/*.savi"

:dependency ByteStream v0
:from "github:savi-lang/ByteStream"

:dependency IO v0
:from "github:savi-lang/IO"
:depends on ByteStream
:depends on OSError

:dependency OSError v0

:manifest bin "spec"
:copies TCP
:sources "spec/*.savi"

:dependency Spec v0
:from "github:savi-lang/Spec"
:depends on Map

:transitive dependency Map v0
:from "github:savi-lang/Map"
5 changes: 5 additions & 0 deletions spec/Main.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
:actor Main
:new (env)
Spec.Process.run(env, [
Spec.Run(TCP.Spec).new(env)
])
93 changes: 93 additions & 0 deletions spec/TCP.Spec.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
:class iso TCP.Spec.Listener.Notify
:is TCP.Listener.Notify
:let env Env
:new (@env)

:fun ref listening(listen TCP.Listener'ref)
TCP.Spec.EchoClient.new(@env, Inspect[listen.local_port])
@env.err.print("[Listener] Listening")

:fun ref not_listening(listen TCP.Listener'ref) None
@env.err.print("[Listener] Not listening:")
@env.err.print(listen.listen_error.name)

:fun ref closed(listen TCP.Listener'ref): None
@env.err.print("[Listener] Stopped listening")

:fun ref connected!(listen TCP.Listener'ref, ticket TCP.Listener.AcceptTicket)
TCP.Spec.Echoer.new(@env, listen, --ticket)

:actor TCP.Spec.Echoer
:is IO.Actor(IO.Action)
:let env Env
:let io TCP.ConnectionEngine
:new (@env, listen, ticket)
@io = TCP.ConnectionEngine.accept(@, listen, --ticket)
@env.err.print("[Echoer] Accepted")

:fun ref _io_react(action IO.Action)
case action == (
| IO.Action.Read |
@io.pending_reads -> (bytes_available |
@io.read_stream.advance_to_end
bytes val = @io.read_stream.extract_token
@env.err.print("[Echoer] Received:")
@env.err.print(bytes.as_string)
@io.write_stream << bytes.clone // TODO: is clone still needed?
try @io.flush! // TODO: should we flush automatically on close below?
@io.close
)
| IO.Action.Closed |
@env.err.print("[Echoer] Closed")
try @io.listen.as!(TCP.Listener).dispose
)
@

:actor TCP.Spec.EchoClient
:is IO.Actor(IO.Action)
:let env Env
:let io TCP.ConnectionEngine
:new (@env, service)
@io = TCP.ConnectionEngine.connect(@, "localhost", service)

// TODO: Can we make this trigger _io_react with IO.Action.OpenFailed
// automatically via the same mechanism we will use for queuing later
// pending reads, instead of checking for this error case here?
if (@io.connect_error != OSError.None) (
@env.err.print("[EchoClient] Failed to connect:")
@env.err.print(@io.connect_error.name)
)

:fun ref _io_react(action IO.Action)
case action == (
| IO.Action.Opened |
@env.err.print("[EchoClient] Connected")
@io.write_stream << b"Hello, World!"
try @io.flush!

| IO.Action.OpenFailed |
@env.err.print("[EchoClient] Failed to connect:")
@env.err.print(@io.connect_error.name)

| IO.Action.Read |
@io.pending_reads -> (bytes_available |
if (bytes_available >= b"Hello, World!".size) (
@io.read_stream.advance_to_end
@env.err.print("[EchoClient] Received:")
@env.err.print(@io.read_stream.extract_token.as_string)
@io.close
)
)

| IO.Action.Closed |
@env.err.print("[EchoClient] Closed")
try @io.listen.as!(TCP.Listener).dispose
)
@

:class TCP.Spec
:is Spec
:const describes: "TCP"

:it "can listen, connect, send, respond, disconnect, and stop listening"
TCP.Listener.new(TCP.Spec.Listener.Notify.new(@env))
80 changes: 80 additions & 0 deletions src/TCP.ConnectionEngine.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
:class TCP.ConnectionEngine
:is IO.Engine(IO.Action)
:var io IO.CoreEngine
:var listen (TCP.Listener | None): None
:var connect_error OSError: OSError.None
:let read_stream: ByteStream.Reader.new
:let write_stream ByteStream.Writer

:fun non connect(
// TODO: TCPConnectionAuth, rather than ambient authority.
actor AsioEventNotify
host String
service String
from String = ""
)
try (
@_new_with_io(IO.CoreEngine.new_tcp_connect!(actor, host, service, from))
|
invalid = @_new_with_io(IO.CoreEngine.new)
invalid.connect_error = OSError.EINVAL
invalid
)

:fun non accept(
actor AsioEventNotify
listen TCP.Listener
ticket TCP.Listener.AcceptTicket
)
io = IO.CoreEngine.new_from_fd_rw(actor, ticket._fd)
new = @_new_with_io(io)
new.listen = listen
new

:new _new_with_io(@io)
@write_stream = ByteStream.Writer.new(@io)

:fun ref deferred_actions
:yields IO.Action for None
// TODO
@

:fun ref react(event CPointer(AsioEvent), flags U32, arg U32) @
:yields IO.Action
@io.react(event, flags, arg) -> (action |
case action == (
| IO.Action.Closed |
try @listen.as!(TCP.Listener)._conn_closed

// TODO: windows complete writes, flush-after-mute (pending writes logic from Pony)
// | IO.Action.Write |
// ...
)
yield action
)
@

:fun ref close
@io.close
@

:fun ref flush!
@write_stream.flush!

:fun ref pending_reads
:yields USize for None
if Platform.windows (
None // TODO: @_windows_complete_reads(arg)
|
@_pending_reads_unix -> (bytes_available | yield bytes_available)
)
@

:fun ref _pending_reads_unix None
:yields USize for None
while @io.is_readable (
try (
bytes_read = @read_stream.receive_from!(@io)
if (bytes_read > 0) (yield @read_stream.bytes_ahead_of_marker)
)
)
119 changes: 119 additions & 0 deletions src/TCP.Listener.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
:trait TCP.Listener.Notify
:fun ref listening(listen TCP.Listener'ref): None
:fun ref not_listening(listen TCP.Listener'ref) None
:fun ref closed(listen TCP.Listener'ref): None
:fun ref connected!(
listen TCP.Listener'ref
ticket TCP.Listener.AcceptTicket
) IO.Actor(IO.Action)

// TODO: Is there another way to protect the fd by making it non-forgeable,
// while avoiding the overhead of an allocation and pointer indirection?
:class iso TCP.Listener.AcceptTicket
:var _fd U32
:new iso _new(@_fd)

:actor TCP.Listener
:let notify TCP.Listener.Notify
:var listen_error OSError: OSError.None

:var _fd U32: -1
:var _event CPointer(AsioEvent): CPointer(AsioEvent).null

:var _count USize: 0
:var _limit USize
:var _read_buffer_size USize
:var _yield_after_reading USize
:var _yield_after_writing USize

:var _closed Bool: False
:var _paused Bool: False

:fun local_port: _NetAddress._for_fd(@_fd).port

:new (
// TODO: TCP.Listener.Auth, rather than ambient authority.
notify TCP.Listener.Notify'iso
host String = ""
service String = "0"
@_limit = 0
@_read_buffer_size = 16384
@_yield_after_reading = 16384
@_yield_after_writing = 16384
)
new_notify TCP.Listener.Notify'ref = --notify // TODO: should not be needed
@notify = new_notify

event = _LibPonyOS.pony_os_listen_tcp(@, host.cstring, service.cstring)
if event.is_not_null (
@_event = event
@_fd = AsioEvent.fd(@_event)
error = _LibPonyOS.pony_os_errno
new_notify.listening(@)
|
@listen_error = _LibPonyOS.pony_os_errno
@_closed = True
new_notify.not_listening(@)
)

:: This is a special behaviour that hooks into the AsioEventNotify runtime,
:: called whenever an event handle we're subscribed to receives an event.
:be _event_notify(event CPointer(AsioEvent), flags U32, arg U32)
if (@_event === event) (
if AsioEvent.is_readable(flags) (
@_accept(arg)
)
if AsioEvent.is_disposable(flags) (
AsioEvent.destroy(@_event)
@_event = CPointer(AsioEvent).null
)
)

:be _accept(ns U32 = 0)
if Platform.windows (
None // TODO
|
if @_closed.not (
try (
while (@_limit == 0 || @_count < @_limit) (
conn_fd = _LibPonyOS.pony_os_accept(@_event)
case conn_fd == (
| 0 | error! // EWOULDBLOCK, don't try again
| -1 | None // Some other error, so we can try again
| @_spawn(conn_fd)
)
)
@_paused = True
)
)
)

:fun ref _spawn(fd U32)
try (
@notify.connected!(@, TCP.Listener.AcceptTicket._new(fd))
@_count += 1
|
_LibPonyOS.pony_os_socket_close(fd)
)

:be _conn_closed
@_count -= 1

// If releasing this connection takes us below the limit,
// unpause acceptance and try to accept more connections.
if (@_paused && @_count < @_limit) (
@_paused = False
@_accept
)

:be dispose: @close
:fun ref close
if (@_closed.not && @_event.is_not_null) (
// When not on windows, unsubscribe immediately here instead of later.
if Platform.windows.not AsioEvent.unsubscribe(@_event)

_LibPonyOS.pony_os_socket_close(@_fd)
@_fd = -1

@notify.closed(@)
)
12 changes: 12 additions & 0 deletions src/_Lib.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
:ffi _LibC
:fun ntohs(network_short U16) U16
:fun ntohl(network_long U32) U32

:ffi _LibPonyOS
:fun pony_os_listen_tcp(owner AsioEventNotify, host CPointer(U8), service CPointer(U8)) CPointer(AsioEvent)
:fun pony_os_accept(event CPointer(AsioEvent)) U32
:fun pony_os_socket_close(fd U32) None
:fun pony_os_errno OSError
:fun pony_os_sockname(fd U32, net_addr _NetAddress'ref) None
:fun pony_os_ipv4(net_addr _NetAddress'box) Bool
:fun pony_os_ipv6(net_addr _NetAddress'box) Bool
39 changes: 39 additions & 0 deletions src/_NetAddress.savi
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@


:class val _NetAddress
:is Equatable(_NetAddress)

:let _family U16: 0
:let _port U16: 0 :: Port number in network byte order.
:let _ipv4 U32: 0 :: Bits for an IPv4 address in network byte order.
:let _ipv6a U32: 0 :: Bits 0-32 of an IPv6 address in network byte order.
:let _ipv6b U32: 0 :: Bits 33-64 of an IPv6 address in network byte order.
:let _ipv6c U32: 0 :: Bits 65-96 of an IPv6 address in network byte order.
:let _ipv6d U32: 0 :: Bits 97-128 of an IPv6 address in network byte order.
:let _scope U32: 0 :: IPv6 scope (unicast, anycast, multicast, etc...).

:new _for_fd(fd): _LibPonyOS.pony_os_sockname(fd, @)

:fun is_ipv4: _LibPonyOS.pony_os_ipv4(@)
:fun is_ipv6: _LibPonyOS.pony_os_ipv6(@)

:fun port: _LibC.ntohs(@_port) // (converted to host byte order)
:fun scope: _LibC.ntohl(@_scope) // (converted to host byte order)
:fun ipv4_addr: _LibC.ntohl(@_ipv4) // (converted to host byte order)
// TODO: ipv6_addr (needs tuple return value)
// TODO: family (needs Platform.big_endian)

:fun "=="(other _NetAddress'box)
@_family == other._family
&& @_port == other._port
&& (
if @is_ipv4 (
@_ipv4 == other._ipv4
|
@_ipv6a == other._ipv6a
&& @_ipv6b == other._ipv6b
&& @_ipv6c == other._ipv6c
&& @_ipv6d == other._ipv6d
)
)
&& @_scope == other._scope

0 comments on commit 0e21251

Please sign in to comment.