Skip to content

Commit

Permalink
Higher MiniSEED compression ratio
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Nov 11, 2023
1 parent c4b4878 commit b5b4a72
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 109 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.1.25p
v2.1.24p
7 changes: 6 additions & 1 deletion feature/miniseed/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ func (m *MiniSEED) OnStop(options *feature.FeatureOptions, v ...any) {
}

func (m *MiniSEED) OnReady(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, "1 record has been written", color.FgGreen, false)
switch v[0].(string) {
case "append":
logger.Print(MODULE, "1 record has been append", color.FgGreen, false)
case "write":
logger.Print(MODULE, "1 record has been written", color.FgGreen, false)
}
}

func (m *MiniSEED) OnError(options *feature.FeatureOptions, err error) {
Expand Down
71 changes: 67 additions & 4 deletions feature/miniseed/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package miniseed

import (
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/bclswl0827/mseedio"
"github.com/bclswl0827/observer/feature"
"github.com/bclswl0827/observer/publisher"
"github.com/bclswl0827/observer/utils/duration"
"github.com/bclswl0827/observer/utils/logger"
"github.com/fatih/color"
)

func (m *MiniSEED) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGroup) {
Expand All @@ -27,18 +34,74 @@ func (m *MiniSEED) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGrou
go m.handleCleanup(basePath, station, network, lifeCycle)
}

// Init sequence number
var seqNumber int
// Wait for time syncing
for !options.Status.IsReady {
logger.Print(MODULE, "waiting for time alignment", color.FgYellow, false)
time.Sleep(50 * time.Millisecond)
}

// Get initial file path
currentTime, _ := duration.Timestamp(options.Status.System.Offset)
filePath := fmt.Sprintf(
"%s/%s_%s_%s.mseed",
basePath, station, network,
currentTime.Format("20060102"),
)

// Get sequence number if file exists
var seqNumber int64
_, err := os.Stat(filePath)
if err == nil {
// Get last sequence number
logger.Print(MODULE, "starting from last record", color.FgYellow, false)

// Read MiniSEED file
var ms mseedio.MiniSeedData
err := ms.Read(filePath)
if err != nil {
m.OnError(options, err)
return
}

// Get last sequence number
recordLength := len(ms.Series)
if recordLength > 0 {
lastRecord := ms.Series[recordLength-1]
lastSeqNum := lastRecord.FixedSection.SequenceNumber
n, err := strconv.Atoi(lastSeqNum)
if err != nil {
m.OnError(options, err)
return
}
// Set current sequence number
seqNumber = int64(n)
}
} else {
// Create new file with sequence number 0
logger.Print(MODULE, "starting from a new file", color.FgYellow, false)
}

// Init MiniSEED archiving buffer
buffer := &miniSEEDBuffer{
TimeStamp: currentTime,
SeqNum: seqNumber,
EHZ: &channelBuffer{},
EHE: &channelBuffer{},
EHN: &channelBuffer{},
BasePath: options.Config.MiniSEED.Path,
Station: options.Config.MiniSEED.Station,
Network: options.Config.MiniSEED.Network,
}
m.OnStart(options, "service has started")

// Append and write when new message arrived
publisher.Subscribe(
&options.Status.Geophone,
func(gp *publisher.Geophone) error {
return m.handleMessage(gp, options, seqNumber, basePath, station, network)
return m.handleMessage(gp, options, buffer)
},
)

err := fmt.Errorf("service exited with an error")
err = fmt.Errorf("service exited with an error")
m.OnError(options, err)
}
181 changes: 85 additions & 96 deletions feature/miniseed/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,109 @@ package miniseed

import (
"fmt"
"os"
"strconv"
"time"

"github.com/bclswl0827/mseedio"
"github.com/bclswl0827/observer/feature"
"github.com/bclswl0827/observer/publisher"
)

func (m *MiniSEED) handleMessage(gp *publisher.Geophone, options *feature.FeatureOptions, seqNumber int, basePath, station, network string) error {
func (m *MiniSEED) handleMessage(gp *publisher.Geophone, options *feature.FeatureOptions, buffer *miniSEEDBuffer) error {
var (
ehz = gp.EHZ
ehe = gp.EHE
ehn = gp.EHN
ts = time.UnixMilli(gp.TS).UTC()
ehz = gp.EHZ
ehe = gp.EHE
ehn = gp.EHN
station = options.Config.MiniSEED.Station
network = options.Config.MiniSEED.Network
timestamp = time.UnixMilli(gp.TS).UTC()
)

// Init MiniSEED library
var miniseed mseedio.MiniSeedData
miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST)
// Append EHZ channel to buffer
buffer.EHZ.DataBuffer = append(buffer.EHZ.DataBuffer, ehz...)
buffer.EHZ.SampleRate = (buffer.EHZ.SampleRate + int32(len(ehz))) / 2
// Append EHE channel to buffer
buffer.EHE.DataBuffer = append(buffer.EHE.DataBuffer, ehe...)
buffer.EHE.SampleRate = (buffer.EHE.SampleRate + int32(len(ehe))) / 2
// Append EHN channel to buffer
buffer.EHN.DataBuffer = append(buffer.EHN.DataBuffer, ehn...)
buffer.EHN.SampleRate = (buffer.EHN.SampleRate + int32(len(ehn))) / 2

// Get file name by date
filePath := fmt.Sprintf(
"%s/%s_%s_%s.mseed",
basePath, station, network,
ts.Format("20060102"),
)

// If file exists, check sequence number
_, err := os.Stat(filePath)
if err == nil && seqNumber == 0 {
// Read MiniSEED file
var ms mseedio.MiniSeedData
err := ms.Read(filePath)
if err != nil {
m.OnError(options, err)
return err
}

// Get last sequence number
recordLength := len(ms.Series)
if recordLength > 0 {
lastRecord := ms.Series[recordLength-1]
n, err := strconv.Atoi(lastRecord.FixedSection.SequenceNumber)
// Check if buffer is ready to write to file
if timestamp.Sub(buffer.TimeStamp).Seconds() >= MAX_DURATION {
// Init MiniSEED data
var miniseed mseedio.MiniSeedData
miniseed.Init(ENCODING_TYPE, BIT_ORDER)
// Set basic data
filePath := fmt.Sprintf(
"%s/%s_%s_%s.mseed", buffer.BasePath,
station, network, timestamp.Format("20060102"),
)
// Append channels to MiniSEED
for _, v := range []string{"EHZ", "EHE", "EHN"} {
var (
err error
seq = fmt.Sprintf("%06d", buffer.SeqNum)
)
switch v {
case "EHZ":
// Append EHZ channel
err = miniseed.Append(buffer.EHZ.DataBuffer, &mseedio.AppendOptions{
ChannelCode: v,
SequenceNumber: seq,
StationCode: station,
NetworkCode: network,
StartTime: buffer.TimeStamp,
SampleRate: float64(buffer.EHZ.SampleRate),
})
case "EHE":
// Append EHZ channel
err = miniseed.Append(buffer.EHE.DataBuffer, &mseedio.AppendOptions{
ChannelCode: v,
SequenceNumber: seq,
StationCode: station,
NetworkCode: network,
StartTime: buffer.TimeStamp,
SampleRate: float64(buffer.EHE.SampleRate),
})
case "EHN":
// Append EHZ channel
err = miniseed.Append(buffer.EHN.DataBuffer, &mseedio.AppendOptions{
ChannelCode: v,
SequenceNumber: seq,
StationCode: station,
NetworkCode: network,
StartTime: buffer.TimeStamp,
SampleRate: float64(buffer.EHN.SampleRate),
})
}
if err != nil {
m.OnError(options, err)
return err
} else {
buffer.SeqNum++
}
// Encode record to bytes
dataBytes, err := miniseed.Encode(mseedio.APPEND, BIT_ORDER)
if err != nil {
m.OnError(options, err)
return err
}
// Append bytes to file
err = miniseed.Write(filePath, mseedio.APPEND, dataBytes)
if err != nil {
m.OnError(options, err)
return err
}

// Set current sequence number
seqNumber = n
}
}

// Increments sequence number by 1
if seqNumber >= 999999 {
seqNumber = 0
// Reset buffer
m.OnReady(options, "write")
buffer.TimeStamp = timestamp
buffer.EHZ.DataBuffer = []int32{}
buffer.EHE.DataBuffer = []int32{}
buffer.EHN.DataBuffer = []int32{}
} else {
seqNumber++
}
seqNumberString := fmt.Sprintf("%06d", seqNumber)

// Append 3 channels
for i, v := range [][]int32{ehz, ehe, ehn} {
var err error
switch i {
case 0:
err = miniseed.Append(v, &mseedio.AppendOptions{
StartTime: ts,
ChannelCode: "EHZ",
StationCode: station,
NetworkCode: network,
SequenceNumber: seqNumberString,
SampleRate: float64(len(ehz) - 1),
})
case 1:
err = miniseed.Append(v, &mseedio.AppendOptions{
StartTime: ts,
ChannelCode: "EHE",
StationCode: station,
NetworkCode: network,
SequenceNumber: seqNumberString,
SampleRate: float64(len(ehe) - 1),
})
case 2:
err = miniseed.Append(v, &mseedio.AppendOptions{
StartTime: ts,
ChannelCode: "EHN",
StationCode: station,
NetworkCode: network,
SequenceNumber: seqNumberString,
SampleRate: float64(len(ehn) - 1),
})
}
if err != nil {
m.OnError(options, err)
return err
}

// Encode record to bytes
dataBytes, err := miniseed.Encode(mseedio.APPEND, mseedio.MSBFIRST)
if err != nil {
m.OnError(options, err)
return err
}

// Append bytes to file
err = miniseed.Write(filePath, mseedio.APPEND, dataBytes)
if err != nil {
m.OnError(options, err)
return err
}
m.OnReady(options, "append")
}

m.OnReady(options)
return nil
}
28 changes: 28 additions & 0 deletions feature/miniseed/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
package miniseed

import (
"time"

"github.com/bclswl0827/mseedio"
)

const MODULE string = "miniseed"

const (
MAX_DURATION float64 = 4.0
BIT_ORDER int = mseedio.MSBFIRST
ENCODING_TYPE int = mseedio.STEIM2
)

type MiniSEED struct{}

type channelBuffer struct {
SampleRate int32
DataBuffer []int32
}

type miniSEEDBuffer struct {
SeqNum int64
Station string
Network string
BasePath string
TimeStamp time.Time
EHZ *channelBuffer
EHE *channelBuffer
EHN *channelBuffer
}
4 changes: 2 additions & 2 deletions frontend/dist/asset-manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"files": {
"main.css": "/static/css/main.3d547ccc.css",
"main.js": "/static/js/main.51a038bb.js",
"main.js": "/static/js/main.1e2cac8d.js",
"static/css/278.525e2941.chunk.css": "/static/css/278.525e2941.chunk.css",
"static/js/278.f2bbf51e.chunk.js": "/static/js/278.f2bbf51e.chunk.js",
"static/js/967.8f28312d.chunk.js": "/static/js/967.8f28312d.chunk.js",
Expand Down Expand Up @@ -47,6 +47,6 @@
},
"entrypoints": [
"static/css/main.3d547ccc.css",
"static/js/main.51a038bb.js"
"static/js/main.1e2cac8d.js"
]
}
2 changes: 1 addition & 1 deletion frontend/dist/index.html
Original file line number Diff line number Diff line change
@@ -1 +1 @@
<!doctype html><html><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=no"/><link rel="icon" href="/favicon.ico"/><link rel="manifest" href="/manifest.json"/><script defer="defer" src="/static/js/main.51a038bb.js"></script><link href="/static/css/main.3d547ccc.css" rel="stylesheet"></head><body><div id="root"></div></body></html>
<!doctype html><html><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=no"/><link rel="icon" href="/favicon.ico"/><link rel="manifest" href="/manifest.json"/><script defer="defer" src="/static/js/main.1e2cac8d.js"></script><link href="/static/css/main.3d547ccc.css" rel="stylesheet"></head><body><div id="root"></div></body></html>

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions frontend/src/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
REACT_APP_VERSION=v2.1.25p
REACT_APP_RELEASE=2369c0e4-20231104195016
REACT_APP_VERSION=v2.1.24p
REACT_APP_RELEASE=c4b48783-20231111182031

0 comments on commit b5b4a72

Please sign in to comment.