Skip to content

Commit

Permalink
Simple implementation of SeedLink buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Jan 20, 2024
1 parent 99da461 commit e23683a
Show file tree
Hide file tree
Showing 93 changed files with 327 additions and 187 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v2.6.0

- Simple implementation of SeedLink buffer
- Add PowerShell frontend build script for Windows
- Replace CWB to CWA in earthquake event data source API

## v2.5.5

- Allows querying waveform within 24 hours in JSON format
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.5.5
v2.6.0
46 changes: 41 additions & 5 deletions driver/seedlink/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package seedlink

import (
"net"
"time"

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/publisher"
Expand All @@ -14,18 +15,53 @@ type END struct{}
func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
cl.StreamMode = true // Enter stream mode
var (
seqNum int64 = 0
channels = cl.Channels
location = cl.Location
station = text.TruncateString(cl.Station, 5)
network = text.TruncateString(cl.Network, 2)
seqNum int64 = 0
channels = cl.Channels
location = cl.Location
endTime = cl.EndTime
startTime = cl.StartTime
station = text.TruncateString(cl.Station, 5)
network = text.TruncateString(cl.Network, 2)
)

if startTime.IsZero() {
_, err := conn.Write([]byte(RES_ERR))
return err
}

// Send data in buffer
for _, buffer := range sl.SeedLinkBuffer.Data {
chMap := map[string]publisher.Int32Array{
"EHZ": buffer.EHZ, "EHE": buffer.EHE, "EHN": buffer.EHN,
}
for _, channel := range channels {
if data, ok := chMap[channel]; ok {
bufTime := time.UnixMilli(buffer.TS)
if bufTime.After(startTime) && bufTime.Before(endTime) {
dataBytes, err := CreateSLPacket(data, buffer.TS, seqNum, network, station, channel, location)
if err != nil {
return err
}

_, err = conn.Write(dataBytes)
if err != nil {
return err
}

seqNum++
}
}
}
}

// Subscribe to the publisher
go publisher.Subscribe(
&options.Status.Geophone, &cl.StreamMode,
func(gp *publisher.Geophone) error {
return streamer(gp, conn, channels, network, station, location, &seqNum)
},
)

return nil
}

Expand Down
40 changes: 40 additions & 0 deletions driver/seedlink/packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package seedlink

import (
"fmt"
"time"

"github.com/bclswl0827/mseedio"
)

func CreateSLPacket(count []int32, ts, seq int64, network, station, channel, location string) ([]byte, error) {
// Generate MiniSEED, send to client
var miniseed mseedio.MiniSeedData
// Init header fields
miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST)

// Append MiniSEED data
err := miniseed.Append(count, &mseedio.AppendOptions{
StationCode: station,
LocationCode: location,
ChannelCode: channel,
NetworkCode: network,
SampleRate: float64(len(count)),
StartTime: time.UnixMilli(ts).UTC(),
SequenceNumber: fmt.Sprintf("%06d", seq),
})
if err != nil {
return nil, err
}

// Get MiniSEED bytes
miniseed.Series[0].BlocketteSection.RecordLength = 9
dataBytes, err := miniseed.Encode(mseedio.OVERWRITE, mseedio.MSBFIRST)
if err != nil {
return nil, err
}

// Return SeedLink packet
slSeq := []byte(fmt.Sprintf("SL%06X", seq))
return append(slSeq, dataBytes...), nil
}
73 changes: 71 additions & 2 deletions driver/seedlink/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,88 @@ package seedlink

import (
"net"
"strconv"
"strings"
"time"

"github.com/anyshake/observer/feature"
)

type TIME struct{}

// Callback of "TIME <...>" command, implements SeedLinkCommandCallback interface
func (*TIME) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
_, err := conn.Write([]byte(RES_OK))
func (t *TIME) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
resCode := RES_OK
switch len(args) {
case 2:
endTime, err := t.getTimeFromArg(args[1])
if err != nil {
resCode = RES_ERR
} else {
cl.EndTime = endTime
}
fallthrough
case 1:
startTime, err := t.getTimeFromArg(args[0])
if err != nil {
resCode = RES_ERR
} else {
cl.StartTime = startTime
}
default:
resCode = RES_ERR
}

_, err := conn.Write([]byte(resCode))
return err
}

// Fallback of "TIME <...>" command, implements SeedLinkCommandCallback interface
func (*TIME) Fallback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, conn net.Conn, args ...string) {
conn.Close()
}

func (*TIME) getTimeFromArg(timeStr string) (time.Time, error) {
if len(timeStr) != 19 {
return time.Time{}, nil
}
splitTimeStr := strings.Split(timeStr, ",")
if len(splitTimeStr) != 6 {
return time.Time{}, nil
}

// Format: YYYY,MM,DD,hh,mm,ss
// Example: 2024,01,16,07,15,16
year, err := strconv.Atoi(splitTimeStr[0])
if err != nil {
return time.Time{}, err
}

monthInt, err := strconv.Atoi(splitTimeStr[1])
if err != nil {
return time.Time{}, err
}

month := time.Month(monthInt)
day, err := strconv.Atoi(splitTimeStr[2])
if err != nil {
return time.Time{}, err
}

hour, err := strconv.Atoi(splitTimeStr[3])
if err != nil {
return time.Time{}, err
}

minute, err := strconv.Atoi(splitTimeStr[4])
if err != nil {
return time.Time{}, err
}

second, err := strconv.Atoi(splitTimeStr[5])
if err != nil {
return time.Time{}, err
}

return time.Date(year, month, day, hour, minute, second, 0, time.UTC), nil
}
7 changes: 7 additions & 0 deletions driver/seedlink/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@ const (
// SeedLink main daemon config & state
type SeedLinkGlobal struct {
SeedLinkState
SeedLinkBuffer
Streams []SeedLinkStream
Stations []SeedLinkStation
Capabilities []SeedLinkCapability
}

// SeedLink data buffer
type SeedLinkBuffer struct {
Size int
Data []publisher.Geophone
}

// SeedLink basic state
type SeedLinkState struct {
Software string
Expand Down
2 changes: 1 addition & 1 deletion feature/geophone/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (g *Geophone) Read(port io.ReadWriteCloser, conf *config.Conf, packet *Pack

// Read data frame
checksumLen := len(packet.Checksum)
buf := make([]byte, g.getSize(packetLen, checksumLen))
buf := make([]byte, g.getPacketSize(packetLen, checksumLen))
n, err := serial.Read(port, buf, TIMEOUT_THRESHOLD)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion feature/geophone/size.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package geophone

func (g *Geophone) getSize(packetLen, checksumLen int) int {
func (g *Geophone) getPacketSize(packetLen, checksumLen int) int {
// channelLen*packetLen*int32 + checksumLen + 1
return checksumLen*packetLen*4 + checksumLen + 1
}
6 changes: 3 additions & 3 deletions feature/miniseed/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (m *MiniSEED) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGrou
// Wait for time syncing
for !options.Status.IsReady {
logger.Print(MODULE, "waiting for time alignment", color.FgYellow, false)
time.Sleep(50 * time.Millisecond)
time.Sleep(1 * time.Second)
}

// Init MiniSEED archiving buffer
currentTime, _ := duration.Timestamp(options.Status.System.Offset)
miniSEEDBuffer := &miniSEEDBuffer{
miniSEEDBuffer := &publisher.SegmentBuffer{
TimeStamp: currentTime,
ChannelBuffer: map[string]*channelBuffer{
ChannelBuffer: map[string]*publisher.ChannelSegmentBuffer{
"EHZ": {}, "EHE": {}, "EHN": {},
},
}
Expand Down
2 changes: 1 addition & 1 deletion feature/miniseed/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/bclswl0827/mseedio"
)

func (m *MiniSEED) handleMessage(gp *publisher.Geophone, options *feature.FeatureOptions, buffer *miniSEEDBuffer) error {
func (m *MiniSEED) handleMessage(gp *publisher.Geophone, options *feature.FeatureOptions, buffer *publisher.SegmentBuffer) error {
var (
basePath = options.Config.MiniSEED.Path
timestamp = time.UnixMilli(gp.TS).UTC()
Expand Down
15 changes: 1 addition & 14 deletions feature/miniseed/types.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,15 @@
package miniseed

import (
"time"

"github.com/bclswl0827/mseedio"
)

const MODULE string = "miniseed"

const (
MAX_DURATION float64 = 4.0
MAX_DURATION float64 = 3.0
BIT_ORDER int = mseedio.MSBFIRST
ENCODING_TYPE int = mseedio.STEIM2
)

type MiniSEED struct{}

type channelBuffer struct {
DataBuffer []int32
Samples int32
SeqNum int64
}

type miniSEEDBuffer struct {
TimeStamp time.Time
ChannelBuffer map[string]*channelBuffer
}
17 changes: 17 additions & 0 deletions feature/seedlink/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package seedlink

import (
"github.com/anyshake/observer/driver/seedlink"
"github.com/anyshake/observer/publisher"
)

func (s *SeedLink) handleBuffer(gp *publisher.Geophone, buffer *seedlink.SeedLinkBuffer) error {
if len(buffer.Data) < buffer.Size {
buffer.Data = append(buffer.Data, *gp)
} else {
buffer.Data = append(buffer.Data[1:], *gp)
}

s.OnReady(nil, "1 record added to buffer")
return nil
}
4 changes: 3 additions & 1 deletion feature/seedlink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (s *SeedLink) handleCommand(options *feature.FeatureOptions, slGlobal *seed
}

// Exit from stream mode
if clientMessage != "END" {
if clientMessage != "END" &&
// An exception for INFO command
!strings.Contains(clientMessage, "INFO ") {
slClient.StreamMode = false
}

Expand Down
7 changes: 7 additions & 0 deletions feature/seedlink/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/anyshake/observer/driver/seedlink"
"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/publisher"
"github.com/anyshake/observer/utils/logger"
"github.com/anyshake/observer/utils/text"
"github.com/fatih/color"
Expand Down Expand Up @@ -60,6 +61,12 @@ func (s *SeedLink) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGrou
}
}()

// Subscribe to publisher to append buffer
expressionForSubscribe := true
go publisher.Subscribe(&options.Status.Geophone, &expressionForSubscribe, func(gp *publisher.Geophone) error {
return s.handleBuffer(gp, &slGlobal.SeedLinkBuffer)
})

// Receive interrupt signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
Expand Down
2 changes: 2 additions & 0 deletions feature/seedlink/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (s *SeedLink) InitGlobal(slGlobal *seedlink.SeedLinkGlobal, currentTime tim
{Name: "window-extraction"}, {Name: "info:connections"},
{Name: "info:capabilities"}, {Name: "info:stations"},
}
// Station field are not used by SeedLink, but are required by the protocol to differentiate between stations
slGlobal.SeedLinkBuffer = seedlink.SeedLinkBuffer{Size: SEEDLINK_BUFFERSIZE}
slGlobal.Streams = []seedlink.SeedLinkStream{
{BeginTime: currentTimeString, EndTime: streamEndTimeString, SeedName: "EHZ", Location: location, Type: "D", Station: station},
{BeginTime: currentTimeString, EndTime: streamEndTimeString, SeedName: "EHE", Location: location, Type: "D", Station: station},
Expand Down
Loading

0 comments on commit e23683a

Please sign in to comment.