forked from chromedp/chromedp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
browser.go
330 lines (277 loc) · 8.34 KB
/
browser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
package chromedp
import (
"context"
"errors"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
easyjson "github.com/mailru/easyjson"
"github.com/chromedp/cdproto"
"github.com/chromedp/cdproto/browser"
"github.com/chromedp/cdproto/cdp"
"github.com/chromedp/cdproto/target"
)
// Browser is the high-level Chrome DevTools Protocol browser manager, handling
// the browser process runner, WebSocket clients, associated targets, and
// network, page, and DOM events.
//
// A Browser is created by NewBrowser, which dials the websocket connection and
// starts the run goroutine that services it.
type Browser struct {
// next is the id of the most recently issued command; Execute increments
// it atomically to obtain the id of each new message.
// NOTE: needs to be 64-bit aligned for 32-bit targets too, so be careful when moving this field.
// This will be eventually done by the compiler once https://github.com/golang/go/issues/599 is fixed.
next int64
// LostConnection is closed when the websocket connection to Chrome is
// dropped, i.e. when a read from the connection fails. This can be useful
// to make sure that Browser's context is cancelled (and the handler
// stopped) once the connection has failed.
LostConnection chan struct{}
// dialTimeout bounds the websocket dial performed by NewBrowser. It
// defaults to ten seconds; a value that is not positive disables the
// timeout.
dialTimeout time.Duration
// pages keeps track of the attached targets, indexed by each's session
// ID. The only reason this is a field is so that the tests can check the
// map once a browser is closed.
pages map[target.SessionID]*Target
// listenersMu guards listeners.
listenersMu sync.Mutex
// listeners holds the callbacks that are run for every browser-level
// event and command response read from the connection.
listeners []cancelableListener
// conn is the transport obtained by dialing the browser's websocket
// address. The run goroutine reads from it, writes to it, and closes it
// when it exits.
conn Transport
// newTabQueue is the queue used to register new target handlers, once a
// new tab is created and attached to. The send is unbuffered, so the
// sender blocks until the run loop has received the Target and is about
// to insert it into pages.
newTabQueue chan *Target
// cmdQueue is the outgoing command queue. Execute enqueues messages and
// the run loop writes them to the connection.
cmdQueue chan *cdproto.Message
// logging funcs
// logf receives general log output; NewBrowser defaults it to log.Printf.
logf func(string, ...interface{})
// errf receives error output; when unset, NewBrowser makes it forward to
// logf with an "ERROR: " prefix.
errf func(string, ...interface{})
// dbgf, when set, is handed to the connection to log websocket messages.
dbgf func(string, ...interface{})
// The optional fields below are helpful for some tests.
// process can be initialized by the allocators which start a process
// when allocating a browser.
process *os.Process
// userDataDir can be initialized by the allocators which set up user
// data dirs directly.
userDataDir string
}
// NewBrowser creates a new browser by dialing the websocket address urlstr and
// starting the goroutine that services the connection. Typically, this
// function wouldn't be called directly, as the Allocator interface takes care
// of it.
//
// The options are applied in order on top of the defaults (a ten second dial
// timeout and log.Printf for general logging). ctx governs both the dial and
// the lifetime of the background handler: once ctx is done the handler stops
// and the connection is closed.
//
// If the dial fails, the returned error wraps the underlying cause, so callers
// can inspect it with errors.Is and errors.As (for instance to detect
// context.DeadlineExceeded when the dial timeout elapses).
func NewBrowser(ctx context.Context, urlstr string, opts ...BrowserOption) (*Browser, error) {
	b := &Browser{
		LostConnection: make(chan struct{}),

		dialTimeout: 10 * time.Second,

		newTabQueue: make(chan *Target),

		// Fit some jobs without blocking, to reduce blocking in Execute.
		cmdQueue: make(chan *cdproto.Message, 32),

		logf: log.Printf,
	}

	// apply options
	for _, o := range opts {
		o(b)
	}

	// ensure errf is set
	if b.errf == nil {
		b.errf = func(s string, v ...interface{}) { b.logf("ERROR: "+s, v...) }
	}

	// Only the dial is subject to the timeout; the long-running handler
	// below keeps using the caller's context.
	dialCtx := ctx
	if b.dialTimeout > 0 {
		var cancel context.CancelFunc
		dialCtx, cancel = context.WithTimeout(ctx, b.dialTimeout)
		defer cancel()
	}

	var err error
	urlstr = forceIP(urlstr)
	b.conn, err = DialContext(dialCtx, urlstr, WithConnDebugf(b.dbgf))
	if err != nil {
		// Wrap with %w so the cause stays reachable through the error chain.
		return nil, fmt.Errorf("could not dial %q: %w", urlstr, err)
	}

	go b.run(ctx)
	return b, nil
}
// newExecutorForTarget builds the Target handler for the given target and
// session, and registers it with the browser's run loop.
//
// Both identifiers must be non-empty. The new Target inherits the browser's
// logging funcs. An error is returned if either identifier is empty, or if ctx
// is done before the run loop has accepted the Target.
func (b *Browser) newExecutorForTarget(ctx context.Context, targetID target.ID, sessionID target.SessionID) (*Target, error) {
	switch {
	case targetID == "":
		return nil, errors.New("empty target ID")
	case sessionID == "":
		return nil, errors.New("empty session ID")
	}

	executor := &Target{
		browser:   b,
		TargetID:  targetID,
		SessionID: sessionID,

		messageQueue: make(chan *cdproto.Message, 1024),
		frames:       make(map[cdp.FrameID]*cdp.Frame),

		logf: b.logf,
		errf: b.errf,
	}

	// The hand-off is deliberately a blocking send on an unbuffered channel:
	// the tab has to be inserted into the map before any more target events
	// are routed.
	select {
	case b.newTabQueue <- executor:
		return executor, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// Execute sends the command method with the given params to the browser and
// blocks until the response with the matching message id has been read from
// the connection, or until ctx is done.
//
// params may be nil, in which case the command is sent without parameters.
// If res is non-nil, the result of a successful response is unmarshaled into
// it.
//
// It returns an error if method is the browser close command (the browser
// must be closed by cancelling its context instead), if params cannot be
// marshaled, if ctx is done before the response arrives (ctx.Err()), if the
// browser replies with an error, or if the result cannot be unmarshaled.
func (b *Browser) Execute(ctx context.Context, method string, params easyjson.Marshaler, res easyjson.Unmarshaler) error {
	if method == browser.CommandClose {
		return errors.New("to close the browser, cancel its context or use chromedp.Cancel")
	}

	// Encode the parameters before registering anything, so that a marshal
	// failure leaves no listener behind.
	var buf []byte
	if params != nil {
		var err error
		buf, err = easyjson.Marshal(params)
		if err != nil {
			return err
		}
	}

	id := atomic.AddInt64(&b.next, 1)

	// lctx scopes the response listener to this call. Cancelling it on every
	// return path releases the context; presumably runListeners also drops
	// listeners whose context is done (it is defined elsewhere - confirm).
	lctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// The buffer of one lets the listener hand over the response without
	// waiting for this goroutine to reach the receive below.
	ch := make(chan *cdproto.Message, 1)
	fn := func(ev interface{}) {
		if msg, ok := ev.(*cdproto.Message); ok && msg.ID == id {
			select {
			case <-ctx.Done():
			case ch <- msg:
			}
			cancel()
		}
	}
	b.listenersMu.Lock()
	b.listeners = append(b.listeners, cancelableListener{lctx, fn})
	b.listenersMu.Unlock()

	// send command
	cmd := &cdproto.Message{
		ID:     id,
		Method: cdproto.MethodType(method),
		Params: buf,
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case b.cmdQueue <- cmd:
	}

	// wait for result
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msg := <-ch:
		switch {
		case msg == nil:
			return ErrChannelClosed
		case msg.Error != nil:
			return msg.Error
		case res != nil:
			return easyjson.Unmarshal(msg.Result, res)
		}
	}
	return nil
}
// run is the browser's event loop. It owns the connection for its whole
// lifetime and closes it on return.
//
// A reader goroutine pulls messages off the connection and classifies them:
// messages carrying a session ID are forwarded to the main loop for routing
// to the owning Target, browser-level events and command responses are handed
// to the registered listeners, and anything else is reported as malformed.
//
// The main loop writes queued commands to the connection, maintains the pages
// map as targets are attached and detached, and routes session messages to
// the matching Target. It returns when ctx is done or the connection is lost.
func (b *Browser) run(ctx context.Context) {
	defer b.conn.Close()

	// incomingQueue is the queue of incoming target events, to be routed by
	// their session ID.
	incomingQueue := make(chan *cdproto.Message, 1)

	// delTabQueue carries the session IDs of detached targets, so that the
	// main loop (the only goroutine touching b.pages) can remove them.
	delTabQueue := make(chan target.SessionID, 1)

	// This goroutine continuously reads events from the websocket
	// connection. The separate goroutine is needed since a websocket read
	// is blocking, so it cannot be used in a select statement.
	go func() {
		for {
			msg := new(cdproto.Message)
			if err := b.conn.Read(ctx, msg); err != nil {
				// If the websocket failed, most likely Chrome was closed or
				// crashed. Signal that so the entire browser handler can be
				// stopped.
				close(b.LostConnection)
				return
			}

			switch {
			case msg.SessionID != "" && (msg.Method != "" || msg.ID != 0):
				// Target-scoped event or response: let the main loop route it.
				select {
				case <-ctx.Done():
					return
				case incomingQueue <- msg:
				}

			case msg.Method != "":
				// Browser-level event.
				ev, err := cdproto.UnmarshalMessage(msg)
				if err != nil {
					b.errf("%s", err)
					continue
				}
				b.listenersMu.Lock()
				b.listeners = runListeners(b.listeners, ev)
				b.listenersMu.Unlock()

				if ev, ok := ev.(*target.EventDetachedFromTarget); ok {
					// Guard the send with ctx, like the incomingQueue send
					// above: once the main loop has exited nobody drains
					// delTabQueue, and an unguarded send would block this
					// goroutine forever.
					select {
					case <-ctx.Done():
						return
					case delTabQueue <- ev.SessionID:
					}
				}

			case msg.ID != 0:
				// Response to a browser-level command.
				b.listenersMu.Lock()
				b.listeners = runListeners(b.listeners, msg)
				b.listenersMu.Unlock()

			default:
				b.errf("ignoring malformed incoming message (missing id or method): %#v", msg)
			}
		}
	}()

	b.pages = make(map[target.SessionID]*Target, 32)
	for {
		select {
		case <-ctx.Done():
			return

		case msg := <-b.cmdQueue:
			if err := b.conn.Write(ctx, msg); err != nil {
				b.errf("%s", err)
				continue
			}

		case t := <-b.newTabQueue:
			if _, ok := b.pages[t.SessionID]; ok {
				b.errf("executor for %q already exists", t.SessionID)
			}
			b.pages[t.SessionID] = t

		case sessionID := <-delTabQueue:
			if _, ok := b.pages[sessionID]; !ok {
				b.errf("executor for %q doesn't exist", sessionID)
			}
			delete(b.pages, sessionID)

		case m := <-incomingQueue:
			page, ok := b.pages[m.SessionID]
			if !ok {
				// A page we recently closed still sending events.
				continue
			}

			select {
			case <-ctx.Done():
				return
			case page.messageQueue <- m:
			}

		case <-b.LostConnection:
			return // to avoid "write: broken pipe" errors
		}
	}
}
// BrowserOption is a browser option: a function that configures a Browser.
// NewBrowser applies the supplied options in order, after setting its
// defaults and before dialing the websocket connection.
type BrowserOption = func(*Browser)
// WithBrowserLogf is a browser option that sets the func which receives the
// browser's general log output.
func WithBrowserLogf(f func(string, ...interface{})) BrowserOption {
	return func(br *Browser) {
		br.logf = f
	}
}
// WithBrowserErrorf is a browser option that sets the func which receives the
// browser's error log output.
func WithBrowserErrorf(f func(string, ...interface{})) BrowserOption {
	return func(br *Browser) {
		br.errf = f
	}
}
// WithBrowserDebugf is a browser option that sets the func used to log the
// actual websocket messages.
func WithBrowserDebugf(f func(string, ...interface{})) BrowserOption {
	return func(br *Browser) {
		br.dbgf = f
	}
}
// WithConsolef is a browser option to specify a func to receive chrome log
// events.
//
// Note: NOT YET IMPLEMENTED. The returned option currently leaves the Browser
// untouched and f is never called.
func WithConsolef(f func(string, ...interface{})) BrowserOption {
	noop := func(*Browser) {}
	return noop
}
// WithDialTimeout is a browser option that sets how long dialing the
// browser's websocket address may take. The default is ten seconds; use a
// zero duration to not use a timeout.
func WithDialTimeout(d time.Duration) BrowserOption {
	return func(br *Browser) {
		br.dialTimeout = d
	}
}