Skip to content

Commit

Permalink
bebop: fix concurrent map writes (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
gen2thomas authored Feb 12, 2024
1 parent cb1f952 commit 244f699
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 81 deletions.
118 changes: 37 additions & 81 deletions platforms/parrot/bebop/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,50 +107,6 @@ func NewNetworkFrame(buf []byte) NetworkFrame {
return frame
}

func networkFrameGenerator() func(*bytes.Buffer, byte, byte) *bytes.Buffer {
// func networkFrameGenerator() func(*bytes.Buffer, byte, byte) NetworkFrame {
//
// ARNETWORKAL_Frame_t
//
// uint8 type - frame type ARNETWORK_FRAME_TYPE
// uint8 id - identifier of the buffer sending the frame
// uint8 seq - sequence number of the frame
// uint32 size - size of the frame
//

// each frame id has it's own sequence number
seq := make(map[byte]byte)

hlen := 7 // size of ARNETWORKAL_Frame_t header

return func(cmd *bytes.Buffer, frameType byte, id byte) *bytes.Buffer {
if _, ok := seq[id]; !ok {
seq[id] = 0
}

seq[id]++

if seq[id] > 255 {
seq[id] = 0
}

ret := &bytes.Buffer{}
ret.WriteByte(frameType)
ret.WriteByte(id)
ret.WriteByte(seq[id])

size := &bytes.Buffer{}
if err := binary.Write(size, binary.LittleEndian, uint32(cmd.Len()+hlen)); err != nil {
panic(err)
}

ret.Write(size.Bytes())
ret.Write(cmd.Bytes())

return ret
}
}

type Pcmd struct {
Flag int
Roll int
Expand All @@ -161,33 +117,33 @@ type Pcmd struct {
}

type Bebop struct {
IP string
NavData map[string]string
Pcmd Pcmd
tmpFrame tmpFrame
C2dPort int
D2cPort int
RTPStreamPort int
RTPControlPort int
DiscoveryPort int
c2dClient *net.UDPConn
d2cClient *net.UDPConn
discoveryClient *net.TCPConn
networkFrameGenerator func(*bytes.Buffer, byte, byte) *bytes.Buffer
video chan []byte
writeChan chan []byte
IP string
NavData map[string]string
Pcmd Pcmd
tmpFrame tmpFrame
C2dPort int
D2cPort int
RTPStreamPort int
RTPControlPort int
DiscoveryPort int
c2dClient *net.UDPConn
d2cClient *net.UDPConn
discoveryClient *net.TCPConn
nwFrameGenerator *nwFrameGenerator
video chan []byte
writeChan chan []byte
}

func New() *Bebop {
return &Bebop{
IP: "192.168.42.1",
NavData: make(map[string]string),
C2dPort: 54321,
D2cPort: 43210,
RTPStreamPort: 55004,
RTPControlPort: 55005,
DiscoveryPort: 44444,
networkFrameGenerator: networkFrameGenerator(),
IP: "192.168.42.1",
NavData: make(map[string]string),
C2dPort: 54321,
D2cPort: 43210,
RTPStreamPort: 55004,
RTPControlPort: 55005,
DiscoveryPort: 44444,
nwFrameGenerator: newNetworkFrameGenerator(),
Pcmd: Pcmd{
Flag: 0,
Roll: 0,
Expand Down Expand Up @@ -332,7 +288,7 @@ func (b *Bebop) FlatTrim() error {

cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) GenerateAllStates() error {
Expand All @@ -352,7 +308,7 @@ func (b *Bebop) GenerateAllStates() error {

cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) TakeOff() error {
Expand All @@ -372,7 +328,7 @@ func (b *Bebop) TakeOff() error {

cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) Land() error {
Expand All @@ -392,7 +348,7 @@ func (b *Bebop) Land() error {

cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) Up(val int) error {
Expand Down Expand Up @@ -516,7 +472,7 @@ func (b *Bebop) generatePcmd() *bytes.Buffer {
}
cmd.Write(tmp.Bytes())

return b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID)
return b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID)
}

func (b *Bebop) createAck(frame NetworkFrame) *bytes.Buffer {
Expand All @@ -528,14 +484,14 @@ func (b *Bebop) createAck(frame NetworkFrame) *bytes.Buffer {
// libARNetwork/Sources/ARNETWORK_Manager.h#ARNETWORK_Manager_IDOutputToIDAck
//

return b.networkFrameGenerator(bytes.NewBuffer([]byte{uint8(frame.Seq)}),
return b.nwFrameGenerator.generate(bytes.NewBuffer([]byte{uint8(frame.Seq)}),
ARNETWORKAL_FRAME_TYPE_ACK,
byte(uint16(frame.Id)+(ARNETWORKAL_MANAGER_DEFAULT_ID_MAX/2)),
)
}

func (b *Bebop) createPong(frame NetworkFrame) *bytes.Buffer {
return b.networkFrameGenerator(bytes.NewBuffer(frame.Data),
return b.nwFrameGenerator.generate(bytes.NewBuffer(frame.Data),
ARNETWORKAL_FRAME_TYPE_DATA,
ARNETWORK_MANAGER_INTERNAL_BUFFER_ID_PONG,
)
Expand Down Expand Up @@ -579,13 +535,13 @@ func (b *Bebop) packetReceiver(buf []byte) {
func (b *Bebop) StartRecording() error {
buf := b.videoRecord(ARCOMMANDS_ARDRONE3_MEDIARECORD_VIDEO_RECORD_START)

return b.write(b.networkFrameGenerator(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) StopRecording() error {
buf := b.videoRecord(ARCOMMANDS_ARDRONE3_MEDIARECORD_VIDEO_RECORD_STOP)

return b.write(b.networkFrameGenerator(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) videoRecord(state byte) *bytes.Buffer {
Expand Down Expand Up @@ -650,7 +606,7 @@ func (b *Bebop) HullProtection(protect bool) error {
}
cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) Outdoor(outdoor bool) error {
Expand Down Expand Up @@ -679,7 +635,7 @@ func (b *Bebop) Outdoor(outdoor bool) error {
}
cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) VideoEnable(enable bool) error {
Expand All @@ -704,7 +660,7 @@ func (b *Bebop) VideoEnable(enable bool) error {
}
cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func (b *Bebop) VideoStreamMode(mode int8) error {
Expand All @@ -729,7 +685,7 @@ func (b *Bebop) VideoStreamMode(mode int8) error {
}
cmd.Write(tmp.Bytes())

return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
}

func bool2int8(b bool) int8 {
Expand Down Expand Up @@ -826,5 +782,5 @@ func (b *Bebop) createARStreamACK(frame ARStreamFrame) *bytes.Buffer {
}
ackPacket.Write(tmp.Bytes())

return b.networkFrameGenerator(ackPacket, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_VIDEO_ACK_ID)
return b.nwFrameGenerator.generate(ackPacket, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_VIDEO_ACK_ID)
}
63 changes: 63 additions & 0 deletions platforms/parrot/bebop/client/networkframegenerator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"bytes"
"encoding/binary"
"sync"
)

type nwFrameGenerator struct {
seq map[byte]byte
hlen int
mutex *sync.Mutex
}

func newNetworkFrameGenerator() *nwFrameGenerator {
nwg := nwFrameGenerator{
seq: make(map[byte]byte), // each frame id has it's own sequence number
hlen: 7, // size of ARNETWORKAL_Frame_t header
mutex: &sync.Mutex{},
}
return &nwg
}

// generate the "NetworkFrame" as bytes buffer
func (nwg *nwFrameGenerator) generate(cmd *bytes.Buffer, frameType byte, id byte) *bytes.Buffer {
nwg.mutex.Lock()
defer nwg.mutex.Unlock()

// func networkFrameGenerator() func(*bytes.Buffer, byte, byte) NetworkFrame {
//
// ARNETWORKAL_Frame_t
//
// uint8 type - frame type ARNETWORK_FRAME_TYPE
// uint8 id - identifier of the buffer sending the frame
// uint8 seq - sequence number of the frame
// uint32 size - size of the frame
//

if _, ok := nwg.seq[id]; !ok {
nwg.seq[id] = 0
}

nwg.seq[id]++

if nwg.seq[id] > 255 {
nwg.seq[id] = 0
}

ret := &bytes.Buffer{}
ret.WriteByte(frameType)
ret.WriteByte(id)
ret.WriteByte(nwg.seq[id])

size := &bytes.Buffer{}
if err := binary.Write(size, binary.LittleEndian, uint32(cmd.Len()+nwg.hlen)); err != nil {
panic(err)
}

ret.Write(size.Bytes())
ret.Write(cmd.Bytes())

return ret
}

0 comments on commit 244f699

Please sign in to comment.