// Source: subping.go (forked from fadhilyori/subping).

// Package subping provides a utility for concurrently pinging multiple IP addresses and collecting the results.
//
// The package includes functionality for running ping operations on multiple IP addresses concurrently,
// calculating ping statistics, and partitioning data for parallel processing.
//
// Example usage:
//
//	// Create options for Subping
//	opts := &subping.Options{
//		LogLevel:   "info",
//		Subnet:     "192.168.0.0/24",
//		Count:      5,
//		Interval:   time.Second,
//		Timeout:    2 * time.Second,
//		MaxWorkers: 10,
//	}
//
//	// Create a new Subping instance
//	sp, err := subping.NewSubping(opts)
//	if err != nil {
//		log.Fatalf("Failed to create Subping instance: %v", err)
//	}
//
//	// Run the Subping process
//	sp.Run()
//
//	// Get the online hosts and their statistics
//	onlineHosts, total := sp.GetOnlineHosts()
//	fmt.Printf("Online Hosts: %v\n", onlineHosts)
//	fmt.Printf("Total Online Hosts: %d\n", total)
package subping
import (
"errors"
"log"
"runtime"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/fadhilyori/subping/pkg/network"
ping "github.com/prometheus-community/pro-bing"
)
// Subping pings every address produced by a subnet iterator using a pool of
// worker goroutines and keeps the per-address results. Build one with
// NewSubping, call Run, then read Results or call GetOnlineHosts.
type Subping struct {
// TargetsIterator yields the target IP addresses. Run drains it by calling
// Next until it returns nil.
TargetsIterator *network.SubnetHostsIterator
// Count is the number of ping requests to send for each target; it is passed
// to RunPing unchanged.
Count int
// Interval is the time between ping requests to one target. Each worker also
// sleeps for Interval after finishing a target before taking the next one.
Interval time.Duration
// Timeout bounds the ping of a single target. RunPing applies it only when it
// is greater than zero.
Timeout time.Duration
// BatchSize is set by NewSubping to the number of hosts divided by MaxWorkers,
// rounded up. Run uses it as the buffer capacity of the job channel.
// NOTE(review): despite the name, it does not limit how many pings run at
// once; MaxWorkers does.
BatchSize int64
// Results stores the ping result of each target, keyed by the target's IP
// address string. Run replaces the map every time it completes.
Results map[string]Result
// TotalResults is the number of entries in Results, set at the end of Run.
TotalResults int
// MaxWorkers is the number of worker goroutines that Run starts.
MaxWorkers int
// logger receives the debug and trace messages emitted by Run and the
// workers. NewSubping sets its level from Options.LogLevel.
logger *logrus.Logger
}
// Options holds the configuration that NewSubping validates and copies into a
// new Subping instance.
type Options struct {
// LogLevel is the name of the log level, parsed with logrus.ParseLevel.
// An empty string is treated as "error".
LogLevel string
// Subnet is the subnet, in CIDR notation, whose addresses are pinged.
// It must not be empty.
Subnet string
// Count is the number of ping requests to send for each target.
// It must be at least 1.
Count int
// Interval is the time duration between each ping request.
Interval time.Duration
// Timeout specifies the timeout duration before exiting each target.
// RunPing only applies values greater than zero.
Timeout time.Duration
// MaxWorkers specifies the number of concurrent workers to use.
// It must be at least 1.
MaxWorkers int
}
// Result contains the statistics recorded for one target. A worker fills it
// by copying the same-named fields from the ping.Statistics value returned by
// RunPing, so a target whose ping failed to start or run has all-zero fields.
type Result struct {
// AvgRtt is the average round-trip time of the ping requests.
AvgRtt time.Duration
// PacketLoss is the percentage of packets lost during the ping operation.
PacketLoss float64
// PacketsSent is the number of packets sent for the ping operation.
PacketsSent int
// PacketsRecv is the number of packets received for the ping operation.
// GetOnlineHosts treats a target as online when this is greater than zero.
PacketsRecv int
// PacketsRecvDuplicates is the number of duplicate packets received.
PacketsRecvDuplicates int
}
// NewSubping validates opts and builds a Subping instance that is ready to Run.
//
// It returns an error when:
//   - opts is nil,
//   - opts.Subnet is empty or is rejected by
//     network.NewSubnetHostsIteratorFromCIDRString,
//   - opts.Count or opts.MaxWorkers is less than one,
//   - the per-worker batch size cannot be computed, or
//   - opts.LogLevel is not a level name accepted by logrus.ParseLevel.
//
// An empty opts.LogLevel selects the "error" level. The Options value passed
// in is only read, never modified.
func NewSubping(opts *Options) (*Subping, error) {
	if opts == nil {
		return nil, errors.New("options cannot be nil")
	}

	if opts.Subnet == "" {
		return nil, errors.New("subnet should be in CIDR notation and cannot empty")
	}

	if opts.Count < 1 {
		return nil, errors.New("count should be more than zero (0)")
	}

	if opts.MaxWorkers < 1 {
		return nil, errors.New("max workers should be more than zero (0)")
	}

	ips, err := network.NewSubnetHostsIteratorFromCIDRString(opts.Subnet)
	if err != nil {
		// A constructor that returns an error must not terminate the whole
		// process; hand the failure to the caller instead.
		return nil, err
	}

	batchLimit, err := calculateMaxPartitionSize(ips.TotalHosts, opts.MaxWorkers)
	if err != nil {
		return nil, err
	}

	// Resolve the default locally so the caller's Options stay untouched.
	levelName := opts.LogLevel
	if levelName == "" {
		levelName = "error"
	}

	logLevel, err := logrus.ParseLevel(levelName)
	if err != nil {
		// Surface the parser's own message, which names the rejected level.
		return nil, err
	}

	instance := &Subping{
		TargetsIterator: ips,
		Count:           opts.Count,
		Interval:        opts.Interval,
		Timeout:         opts.Timeout,
		BatchSize:       int64(batchLimit),
		MaxWorkers:      opts.MaxWorkers,
		logger:          logrus.New(),
	}
	instance.logger.SetLevel(logLevel)

	return instance, nil
}

// The file still imports the standard "log" package, and NewSubping was its
// only user. This blank reference keeps the import in use so the file keeps
// compiling; delete it together with the import.
var _ = log.Fatal
// Run pings every address produced by TargetsIterator and records the outcome.
//
// It starts MaxWorkers worker goroutines, hands them one address at a time
// through a channel buffered to BatchSize, closes that channel once the
// iterator returns nil, and blocks until every worker has returned. The
// gathered statistics then replace Results, and TotalResults is set to the
// number of entries.
func (s *Subping) Run() {
	// gathered is written by the workers; pool lets Run wait for all of them.
	var gathered sync.Map
	var pool sync.WaitGroup
	jobs := make(chan string, s.BatchSize)

	// Start the workers before queueing any work.
	for workerID := 0; workerID < s.MaxWorkers; workerID++ {
		pool.Add(1)
		go s.startWorker(int64(workerID), &pool, &gathered, jobs)
	}

	s.logger.Debugf("Spawned %d workers.\n", s.MaxWorkers)
	s.logger.Debugln("Assigning task to all workers.")

	// Queue targets until the iterator is exhausted.
	for {
		ip := s.TargetsIterator.Next()
		if ip == nil {
			break
		}

		target := ip.String()
		jobs <- target
		s.logger.Tracef("Assigned task: %s\n", target)
	}

	s.logger.Debugln("Waiting all workers finish their jobs.")

	// Closing the channel ends each worker's receive loop.
	close(jobs)
	pool.Wait()

	s.logger.Debugln("All workers already stopped. Storing the results.")

	// Copy the concurrent map into a plain map now that no worker writes to it.
	results := make(map[string]Result)
	gathered.Range(func(k, v any) bool {
		results[k.(string)] = v.(Result)
		return true
	})
	s.Results = results
	s.TotalResults = len(results)

	s.logger.Debugln("Run finished. All task done..")
}
// startWorker is the body of one worker goroutine.
//
// It receives targets from c until the channel is closed. For each target it
// runs RunPing with the instance's Count, Interval and Timeout, stores the
// resulting statistics in sm under the target string, and then sleeps for
// Interval before taking the next target. wg.Done is called when the worker
// returns. id is used only to tag the worker's trace log entries.
func (s *Subping) startWorker(id int64, wg *sync.WaitGroup, sm *sync.Map, c <-chan string) {
	defer wg.Done()

	for {
		target, open := <-c
		if !open {
			return
		}

		s.logger.WithField("worker", id).Tracef("Got task %s.\n", target)

		stats := RunPing(target, s.Count, s.Interval, s.Timeout)

		// Keep only the fields this package exposes.
		outcome := Result{
			AvgRtt:                stats.AvgRtt,
			PacketLoss:            stats.PacketLoss,
			PacketsSent:           stats.PacketsSent,
			PacketsRecv:           stats.PacketsRecv,
			PacketsRecvDuplicates: stats.PacketsRecvDuplicates,
		}
		sm.Store(target, outcome)

		time.Sleep(s.Interval)
	}
}
// GetOnlineHosts returns the subset of Results whose targets answered at least
// one ping (PacketsRecv greater than zero), together with the size of that
// subset. The returned map is newly allocated on every call.
func (s *Subping) GetOnlineHosts() (map[string]Result, int) {
	online := make(map[string]Result)

	for host, res := range s.Results {
		// Skip targets that never replied.
		if res.PacketsRecv <= 0 {
			continue
		}
		online[host] = res
	}

	return online, len(online)
}
// RunPing pings ipAddress and returns the statistics gathered by the pinger.
//
// count and interval are copied to the pinger as given. timeout is applied
// only when it is greater than zero; otherwise the pinger keeps whatever
// timeout it was created with. On Windows the pinger is switched to
// privileged mode before it runs.
//
// If the pinger cannot be created or its run fails, the problem is logged
// through logrus and a zero-valued ping.Statistics is returned.
func RunPing(ipAddress string, count int, interval time.Duration, timeout time.Duration) ping.Statistics {
	var none ping.Statistics

	p, err := ping.NewPinger(ipAddress)
	if err != nil {
		logrus.Printf("Failed to create pinger for IP Address: %s\n", ipAddress)
		return none
	}

	p.Count, p.Interval = count, interval
	if timeout > 0 {
		p.Timeout = timeout
	}
	if runtime.GOOS == "windows" {
		p.SetPrivileged(true)
	}

	if runErr := p.Run(); runErr != nil {
		logrus.Printf("Failed to ping the address %s, %v\n", ipAddress, runErr.Error())
		return none
	}

	return *p.Statistics()
}
// calculateMaxPartitionSize returns the size of the largest partition when
// dataSize items are spread as evenly as possible over numPartitions
// partitions, that is dataSize divided by numPartitions, rounded up.
//
// It returns an error, rather than panicking with a division by zero, when
// numPartitions is less than one. A negative dataSize is also rejected; it is
// reported with the original "exceeds the range of int" message.
func calculateMaxPartitionSize(dataSize int, numPartitions int) (int, error) {
	if numPartitions < 1 {
		return 0, errors.New("number of partitions should be more than zero (0)")
	}

	if dataSize < 0 {
		return 0, errors.New("the value exceeds the range of int")
	}

	size := dataSize / numPartitions
	// A non-zero remainder means one more item must fit into some partition.
	if dataSize%numPartitions != 0 {
		size++
	}

	return size, nil
}