forked from bluenviron/gortsplib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_conn_reader.go
124 lines (98 loc) · 2.21 KB
/
server_conn_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package gortsplib
import (
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/liberrors"
)
// errSwitchReadFunc is the error value that serverConnReader.run interprets
// as a request to change the active read function instead of as a failure.
type errSwitchReadFunc struct {
	// tcp selects the read function to switch to: the TCP one when true,
	// the standard one when false.
	tcp bool
}

// Error implements the error interface; the message is the same for every value.
func (errSwitchReadFunc) Error() string {
	return "switching read function"
}

// isErrSwitchReadFunc reports whether the dynamic type of err is exactly
// errSwitchReadFunc. A nil error, a pointer to errSwitchReadFunc or an error
// that merely wraps one all yield false, since no unwrapping is performed.
func isErrSwitchReadFunc(err error) bool {
	switch err.(type) {
	case errSwitchReadFunc:
		return true
	default:
		return false
	}
}
// serverConnReader owns the goroutine that reads incoming data for a
// ServerConn. It is created with newServerConnReader, which also starts
// the goroutine.
type serverConnReader struct {
	// sc is the connection the reader reads from and reports to.
	sc *ServerConn

	// chReadDone is closed when run returns; wait blocks on it.
	chReadDone chan struct{}
}
// newServerConnReader allocates a reader bound to sc and immediately starts
// its run loop in a new goroutine. Use wait on the returned reader to block
// until that goroutine has exited.
func newServerConnReader(sc *ServerConn) *serverConnReader {
	reader := new(serverConnReader)
	reader.sc = sc
	reader.chReadDone = make(chan struct{})

	go reader.run()

	return reader
}
// wait blocks until a receive on chReadDone succeeds; run closes that
// channel when it returns.
func (cr *serverConnReader) wait() {
	done := cr.chReadDone
	<-done
}
// run is the reader goroutine body. It starts with readFuncStandard and keeps
// invoking the current read function. When the function returns an
// errSwitchReadFunc, the read function is replaced (TCP or standard, depending
// on the tcp field) and the loop goes on. Any other return value is handed to
// sc.readErr and ends the loop. chReadDone is closed on exit.
func (cr *serverConnReader) run() {
	defer close(cr.chReadDone)

	current := cr.readFuncStandard

	for {
		err := current()

		sw, switching := err.(errSwitchReadFunc)
		if !switching {
			// not a switch request: report it and stop reading
			cr.sc.readErr(err)
			return
		}

		if sw.tcp {
			current = cr.readFuncTCP
		} else {
			current = cr.readFuncStandard
		}
	}
}
// readFuncStandard is the read function run starts with. It clears the read
// deadline of the network connection, then loops: each value read that is a
// *base.Request is passed to sc.handleRequest together with a freshly created
// result channel; any other value makes the function return
// liberrors.ErrServerUnexpectedFrame. The first error from reading or from
// handleRequest is returned as-is.
func (cr *serverConnReader) readFuncStandard() error {
	// a zero time removes any previously set deadline
	cr.sc.nconn.SetReadDeadline(time.Time{})

	for {
		msg, err := cr.sc.conn.ReadInterleavedFrameOrRequest()
		if err != nil {
			return err
		}

		request, isRequest := msg.(*base.Request)
		if !isRequest {
			return liberrors.ErrServerUnexpectedFrame{}
		}

		if err := cr.sc.handleRequest(readReq{req: request, res: make(chan error)}); err != nil {
			return err
		}
	}
}
// readFuncTCP is the read function run switches to when asked for TCP. It
// clears the read deadline, calls session.startWriter, then loops:
//
//   - while the session state is ServerSessionStateRecord, the read deadline
//     is moved to now + ReadTimeout before every read;
//   - an interleaved frame adds its payload length to the session's
//     bytesReceived counter and is delivered to the callback registered for
//     its channel, if there is one (otherwise the payload is dropped);
//   - a request is passed to sc.handleRequest with a freshly created result
//     channel.
//
// The first error from reading or from handleRequest is returned as-is.
func (cr *serverConnReader) readFuncTCP() error {
	// a zero time removes any previously set deadline
	cr.sc.nconn.SetReadDeadline(time.Time{})

	cr.sc.session.startWriter()

	for {
		if cr.sc.session.state == ServerSessionStateRecord {
			deadline := time.Now().Add(cr.sc.s.ReadTimeout)
			cr.sc.nconn.SetReadDeadline(deadline)
		}

		msg, err := cr.sc.conn.ReadInterleavedFrameOrRequest()
		if err != nil {
			return err
		}

		if frame, isFrame := msg.(*base.InterleavedFrame); isFrame {
			atomic.AddUint64(cr.sc.session.bytesReceived, uint64(len(frame.Payload)))

			handler, registered := cr.sc.session.tcpCallbackByChannel[frame.Channel]
			if registered {
				handler(frame.Payload)
			}
			continue
		}

		if request, isRequest := msg.(*base.Request); isRequest {
			if err := cr.sc.handleRequest(readReq{req: request, res: make(chan error)}); err != nil {
				return err
			}
		}
	}
}