Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement start command #13

Merged
merged 13 commits into from
Jan 17, 2024
2 changes: 1 addition & 1 deletion cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var deleteCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
err, defaultConfig := config.NewDefaultConfig()
if err != nil {
log.Fatalf("Error creating config: %v\n", err)
log.Fatalf("Error reading/creating config: %v\n", err)
}
helper := helpers.NewDefaultHelper()
command := command.NewDefaultSetCommand(Node, Interface, defaultConfig.GetValue(helper.GetDefaultClabNameKey()))
Expand Down
2 changes: 1 addition & 1 deletion cmd/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var setCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
err, defaultConfig := config.NewDefaultConfig()
if err != nil {
log.Fatalf("Error creating config: %v\n", err)
log.Fatalf("Error reading/creating config: %v\n", err)
}
helper := helpers.NewDefaultHelper()
command := command.NewDefaultSetCommand(Node, Interface, defaultConfig.GetValue(helper.GetDefaultClabNameKey()))
Expand Down
2 changes: 1 addition & 1 deletion cmd/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var showCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
err, defaultConfig := config.NewDefaultConfig()
if err != nil {
log.Fatalf("Error creating config: %v\n", err)
log.Fatalf("Error reading/creating config: %v\n", err)
}
helper := helpers.NewDefaultHelper()
command := command.NewDefaultShowCommand(Node, defaultConfig.GetValue(helper.GetDefaultClabNameKey()))
Expand Down
62 changes: 62 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cmd

import (
"os"
"os/signal"

"github.com/hawkv6/clab-telemetry-linker/pkg/config"
"github.com/hawkv6/clab-telemetry-linker/pkg/consumer"
"github.com/hawkv6/clab-telemetry-linker/pkg/helpers"
"github.com/hawkv6/clab-telemetry-linker/pkg/processor"
"github.com/hawkv6/clab-telemetry-linker/pkg/publisher"
"github.com/hawkv6/clab-telemetry-linker/pkg/service"
"github.com/spf13/cobra"
)

var (
KafkaBroker string
ReceiverTopic string
PublisherTopic string
)

var startCmd = &cobra.Command{
Use: "start",
Short: "Start processing the telemetry data",
Run: func(cmd *cobra.Command, args []string) {
err, defaultConfig := config.NewDefaultConfig()
if err != nil {
log.Fatalf("Error creating config: %v\n", err)
}
if err := defaultConfig.WatchConfigChange(); err != nil {
log.Fatalf("Error watching config change: %v\n", err)
}
unprocessedMsgChan := make(chan consumer.Message)
processedMsgChan := make(chan consumer.Message)
consumer := consumer.NewKafkaConsumer(KafkaBroker, ReceiverTopic, unprocessedMsgChan)
if err := consumer.Init(); err != nil {
log.Fatalf("Error initializing receiver: %v\n", err)
}
publisher := publisher.NewKafkaPublisher(KafkaBroker, PublisherTopic, processedMsgChan)
if err := publisher.Init(); err != nil {
log.Fatalf("Error initializing publisher: %v\n", err)
}
processor := processor.NewDefaultProcessor(defaultConfig, unprocessedMsgChan, processedMsgChan, helpers.NewDefaultHelper())

defaultService := service.NewDefaultService(defaultConfig, consumer, processor, publisher)
defaultService.Start()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)

<-signalChan
log.Info("Received interrupt signal, shutting down")
defaultService.Stop()
},
}

func init() {
rootCmd.AddCommand(startCmd)
startCmd.Flags().StringVarP(&KafkaBroker, "broker", "b", "", "kafka broker to connect to e.g. localhost:9092")
startCmd.Flags().StringVarP(&ReceiverTopic, "receiver-topic", "r", "", "topic where messages are received")
startCmd.Flags().StringVarP(&PublisherTopic, "publisher-topic", "p", "", "topic where messages are received")
markRequiredFlags(startCmd, []string{"broker", "receiver-topic", "publisher-topic"})
}
10 changes: 9 additions & 1 deletion codecov.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
ignore:
- "**/*_mock.go"
- "**/*_mock.go"
coverage:
status:
patch:
default:
target: 80%
project:
default:
target: 80%
15 changes: 15 additions & 0 deletions example-messages/example-bandwidth-msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"fields": {
"interface_status_and_data/enabled/bandwidth": 1000000
},
"name": "isis",
"tags": {
"host": "telegraf",
"instance_name": "1",
"interface_name": "GigabitEthernet0/0/0/0",
"path": "Cisco-IOS-XR-clns-isis-oper:isis/instances/instance/interfaces/interface",
"source": "XR-1",
"subscription": "hawk-metrics"
},
"timestamp": 1704728369
}
18 changes: 18 additions & 0 deletions example-messages/example-delay-msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"fields": {
"delay_measurement_session/last_advertisement_information/advertised_values/average": 10000,
"delay_measurement_session/last_advertisement_information/advertised_values/maximum": 10000,
"delay_measurement_session/last_advertisement_information/advertised_values/minimum": 10000,
"delay_measurement_session/last_advertisement_information/advertised_values/variance": 0
},
"name": "performance_monitoring",
"tags": {
"host": "telegraf",
"interface_name": "GigabitEthernet0/0/0/1",
"node": "0/RP0/CPU0",
"path": "Cisco-IOS-XR-perf-meas-oper:performance-measurement/nodes/node/interfaces/interface-details/interface-detail",
"source": "XR-1",
"subscription": "hawk-metrics"
},
"timestamp": 1704728135
}
15 changes: 15 additions & 0 deletions example-messages/example-loss-msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"fields": {
"interface_status_and_data/enabled/packet_loss_percentage": 0
},
"name": "isis",
"tags": {
"host": "telegraf",
"instance_name": "1",
"interface_name": "GigabitEthernet0/0/0/0",
"path": "Cisco-IOS-XR-clns-isis-oper:isis/instances/instance/interfaces/interface",
"source": "XR-1",
"subscription": "hawk-metrics"
},
"timestamp": 1704728296
}
15 changes: 15 additions & 0 deletions example-messages/example-utilization-msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"fields": {
"in_octets": 47912820356,
"out_octets": 1864216230
},
"name": "utilization",
"tags": {
"host": "telegraf",
"name": "GigabitEthernet0/0/0/0",
"path": "openconfig-interfaces:interfaces/interface/state/counters",
"source": "XR-1",
"subscription": "hawk-metrics"
},
"timestamp": 1704728433
}
19 changes: 19 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,34 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
)

require (
github.com/IBM/sarama v1.42.1
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/knadh/koanf v1.5.0
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/sirupsen/logrus v1.9.3
Expand Down
Loading