-
Notifications
You must be signed in to change notification settings - Fork 1
/
writer.go
83 lines (67 loc) · 1.5 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package conveyor
import (
"io"
"sync"
)
// The ConcurrentWriter type is a thread-safe wrapper for
// io.Writer that is able to keep the order of lines across all chunks.
type ConcurrentWriter struct {
handle io.Writer
keepOrder bool
lastChunkWritten int
cache map[int][]byte
firstWrite bool
sync.Mutex
}
// NewConcurrentWriter returns a new ConcurrentWriter
func NewConcurrentWriter(writer io.Writer, keepOrder bool) *ConcurrentWriter {
return &ConcurrentWriter{
keepOrder: keepOrder,
handle: writer,
cache: make(map[int][]byte),
firstWrite: true,
}
}
func (c *ConcurrentWriter) Write(chunk *Chunk, buff []byte) error {
c.Lock()
defer c.Unlock()
if !c.keepOrder {
return c.writeBuff(buff)
}
c.addToCache(chunk.Id, buff)
return c.writeCache()
}
func (c *ConcurrentWriter) addToCache(id int, buff []byte) {
c.cache[id] = make([]byte, len(buff))
copy(c.cache[id], buff)
}
func (c *ConcurrentWriter) writeCache() error {
for {
currentIndex := c.lastChunkWritten + 1
buff, set := c.cache[currentIndex]
if !set {
return nil
}
if err := c.writeBuff(buff); err != nil {
return err
}
delete(c.cache, currentIndex)
c.lastChunkWritten++
}
}
func (c *ConcurrentWriter) writeBuff(buff []byte) error {
if len(buff) == 0 {
return nil
}
if c.firstWrite {
c.firstWrite = false
} else {
if _, err := c.handle.Write([]byte{'\n'}); err != nil {
return err
}
}
if _, err := c.handle.Write(buff); err != nil {
return err
}
return nil
}