generated from alexejk/go-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
codec.go
178 lines (143 loc) · 3.78 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package xmlrpc
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/rpc"
"net/url"
"sync"
)
// defaultUserAgent is the User-Agent value NewCodec assigns to a freshly created Codec.
const defaultUserAgent = "alexejk.io/go-xmlrpc"
// Codec implements the methods required by rpc.ClientCodec.
// In this implementation the Codec itself performs the RPC round trip with http.Client:
// WriteRequest sends the HTTP request and stores the HTTP response, ReadResponseHeader
// and ReadResponseBody then consume it.
type Codec struct {
// endpoint is the URL every request is POSTed to.
endpoint *url.URL
// httpClient executes the HTTP requests.
httpClient *http.Client
// customHeaders are set on each request after Content-Type and User-Agent,
// so they can override those defaults.
customHeaders map[string]string
// mutex guards access to pending.
mutex sync.Mutex
// pending contains completed but not yet processed responses, keyed by sequence ID.
pending map[uint64]*rpcCall
// response is the current in-flight response: stored by ReadResponseHeader,
// decoded by ReadResponseBody.
response *Response
// encoder writes the method call into the request body.
encoder Encoder
// decoder extracts faults and result values from a response.
decoder Decoder
// ready announces completed requests by sequence ID;
// WriteRequest sends on it and ReadResponseHeader receives from it.
ready chan uint64
// userAgent is the User-Agent header value sent with each request.
userAgent string
// shutdown is signalled by Close and makes ReadResponseHeader return net.ErrClosed.
shutdown chan struct{}
}
// rpcCall pairs a received HTTP response with the RPC request that produced it.
type rpcCall struct {
// Seq is the sequence ID copied from the originating rpc.Request.
Seq uint64
// ServiceMethod is the method name copied from the originating rpc.Request.
ServiceMethod string
// httpResponse is the HTTP response received for the request;
// ReadResponseHeader reads and closes its body.
httpResponse *http.Response
}
// NewCodec returns a Codec that sends its requests to endpoint using httpClient.
//
// The codec starts out with the standard encoder and decoder, the default
// User-Agent, an empty set of pending calls, no in-flight response and
// unbuffered ready and shutdown channels.
func NewCodec(endpoint *url.URL, httpClient *http.Client) *Codec {
	// The zero value already leaves response nil and customHeaders unset.
	codec := new(Codec)

	// Transport configuration.
	codec.endpoint = endpoint
	codec.httpClient = httpClient
	codec.userAgent = defaultUserAgent

	// Wire format handling.
	codec.encoder = &StdEncoder{}
	codec.decoder = &StdDecoder{}

	// Bookkeeping shared between WriteRequest, ReadResponseHeader and Close.
	codec.pending = make(map[uint64]*rpcCall)
	codec.ready = make(chan uint64)
	codec.shutdown = make(chan struct{})

	return codec
}
// SetEncoder replaces the Encoder that WriteRequest uses to build request bodies.
// The assignment is not synchronized with WriteRequest.
// NOTE(review): presumably meant to be called before the codec is in use — confirm.
func (c *Codec) SetEncoder(encoder Encoder) {
c.encoder = encoder
}
// SetDecoder replaces the Decoder that ReadResponseHeader and ReadResponseBody use
// to interpret responses.
// The assignment is not synchronized with those methods.
// NOTE(review): presumably meant to be called before the codec is in use — confirm.
func (c *Codec) SetDecoder(decoder Decoder) {
c.decoder = decoder
}
// WriteRequest encodes the call described by req and args, POSTs it to the
// codec endpoint and hands the HTTP response over to ReadResponseHeader.
//
// The HTTP round trip happens synchronously inside this method. Once a
// response has been received it is recorded in c.pending under req.Seq and
// the sequence ID is sent on the unbuffered c.ready channel, so the method
// does not return until ReadResponseHeader has received that ID.
//
// An error from encoding, from building the HTTP request or from performing
// it is returned as is; in that case nothing is recorded as pending.
func (c *Codec) WriteRequest(req *rpc.Request, args interface{}) error {
	var payload bytes.Buffer
	if err := c.encoder.Encode(&payload, req.ServiceMethod, args); err != nil {
		return err
	}

	httpRequest, err := http.NewRequestWithContext(context.TODO(), http.MethodPost, c.endpoint.String(), &payload)
	if err != nil {
		return err
	}

	headers := httpRequest.Header
	headers.Set("Content-Type", "text/xml")
	headers.Set("User-Agent", c.userAgent)

	// Apply custom headers if set; coming after the static defaults lets them overwrite those.
	for name, val := range c.customHeaders {
		headers.Set(name, val)
	}

	// Content-Length is written last so custom headers cannot replace it.
	headers.Set("Content-Length", fmt.Sprint(payload.Len()))

	httpResponse, err := c.httpClient.Do(httpRequest) //nolint:bodyclose // Handled in ReadResponseHeader
	if err != nil {
		return err
	}

	completed := &rpcCall{
		Seq:           req.Seq,
		ServiceMethod: req.ServiceMethod,
		httpResponse:  httpResponse,
	}

	c.mutex.Lock()
	c.pending[req.Seq] = completed
	c.mutex.Unlock()

	// Announce the finished exchange; this blocks until ReadResponseHeader receives it.
	c.ready <- req.Seq

	return nil
}
// ReadResponseHeader waits for the next finished HTTP exchange and fills resp from it.
//
// It blocks until either WriteRequest announces a sequence ID on c.ready or a
// shutdown signal arrives on c.shutdown; on shutdown it returns net.ErrClosed.
//
// Problems that concern only the call at hand (a non-2xx status code, a body
// that cannot be read or parsed, a fault reported by the decoder) are reported
// through resp.Error while the method itself returns nil. On success the
// parsed response is stored in c.response for ReadResponseBody to decode.
//
// The HTTP response body is always closed before the method returns.
func (c *Codec) ReadResponseHeader(resp *rpc.Response) error {
	var seq uint64

	select {
	case seq = <-c.ready:
		// A request is ready; handled below.
	case <-c.shutdown:
		// Handle shutdown signal
		return net.ErrClosed
	}

	c.mutex.Lock()
	call := c.pending[seq]
	delete(c.pending, seq)
	c.mutex.Unlock()

	// Forget the previous call's payload. Without this, a failure on the
	// current call would leave the old response in place and a subsequent
	// ReadResponseBody could decode data belonging to a different call.
	c.response = nil

	resp.Seq = call.Seq
	resp.ServiceMethod = call.ServiceMethod

	r := call.httpResponse
	defer r.Body.Close()

	if r.StatusCode < 200 || r.StatusCode >= 300 {
		resp.Error = fmt.Sprintf("bad response code: %d", r.StatusCode)
		return nil
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		resp.Error = err.Error()
		return nil
	}

	decodableResponse, err := NewResponse(body)
	if err != nil {
		resp.Error = err.Error()
		return nil
	}

	// Return response Fault already at this stage
	if err := c.decoder.DecodeFault(decodableResponse); err != nil {
		resp.Error = err.Error()
		return nil
	}

	c.response = decodableResponse

	return nil
}
// ReadResponseBody decodes the response most recently stored by
// ReadResponseHeader into v.
//
// When v is nil nothing is decoded and nil is returned, regardless of whether
// a response is available. Otherwise an error is returned if no response has
// been stored, and the decoder's result is returned in all other cases.
func (c *Codec) ReadResponseBody(v interface{}) error {
	switch {
	case v == nil:
		// Caller is not interested in the body.
		return nil
	case c.response == nil:
		return errors.New("no in-flight response found")
	}

	return c.decoder.Decode(c.response, v)
}
// Close signals shutdown to ReadResponseHeader and closes the HTTP client's
// idle connections. It always returns nil.
//
// The shutdown channel is closed rather than sent on: a send on the unbuffered
// channel would block forever when no ReadResponseHeader call is waiting
// (for example on a second Close). Closing the channel never blocks, is
// observed by every current and future receive on c.shutdown, and is done at
// most once, so Close may be called repeatedly and from several goroutines.
func (c *Codec) Close() error {
	// c.mutex serializes concurrent Close calls so the channel is closed exactly once.
	c.mutex.Lock()
	select {
	case <-c.shutdown:
		// Already closed by an earlier call; nothing to signal.
	default:
		close(c.shutdown)
	}
	c.mutex.Unlock()

	c.httpClient.CloseIdleConnections()

	return nil
}