Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/k8s.io/client-go-0.29.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tty47 committed Apr 7, 2024
2 parents 05c5790 + bde327f commit bea9bad
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 41 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type MutualPeer struct {
// Peer represents a peer structure.
type Peer struct {
NodeName string `yaml:"nodeName"` // NodeName name of the sts/deployment
ServiceName string `yaml:"serviceName,omitempty"` // ServiceName name of the service
NodeType string `yaml:"nodeType"` // NodeType specify the type of node
Namespace string `yaml:"namespace,omitempty"` // Namespace of the node
ContainerName string `yaml:"containerName,omitempty"` // ContainerName name of the main container
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A=
k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA=
k8s.io/apimachinery v0.29.0 h1:+ACVktwyicPz0oc6MTMLwa2Pw3ouLAfAon1wPLtG48o=
k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis=
k8s.io/client-go v0.29.0 h1:KmlDtFcrdUzOYrBhXHgKw5ycWzc3ryPX5mQe0SkG3y8=
k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38=
k8s.io/client-go v0.27.4 h1:vj2YTtSJ6J4KxaC88P4pMPEQECWMY8gqPqsTgUKzvjk=
k8s.io/client-go v0.27.4/go.mod h1:ragcly7lUlN0SRPk5/ZkGnDjPknzb37TICq07WhI6Xc=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
Expand Down
51 changes: 43 additions & 8 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,25 @@ func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) er
select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
log.Info("Context canceled, stopping metrics watch process.")
return ctx.Err()
default:
err := GenerateHashMetrics(cfg)
// Check if err is nil, if so, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block, let's stop the process successfully...")
// The metric was successfully generated, stop the retries.
hashMetricsErr := GenerateHashMetrics(cfg)
consensusMetricsErr := ConsNodesIDs(cfg)

// Check if both metrics generation are successful
if hashMetricsErr == nil && consensusMetricsErr == nil {
log.Info("Metrics generated successfully, stopping the process...")
return nil
}

// Log the error
log.Error("Error generating hash metrics: ", err)
// Log errors if they occur
if hashMetricsErr != nil {
log.Error("Error generating hash metrics: ", hashMetricsErr)
}
if consensusMetricsErr != nil {
log.Error("Error generating consensus node ID metrics: ", consensusMetricsErr)
}

// Wait for the retry interval before the next execution using a timer
if err := waitForRetry(ctx); err != nil {
Expand Down Expand Up @@ -267,6 +273,35 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
return nil
}

// ConsNodesIDs generates the metric with the consensus nodes ids.
func ConsNodesIDs(cfg config.MutualPeersConfig) error {
log.Info("Generating the metric for the consensus nodes ids...")

for _, mutualPeer := range cfg.MutualPeers {
for _, peer := range mutualPeer.Peers {
if peer.NodeType == "consensus" {
consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName)
if err != nil {
log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err)
return err
}

err = metrics.RegisterConsensusNodeMetric(
consNodeId,
peer.ServiceName,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err)
return err
}
}
}
}

return nil
}

// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
func RegisterMetrics(cfg config.MutualPeersConfig) error {
red := redis.InitRedisConfig()
Expand Down
39 changes: 39 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,42 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {
_, err = meter.RegisterCallback(callback, loadBalancersGauge)
return err
}

// ConsensusNodeMetric represents the information for consensus node metrics.
type ConsensusNodeMetric struct {
NodeName string // NodeName is the name of the node.
NodeID string // NodeID is the ID of the node.
Namespace string // Namespace of the node.
}

// RegisterConsensusNodeMetric creates and registers metrics for consensus nodes.
func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error {
log.Info("Registering metric for consensus node: ", nodeName)

// Create an ObservableGauge for consensus node metrics.
consensusNodeGauge, err := meter.Float64ObservableGauge(
"consensus_node_ids_metric",
metric.WithDescription("Metric for Consensus Node IDs"),
)
if err != nil {
log.Fatalf("Error creating metric: ", err)
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
// Define the callback function that will be called periodically to observe metrics.
labels := metric.WithAttributes(
attribute.String("node_name", nodeName),
attribute.String("node_id", nodeID),
attribute.String("namespace", namespace),
)
// Observe the value for current consensus node metrics with associated labels.
observer.ObserveFloat64(consensusNodeGauge, 1, labels)

return nil
}

// Register the callback with the meter and the ObservableGauge.
_, err = meter.RegisterCallback(callback, consensusNodeGauge)
return err
}
83 changes: 52 additions & 31 deletions pkg/nodes/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nodes

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -32,57 +33,77 @@ func SetConsNodeDefault(peer config.Peer) config.Peer {
return peer
}

// GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode
// makes a request to the API and gets the info about the genesis and return it
// GenesisHash connects to the specified consensus node, makes a request to the API,
// and retrieves information about the genesis block including its hash and time.
func GenesisHash(consensusNode string) (string, string, error) {
url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode)
jsonResponse, err := makeAPIRequest(url)
if err != nil {
return "", "", err
}

blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string)
if !ok {
log.Error("Unable to access .block_id.hash")
return "", "", errors.New("error accessing block ID hash")
}

blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string)
if !ok {
log.Error("Unable to access .block.header.time")
return "", "", errors.New("error accessing block time")
}

return blockIDHash, blockTime, nil
}

// ConsensusNodesIDs connects to the specified consensus node, makes a request to the API,
// and retrieves the node ID from the status response.
func ConsensusNodesIDs(consensusNode string) (string, error) {
url := fmt.Sprintf("http://%s:26657/status?", consensusNode)
jsonResponse, err := makeAPIRequest(url)
if err != nil {
return "", err
}

nodeID, ok := jsonResponse["result"].(map[string]interface{})["node_info"].(map[string]interface{})["id"].(string)
if !ok {
log.Error("Unable to access .result.node_info.id")
return "", errors.New("error accessing node ID")
}

log.Info("Consensus Node [", consensusNode, "] ID: [", nodeID, "]")

return nodeID, nil
}

// makeAPIRequest handles the common task of making an HTTP request to a given URL
// and parsing the JSON response. It returns a map representing the JSON response or an error.
func makeAPIRequest(url string) (map[string]interface{}, error) {
response, err := http.Get(url)
if err != nil {
log.Error("Error making the request to the node [", consensusNode, "] - ", err)
return "", "", err
log.Error("Error making the request: ", err)
return nil, err
}
defer response.Body.Close()

if response.StatusCode != http.StatusOK {
log.Error("Non-OK response:", response.Status)
return "", "", err
return nil, err
}

bodyBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error("Error reading response body:", err)
return "", "", err
return nil, err
}

bodyString := string(bodyBytes)
log.Info("Response Body: ", bodyString)

// Parse the JSON response into a generic map
var jsonResponse map[string]interface{}
err = json.Unmarshal([]byte(bodyString), &jsonResponse)
err = json.Unmarshal(bodyBytes, &jsonResponse)
if err != nil {
log.Error("Error parsing JSON:", err)
return "", "", err
}

// Access and print the .block_id.hash field
blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string)
if !ok {
log.Error("Unable to access .block_id.hash")
return "", "", err
return nil, err
}

// Access and print the .block.header.time field
blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string)
if !ok {
log.Error("Unable to access .block.header.time")
return "", "", err
}

log.Info("Block ID Hash: ", blockIDHash)
log.Info("Block Time: ", blockTime)
log.Info("Full output: ", bodyString)

return blockIDHash, blockTime, nil
return jsonResponse, nil
}

0 comments on commit bea9bad

Please sign in to comment.