diff --git a/README.md b/README.md
index 28d72c7..0b096cb 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,11 @@ You can use Torch to manage the nodes connections from a config file and Torch w
 Torch uses the Kubernetes API to manage the nodes, it gets their multi addresses information and stores them in a Redis instance, also, it provides some metrics to expose the node's IDs through the `/metrics` endpoint.
 
+Torch automatically detects LoadBalancer services in a Kubernetes cluster and exposes metrics about them.
+The service uses OpenTelemetry to instrument the metrics and Prometheus to expose them.
+It watches the Kubernetes API server for service events and filters them to keep only services of type **LoadBalancer**.
+For each LoadBalancer service found, it retrieves the service name and public IP and generates metrics with custom labels. These metrics are exposed via the Prometheus endpoint, making them available for monitoring and visualization in Grafana or other monitoring tools.
+
 ---
 
 ## Workflow
@@ -243,6 +248,66 @@ Torch uses [Redis](https://redis.io/) as a DB, so to use Torch, you need to have
 We are using Redis in two different ways:
 
 - Store the Nodes IDs and reuse them.
-- As a message broker, where Torch uses Producer & Consumer approach to process data async.
+- As a message broker: Torch uses a Producer & Consumer approach to process data asynchronously.
+
+---
+
+## Metrics
+
+### MultiAddress
+
+Custom metric to expose the nodes' multi addresses:
+
+- `multiaddr`: This metric represents the node's multi address and includes the following labels:
+  - `service_name`: The service name. In this case, it is set to **torch**.
+  - `node_name`: The name of the node.
+  - `multiaddress`: The node's multi address.
+  - `namespace`: The namespace in which Torch is deployed.
+  - `value`: The value of the metric. In this example, it is set to 1.
+
+### BlockHeight
+
+Custom metric to expose the first block height of the chain:
+
+- `block_height_1`: This metric represents the first block height of the chain and includes the following labels:
+  - `service_name`: The service name. In this case, it is set to **torch**.
+  - `block_height_1`: The first block height of the chain.
+  - `earliest_block_time`: Timestamp when the chain was created.
+  - `days_running`: Number of days that the chain has been running.
+  - `namespace`: The namespace in which Torch is deployed.
+  - `value`: The value of the metric. In this example, it is set to 1.
+
+### Load Balancer
+
+Custom metric to expose the LoadBalancer public IPs:
+
+- `load_balancer`: This metric represents the LoadBalancer resource and includes the following labels (see the example scrape output after this list):
+  - `service_name`: The service name. In this case, it is set to **torch**.
+  - `load_balancer_name`: The name of the LoadBalancer service.
+  - `load_balancer_ip`: The IP address of the LoadBalancer.
+  - `namespace`: The namespace in which the LoadBalancer is deployed.
+  - `value`: The value of the metric. In this example, it is set to 1, but it can be customized to represent different load balancing states.
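+
+For reference, a scrape of the `/metrics` endpoint would expose the metric roughly as follows. The load balancer name, IP, and namespace below are placeholders, and the exact series name and any extra labels depend on how the OpenTelemetry Prometheus exporter is configured:
+
+```
+# HELP load_balancer Torch - Load Balancers
+# TYPE load_balancer gauge
+load_balancer{service_name="torch",load_balancer_name="da-bridge-lb",load_balancer_ip="203.0.113.10",namespace="torch"} 1
+```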
+
+---
+
+## Monitoring and Visualization
+
+Torch exposes some custom metrics through the Prometheus endpoint.
+You can use Grafana to connect to Prometheus and create custom dashboards to visualize these metrics.
+
+To access the Prometheus and Grafana dashboards and view the metrics, follow these steps:
+
+1. Access the Prometheus dashboard:
+   - Open a web browser and navigate to the Prometheus server's URL (e.g., `http://prometheus-server:9090`).
+   - In the Prometheus web interface, you can explore and query the metrics collected by the Torch service.
+
+2. Access the Grafana dashboard:
+   - Open a web browser and navigate to the Grafana server's URL (e.g., `http://grafana-server:3000`).
+   - Log in to Grafana using your credentials.
+   - Create a new dashboard or import an existing one to visualize the LoadBalancer metrics from Prometheus.
+   - Use the `load_balancer` metric and its labels to filter and display the relevant information.
+
+Customizing dashboards and setting up alerts in Grafana will help you monitor the performance and health of your LoadBalancer resources effectively.
 
 ---
 
diff --git a/pkg/http/server.go b/pkg/http/server.go
index 50840d4..d55afbc 100644
--- a/pkg/http/server.go
+++ b/pkg/http/server.go
@@ -82,6 +82,7 @@ func Run(cfg config.MutualPeersConfig) {
 
 	// check if Torch has to generate the metric or not, we invoke this function async to continue the execution flow.
 	go BackgroundGenerateHashMetric(cfg)
+	go BackgroundGenerateLBMetric()
 
 	// Initialize the goroutine to check the nodes in the queue.
 	log.Info("Initializing queues to process the nodes...")
@@ -129,6 +130,31 @@ func Run(cfg config.MutualPeersConfig) {
 	log.Info("Server Exited Properly")
 }
 
+// BackgroundGenerateLBMetric generates the load_balancer metric and keeps it updated by watching the services in the cluster.
+func BackgroundGenerateLBMetric() {
+	log.Info("Initializing goroutine to generate the metric: load_balancer")
+
+	// Retrieve the current list of Load Balancers and generate the metrics for them.
+	_, err := k8s.RetrieveAndGenerateMetrics()
+	if err != nil {
+		log.Error("Failed to update metrics: ", err)
+	}
+
+	// Start watching for changes to the services in a separate goroutine.
+	done := make(chan error)
+	go k8s.WatchServices(done)
+
+	// Handle errors from WatchServices; the loop ends when the channel is closed.
+	for err := range done {
+		if err != nil {
+			log.Error("Error in WatchServices: ", err)
+		}
+	}
+}
+
 // BackgroundGenerateHashMetric checks if the consensusNode field is defined in the config to generate the metric from the Genesis Hash data.
 func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
 	log.Info("BackgroundGenerateHashMetric...")
diff --git a/pkg/k8s/services.go b/pkg/k8s/services.go
new file mode 100644
index 0000000..6d67932
--- /dev/null
+++ b/pkg/k8s/services.go
@@ -0,0 +1,147 @@
+package k8s
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	log "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"github.com/celestiaorg/torch/pkg/metrics"
+)
+
+// RetrieveAndGenerateMetrics retrieves the list of Load Balancers and generates metrics for them.
+func RetrieveAndGenerateMetrics() ([]metrics.LoadBalancer, error) {
+	log.Info("Retrieving the list of Load Balancers")
+
+	// Get the list of services in the namespace.
+	svc, err := ListServices()
+	if err != nil {
+		log.Error("Failed to retrieve the services: ", err)
+		return nil, err
+	}
+
+	// Filter the services to keep only the Load Balancers.
+	loadBalancers, err := GetLoadBalancers(svc)
+	if err != nil {
+		log.Error("Error getting the load balancers: ", err)
+		return nil, err
+	}
+
+	// Generate the metrics with the LBs.
+	err = metrics.WithMetricsLoadBalancer(loadBalancers)
+	if err != nil {
+		log.Error("Failed to update metrics: ", err)
+		return nil, err
+	}
+
+	return loadBalancers, nil
+}
+
+// ListServices retrieves the list of services in the current namespace.
+func ListServices() (*corev1.ServiceList, error) {
+	// Authentication in cluster - using Service Account, Role, RoleBinding.
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		log.Error("Failed to get the in-cluster config: ", err)
+		return nil, err
+	}
+
+	// Create the Kubernetes clientSet.
+	clientSet, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		log.Error("Failed to create the Kubernetes clientSet: ", err)
+		return nil, err
+	}
+
+	// Get all services in the namespace.
+	services, err := clientSet.CoreV1().Services(GetCurrentNamespace()).List(context.Background(), metav1.ListOptions{})
+	if err != nil {
+		log.Error("Failed to list the services: ", err)
+		return nil, err
+	}
+
+	return services, nil
+}
+
+// GetLoadBalancers filters the list of services to include only Load Balancers and returns a list of them.
+func GetLoadBalancers(svcList *corev1.ServiceList) ([]metrics.LoadBalancer, error) {
+	var loadBalancers []metrics.LoadBalancer
+
+	for _, svc := range svcList.Items {
+		if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
+			for _, ingress := range svc.Status.LoadBalancer.Ingress {
+				log.Info(fmt.Sprintf("Updating metrics for service: [%s] with IP: [%s]", svc.Name, ingress.IP))
+
+				// Create a LoadBalancer struct and append it to the loadBalancers list.
+				loadBalancer := metrics.LoadBalancer{
+					ServiceName:      "torch",
+					LoadBalancerName: svc.Name,
+					LoadBalancerIP:   ingress.IP,
+					Namespace:        svc.Namespace,
+					Value:            1, // Set the value of the metric here (e.g., 1).
+				}
+				loadBalancers = append(loadBalancers, loadBalancer)
+			}
+		}
+	}
+
+	if len(loadBalancers) == 0 {
+		return nil, errors.New("no Load Balancers found")
+	}
+
+	return loadBalancers, nil
+}
+
+// WatchServices watches for changes to the services in the current namespace and updates the metrics accordingly.
+func WatchServices(done chan<- error) {
+	defer close(done)
+
+	// Authentication in cluster - using Service Account, Role, RoleBinding.
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		log.Error("Failed to get in-cluster config: ", err)
+		done <- err
+		return
+	}
+
+	// Create the Kubernetes clientSet.
+	clientSet, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		log.Error("Failed to create Kubernetes clientSet: ", err)
+		done <- err
+		return
+	}
+
+	// Create a service watcher.
+	watcher, err := clientSet.CoreV1().Services(GetCurrentNamespace()).Watch(context.Background(), metav1.ListOptions{})
+	if err != nil {
+		log.Error("Failed to create service watcher: ", err)
+		done <- err
+		return
+	}
+
+	// Watch for events on the watcher channel.
+	for event := range watcher.ResultChan() {
+		if service, ok := event.Object.(*corev1.Service); ok {
+			if service.Spec.Type == corev1.ServiceTypeLoadBalancer {
+				loadBalancers, err := GetLoadBalancers(&corev1.ServiceList{Items: []corev1.Service{*service}})
+				if err != nil {
+					log.Error("Failed to get the load balancers metrics: ", err)
+					done <- err
+					return
+				}
+
+				if err := metrics.WithMetricsLoadBalancer(loadBalancers); err != nil {
+					log.Error("Failed to update metrics with load balancers: ", err)
+					done <- err
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 4bd0826..c9b2e3d 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -121,3 +121,46 @@ func calculateDaysDifference(inputTimeString string) (int, error) {
 
 	return daysDifference, nil
 }
+
+// LoadBalancer represents the information for a load balancer.
+type LoadBalancer struct {
+	ServiceName      string  // ServiceName is the name of the service associated with the load balancer.
+	LoadBalancerName string  // LoadBalancerName is the name of the load balancer.
+	LoadBalancerIP   string  // LoadBalancerIP is the IP address of the load balancer.
+	Namespace        string  // Namespace is the namespace where the service is deployed.
+	Value            float64 // Value is the value to be observed for the load balancer.
+}
+
+// WithMetricsLoadBalancer creates a callback function to observe metrics for multiple load balancers.
+func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {
+	// Create a Float64ObservableGauge named "load_balancer" with a description for the metric.
+	loadBalancersGauge, err := meter.Float64ObservableGauge(
+		"load_balancer",
+		metric.WithDescription("Torch - Load Balancers"),
+	)
+	if err != nil {
+		log.Printf("Failed to create the load_balancer gauge: %v", err)
+		return err
+	}
+
+	// Define the callback function that will be called periodically to observe metrics.
+	callback := func(ctx context.Context, observer metric.Observer) error {
+		for _, lb := range loadBalancers {
+			// Create labels with attributes for each load balancer.
+			labels := metric.WithAttributes(
+				attribute.String("service_name", lb.ServiceName),
+				attribute.String("load_balancer_name", lb.LoadBalancerName),
+				attribute.String("load_balancer_ip", lb.LoadBalancerIP),
+				attribute.String("namespace", lb.Namespace),
+			)
+			// Observe the float64 value for the current load balancer with the associated labels.
+			observer.ObserveFloat64(loadBalancersGauge, lb.Value, labels)
+		}
+
+		return nil
+	}
+
+	// Register the callback with the meter and the Float64ObservableGauge.
+	_, err = meter.RegisterCallback(callback, loadBalancersGauge)
+	return err
+}
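To sanity-check the LoadBalancer filtering introduced in `pkg/k8s/services.go`, a small unit test along these lines could sit next to it. This is only a sketch: the file, service names, and IP below are illustrative and not part of this change.

```go
package k8s_test

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/celestiaorg/torch/pkg/k8s"
)

// TestGetLoadBalancers checks that only LoadBalancer services with an ingress IP produce metric entries.
func TestGetLoadBalancers(t *testing.T) {
	svcList := &corev1.ServiceList{
		Items: []corev1.Service{
			{
				// Hypothetical LoadBalancer service with a public IP assigned.
				ObjectMeta: metav1.ObjectMeta{Name: "da-bridge-lb", Namespace: "torch"},
				Spec:       corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer},
				Status: corev1.ServiceStatus{
					LoadBalancer: corev1.LoadBalancerStatus{
						Ingress: []corev1.LoadBalancerIngress{{IP: "203.0.113.10"}},
					},
				},
			},
			{
				// ClusterIP services must be ignored by the filter.
				ObjectMeta: metav1.ObjectMeta{Name: "internal-svc", Namespace: "torch"},
				Spec:       corev1.ServiceSpec{Type: corev1.ServiceTypeClusterIP},
			},
		},
	}

	lbs, err := k8s.GetLoadBalancers(svcList)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(lbs) != 1 {
		t.Fatalf("expected 1 load balancer, got %d", len(lbs))
	}
	if lbs[0].LoadBalancerName != "da-bridge-lb" || lbs[0].LoadBalancerIP != "203.0.113.10" {
		t.Fatalf("unexpected load balancer data: %+v", lbs[0])
	}
}
```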