This repository has been archived by the owner on Mar 1, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
http.go
196 lines (171 loc) · 5.64 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package main
import (
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/ostcar/geiss/asgi"
)
const (
httpResponseWait = 30 * time.Second
bodyChunkSize = 500 * 1024 // Read 500kb at once
)
// readBodyChunk reads bodyChunkSize bytes from an io.Reader, and returns it as
// first argument. eof is true when there is no more content after this call.
func readBodyChunk(body io.Reader) (content []byte, eof bool, err error) {
var n int
content = make([]byte, bodyChunkSize)
n, err = io.ReadFull(body, content)
if err == io.EOF || err == io.ErrUnexpectedEOF {
err = nil
eof = true
}
return content[:n], eof, err
}
// Create the reply channel name for a http.response channel.
func createResponseReplyChannel() (replyChannel string, err error) {
replyChannel, err = channelLayer.NewChannel(globalChannelname)
if err != nil {
return "", asgi.NewForwardError("could not create a new channel name", err)
}
return replyChannel, nil
}
// Forwards a HTTP request to the channel layer. Returns the reply channel name.
func forwardHTTPRequest(req *http.Request, replyChannel string) (err error) {
var bodyChannel string
// Read the first part of the body
content, eof, err := readBodyChunk(req.Body)
if err != nil {
return asgi.NewForwardError("can not read the body of the request", err)
}
// If there is a second part of the body, then create a channel to read from it.
if !eof {
bodyChannel, err = channelLayer.NewChannel("http.request.body?")
if err != nil {
return asgi.NewForwardError("can not create new channel name", err)
}
}
host := req.Host
if req.TLS != nil && !strings.Contains(req.Host, ":") {
// If no port was set in the host explicitly, the asgi implementation uses
// 80 as default. So if the request is a https request, we have to manually
// set it to 443
host = req.Host + ":443"
}
rm := asgi.RequestMessage{
ReplyChannel: replyChannel,
HTTPVersion: req.Proto,
Method: req.Method,
Path: req.URL.Path,
Scheme: req.URL.Scheme,
QueryString: []byte(req.URL.RawQuery),
Headers: req.Header,
Body: content,
BodyChannel: bodyChannel,
Client: req.RemoteAddr,
Server: host,
}
// Send the Request message to the channel layer
err = channelLayer.Send("http.request", rm.Raw())
if err != nil {
// If err is an channel full error, we forward it. The asgi specs define, that
// we should not retry in this case, but return a 503.
return asgi.NewForwardError("can not send the message to the channel layer", err)
}
if !eof {
return sendMoreContent(req.Body, bodyChannel)
}
return nil
}
func sendMoreContent(body io.Reader, channel string) (err error) {
// Read more content from the body
content, eof, err := readBodyChunk(body)
if err != nil {
return asgi.NewForwardError("can not read the body of the request", err)
}
for i := 0; ; i++ {
rbc := asgi.RequestBodyChunkMessage{
Content: content,
Closed: false, // TODO test if the connection is closed
MoreContent: !eof,
}
err = channelLayer.Send(channel, rbc.Raw())
if err != nil {
if asgi.IsChannelFullError(err) && i < 1000 {
// If the channel is full, then try again.
time.Sleep(100 * time.Millisecond)
continue
}
return asgi.NewForwardError("can not send the message to the channel layer", err)
}
break
}
if !eof {
return sendMoreContent(body, channel)
}
return // This can return an channel full error or nil
}
// Receives a http response from the channel layer and writes it to the http response.
func receiveHTTPResponse(w http.ResponseWriter, channel string) (err error) {
// Register the asgi channel to listen on.
c, done := readFromChannel(channel)
defer close(done)
// Wait for the response
message, err := readTimeout(c, httpResponseWait)
if err != nil {
return fmt.Errorf("Can not read from channel %s: %s", channel, err)
}
var rm asgi.ResponseMessage
rm.Set(message)
// Write the headers from the response message to the http resonse
for k, v := range rm.Headers {
w.Header()[k] = v
}
// Set the status code of the http response and write the first part of the content
w.WriteHeader(rm.Status)
if _, err = w.Write(rm.Content); err != nil {
return asgi.NewForwardError("can not write to response", err)
}
// If there is more content, then receive it
moreContent := rm.MoreContent
for moreContent {
// Wait for the response
message, err = readTimeout(c, httpResponseWait)
if err != nil {
return fmt.Errorf("Can not read from channel %s: %s", channel, err)
}
var rcm asgi.ResponseChunkMessage
rcm.Set(message)
// Write the received content to the http response.
if _, err = w.Write(rcm.Content); err != nil {
return asgi.NewForwardError("can not write to response", err)
}
// See if there is still more content.
moreContent = rcm.MoreContent
}
return nil
}
// Handels an http request. Returns an error if it happens.
func asgiHTTPHandler(w http.ResponseWriter, req *http.Request) error {
// Get the reply channel name
channel, err := createResponseReplyChannel()
if err != nil {
return asgi.NewForwardError("can not create new channel for http respons", err)
}
// Forward the request to the channel layer and get the reply channel name.
if err = forwardHTTPRequest(req, channel); err != nil {
if asgi.IsChannelFullError(err) {
handleError(w, err.Error(), 503)
return nil
}
return asgi.NewForwardError("could not send message to the channel layer", err)
}
// Receive the response from the channel layer and write it to the http
// response.
if err = receiveHTTPResponse(w, channel); err != nil {
return asgi.NewForwardError(
"could not receive message from the http response channel", err)
}
return nil
}