-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstream.go
146 lines (131 loc) · 3.37 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"sync"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/sirupsen/logrus"
)
// tcpStream carries the per-connection state used while a TCP stream is
// reassembled: the detected application protocol and one buffer per direction.
type tcpStream struct {
// factoryWg is incremented by Accept right before the consumer goroutine is
// started and released by consume when that goroutine returns.
factoryWg *sync.WaitGroup
// protocol is the upper-layer protocol carried by this stream, as reported
// by guessProtocol.
protocol string
// isDetect is true once the protocol of this stream has been determined.
isDetect bool
// c2sBuf receives the reassembled client-to-server payload.
c2sBuf *buffer
// s2cBuf receives the reassembled payload of the opposite direction.
s2cBuf *buffer
}
// Accept decides whether a TCP segment is handed to the reassembler.
//
// Segments flagged with *start are always accepted, as are all segments once
// the application protocol is known. Until then every segment's payload is
// passed to guessProtocol: an UnknownType result drops the segment, any other
// result marks the stream as detected and starts the consumer goroutine.
func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
	// Important: the first SYN packet must be accepted. Streams whose
	// protocol is already known need no further inspection either.
	if *start || s.isDetect {
		return true
	}

	s.protocol = guessProtocol(tcp.Payload)
	if s.protocol == UnknownType {
		// Nothing recognisable yet: drop the segment.
		return false
	}

	// The application protocol is known now; spawn the consumer that
	// processes the buffered data.
	s.isDetect = true
	s.factoryWg.Add(1)
	go s.consume()
	return true
}
// ReassembledSG forwards a chunk of reassembled payload to the buffer that
// matches its direction: c2sBuf for client-to-server data, s2cBuf otherwise.
// Empty chunks are ignored.
func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
	dir, _, _, _ := sg.Info()
	l, _ := sg.Lengths()
	if l <= 0 {
		return
	}

	// The ScatterGather (and the memory behind Fetch) is reused by the
	// assembler after this call returns, while the consumer goroutine reads
	// the slice later. Hand over a private copy instead of the fetched slice.
	data := make([]byte, l)
	copy(data, sg.Fetch(l))

	if dir == reassembly.TCPDirClientToServer {
		s.c2sBuf.bytes <- data
	} else {
		s.s2cBuf.bytes <- data
	}
}
// ReassemblyComplete is invoked by the assembler once it considers the stream
// finished. It closes both direction channels and reports that the stream
// should be kept.
func (s *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
	// Returning false keeps this stream registered so that the trailing ACK
	// is still matched to it instead of creating a new tcpStream.
	const removeStream = false

	close(s.c2sBuf.bytes)
	close(s.s2cBuf.bytes)
	return removeStream
}
// consume processes the data collected in the two direction buffers with the
// handler of the detected protocol. Only HttpType has a handler; for any other
// protocol the function returns immediately. factoryWg is released on return.
func (s *tcpStream) consume() {
	defer s.factoryWg.Done()

	if s.protocol == HttpType {
		handleHttp(s.c2sBuf, s.s2cBuf)
	}
}
// handleHttp reads HTTP requests from c2s and the matching HTTP responses from
// s2c, and forwards every complete request/response pair to the replay service
// through sendReqResp.
//
// The loop stops as soon as either side reports io.EOF or io.ErrUnexpectedEOF.
// Any other parse error is logged and the loop continues with the next
// message.
func handleHttp(c2s, s2c io.Reader) {
	c2sReader := bufio.NewReader(c2s)
	s2cReader := bufio.NewReader(s2c)
	for {
		// Read the next request.
		req, err := http.ReadRequest(c2sReader)
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break
		} else if err != nil {
			logrus.Error(err)
			continue
		}

		// Read the response that answers req. Passing req (instead of nil,
		// which makes net/http assume GET) lets the parser frame responses
		// that carry no body, such as replies to HEAD requests.
		resp, err := http.ReadResponse(s2cReader, req)
		if err != nil {
			// Close the request body so that its unread bytes are not
			// mistaken for the start of the next request.
			req.Body.Close()
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				break
			}
			logrus.Error(err)
			continue
		}

		// Send the captured req/resp pair to the replay service.
		err = sendReqResp(req, resp)
		if err != nil {
			logrus.Errorf("send old req/resp failed, %s", err)
		} else {
			logrus.Info("send old req/resp ok")
		}

		// Closing releases the bodies so the readers are positioned at the
		// next message.
		req.Body.Close()
		resp.Body.Close()
	}
}
// sendReqResp serialises a captured request/response pair and posts it as a
// JSON Record to the AddRecord endpoint of the replay service.
//
// The request is dumped in wire format including its body; for the response
// only the body bytes are stored. A non-nil error is returned when
// serialisation fails, when the POST itself fails, or when the replay service
// answers with a non-2xx status.
//
// TODO: to support multiple protocols this function should be protocol
// agnostic; at the moment its parameters are HTTP specific.
func sendReqResp(req *http.Request, resp *http.Response) (err error) {
	// Serialise the request and the response.
	rawReq, err := httputil.DumpRequest(req, true)
	if err != nil {
		return fmt.Errorf("dump old request: %w", err)
	}
	rawResp, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read old response body: %w", err)
	}

	// Build the record and send it.
	record := &Record{
		TaskID:  configuration.Taskid,
		OldReq:  rawReq,
		OldResp: rawResp,
		NewResp: nil,
	}
	b, err := json.Marshal(record)
	if err != nil {
		return fmt.Errorf("marshal record: %w", err)
	}

	api := fmt.Sprintf("http://%s%s", configuration.ReplaySvrAddr, ApiAddRecord)
	apiResp, err := http.Post(api, "application/json", bytes.NewReader(b))
	if err != nil {
		return fmt.Errorf("post record: %w", err)
	}
	defer func() {
		// Drain before closing so the underlying connection can be reused.
		_, _ = io.Copy(io.Discard, apiResp.Body)
		if cerr := apiResp.Body.Close(); cerr != nil {
			logrus.Errorf("close AddRecord Resp.Body failed, %s", cerr)
		}
	}()

	// A completed round trip is not a success by itself: the service may
	// still have rejected the record.
	if apiResp.StatusCode < http.StatusOK || apiResp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("post record: unexpected status %s", apiResp.Status)
	}
	return nil
}