Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torch): add new metric to expose the consensus nodes ids #37

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type MutualPeer struct {
// Peer represents a peer structure.
type Peer struct {
NodeName string `yaml:"nodeName"` // NodeName name of the sts/deployment
ServiceName string `yaml:"serviceName,omitempty"` // ServiceName name of the service
NodeType string `yaml:"nodeType"` // NodeType specify the type of node
Namespace string `yaml:"namespace,omitempty"` // Namespace of the node
ContainerName string `yaml:"containerName,omitempty"` // ContainerName name of the main container
Expand Down
51 changes: 43 additions & 8 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,25 @@ func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) er
select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
log.Info("Context canceled, stopping metrics watch process.")
return ctx.Err()
default:
err := GenerateHashMetrics(cfg)
// Check if err is nil, if so, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block, let's stop the process successfully...")
// The metric was successfully generated, stop the retries.
hashMetricsErr := GenerateHashMetrics(cfg)
consensusMetricsErr := ConsNodesIDs(cfg)

// Check if both metrics generation are successful
if hashMetricsErr == nil && consensusMetricsErr == nil {
log.Info("Metrics generated successfully, stopping the process...")
return nil
}

// Log the error
log.Error("Error generating hash metrics: ", err)
// Log errors if they occur
if hashMetricsErr != nil {
log.Error("Error generating hash metrics: ", hashMetricsErr)
}
if consensusMetricsErr != nil {
log.Error("Error generating consensus node ID metrics: ", consensusMetricsErr)
}

// Wait for the retry interval before the next execution using a timer
if err := waitForRetry(ctx); err != nil {
Expand Down Expand Up @@ -267,6 +273,35 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
return nil
}

// ConsNodesIDs walks every peer of every mutual-peer group in cfg and, for each
// peer whose NodeType is "consensus", looks up the node ID by the peer's
// ServiceName and registers a metric for it. Peers of any other type are skipped.
//
// The namespace label is read from the POD_NAMESPACE environment variable.
// Processing stops at the first failure and that error is returned; nil is
// returned when every consensus peer was handled.
func ConsNodesIDs(cfg config.MutualPeersConfig) error {
	log.Info("Generating the metric for the consensus nodes ids...")

	for _, group := range cfg.MutualPeers {
		for _, p := range group.Peers {
			if p.NodeType != "consensus" {
				continue
			}

			nodeID, lookupErr := nodes.ConsensusNodesIDs(p.ServiceName)
			if lookupErr != nil {
				log.Error("Error getting consensus node ID for service [", p.ServiceName, "]: ", lookupErr)
				return lookupErr
			}

			namespace := os.Getenv("POD_NAMESPACE")
			if regErr := metrics.RegisterConsensusNodeMetric(nodeID, p.ServiceName, namespace); regErr != nil {
				log.Error("Error registering metric for service [", p.ServiceName, "]: ", regErr)
				return regErr
			}
		}
	}

	return nil
}

// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
func RegisterMetrics(cfg config.MutualPeersConfig) error {
red := redis.InitRedisConfig()
Expand Down
39 changes: 39 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,42 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {
_, err = meter.RegisterCallback(callback, loadBalancersGauge)
return err
}

// ConsensusNodeMetric groups the values that identify a consensus node in the
// exported metrics.
type ConsensusNodeMetric struct {
	// NodeName is the name of the node.
	NodeName string
	// NodeID is the ID of the node.
	NodeID string
	// Namespace is the namespace of the node.
	Namespace string
}

// RegisterConsensusNodeMetric creates the "consensus_node_ids_metric"
// observable gauge and registers a callback that reports a constant value of 1
// labelled with node_name, node_id and namespace.
//
// It returns the error from creating the gauge or from registering the
// callback. A failure to create the gauge is logged and returned to the caller
// instead of terminating the process, so the caller can decide whether to retry.
func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error {
	log.Info("Registering metric for consensus node: ", nodeName)

	// Create an ObservableGauge for consensus node metrics.
	consensusNodeGauge, err := meter.Float64ObservableGauge(
		"consensus_node_ids_metric",
		metric.WithDescription("Metric for Consensus Node IDs"),
	)
	if err != nil {
		log.Error("Error creating metric: ", err)
		return err
	}

	// The labels depend only on the arguments, so build them once rather than
	// on every invocation of the callback.
	labels := metric.WithAttributes(
		attribute.String("node_name", nodeName),
		attribute.String("node_id", nodeID),
		attribute.String("namespace", namespace),
	)

	callback := func(_ context.Context, observer metric.Observer) error {
		// The value is always 1; the information is carried by the labels.
		observer.ObserveFloat64(consensusNodeGauge, 1, labels)
		return nil
	}

	// Register the callback with the meter and the ObservableGauge.
	_, err = meter.RegisterCallback(callback, consensusNodeGauge)
	return err
}
83 changes: 52 additions & 31 deletions pkg/nodes/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nodes

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -32,57 +33,77 @@ func SetConsNodeDefault(peer config.Peer) config.Peer {
return peer
}

// GenesisHash requests block 1 from the given consensus node
// (http://<consensusNode>:26657/block?height=1) and returns the block ID hash
// and the block header time found in the response, in that order.
//
// An error is returned when the request fails or when the response does not
// contain .result.block_id.hash or .result.block.header.time as strings. A
// response with missing or unexpectedly typed intermediate objects yields an
// error rather than a panic.
func GenesisHash(consensusNode string) (string, string, error) {
	url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode)
	jsonResponse, err := makeAPIRequest(url)
	if err != nil {
		return "", "", err
	}

	// Each level uses the comma-ok form: a failed assertion leaves a nil map,
	// and indexing a nil map is safe, so the final assertion reports !ok.
	result, _ := jsonResponse["result"].(map[string]interface{})

	blockID, _ := result["block_id"].(map[string]interface{})
	blockIDHash, ok := blockID["hash"].(string)
	if !ok {
		log.Error("Unable to access .block_id.hash")
		return "", "", errors.New("error accessing block ID hash")
	}

	block, _ := result["block"].(map[string]interface{})
	header, _ := block["header"].(map[string]interface{})
	blockTime, ok := header["time"].(string)
	if !ok {
		log.Error("Unable to access .block.header.time")
		return "", "", errors.New("error accessing block time")
	}

	return blockIDHash, blockTime, nil
}

// ConsensusNodesIDs requests the status of the given consensus node
// (http://<consensusNode>:26657/status?) and returns the node ID found at
// .result.node_info.id in the response.
//
// An error is returned when the request fails or when the response does not
// contain that field as a string. A response with missing or unexpectedly
// typed intermediate objects yields an error rather than a panic.
func ConsensusNodesIDs(consensusNode string) (string, error) {
	url := fmt.Sprintf("http://%s:26657/status?", consensusNode)
	jsonResponse, err := makeAPIRequest(url)
	if err != nil {
		return "", err
	}

	// Each level uses the comma-ok form: a failed assertion leaves a nil map,
	// and indexing a nil map is safe, so the final assertion reports !ok.
	result, _ := jsonResponse["result"].(map[string]interface{})
	nodeInfo, _ := result["node_info"].(map[string]interface{})
	nodeID, ok := nodeInfo["id"].(string)
	if !ok {
		log.Error("Unable to access .result.node_info.id")
		return "", errors.New("error accessing node ID")
	}

	log.Info("Consensus Node [", consensusNode, "] ID: [", nodeID, "]")

	return nodeID, nil
}

// makeAPIRequest performs an HTTP GET against url and decodes the JSON response
// body into a generic map.
//
// It returns an error when the request fails, when the server answers with a
// status other than 200 OK, when the body cannot be read, or when the body is
// not valid JSON. On error the returned map is nil.
func makeAPIRequest(url string) (map[string]interface{}, error) {
	response, err := http.Get(url)
	if err != nil {
		log.Error("Error making the request: ", err)
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Error("Non-OK response:", response.Status)
		// err is nil at this point, so build an explicit error; otherwise the
		// caller would receive (nil, nil) and treat the call as successful.
		return nil, fmt.Errorf("unexpected response status %q from %s", response.Status, url)
	}

	bodyBytes, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Error("Error reading response body:", err)
		return nil, err
	}

	// Parse the JSON response into a generic map.
	var jsonResponse map[string]interface{}
	if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil {
		log.Error("Error parsing JSON:", err)
		return nil, err
	}

	return jsonResponse, nil
}
Loading