-
Notifications
You must be signed in to change notification settings - Fork 157
/
Copy pathpingip.py
107 lines (100 loc) · 3.43 KB
/
pingip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import sys
import ctypes
import subprocess
from multiprocessing.pool import ThreadPool
import logging
import time
import re
import msvcrt
from tqdm import tqdm
# 设置日志
def set_logging_format():
logging.basicConfig(level=logging.INFO,
format='%(message)s',
filename="ping_host.log",
filemode='w'
)
console = logging.StreamHandler()
console.setLevel(logging.FATAL)
formatter = logging.Formatter('%(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
# 获得所有待测试ip
def get_all_ips(hosts_list_path):
ips = []
with open(hosts_list_path, "r") as f:
for host in f.readlines():
ips.append(host.strip())
return ips
#多线程调用ping
def ping_host(ip):
global finish
popen = subprocess.Popen('ping -w 1 %s' %ip, stdout=subprocess.PIPE,shell=True)
popen.wait()
res = popen.stdout.read().decode('gbk').strip('\n')
if "平均" in res:
try:
latency = re.findall("平均 = \d+ms", res)[0]
latency = re.findall(r"\d+", latency)[0]
loss = re.findall("\d+% 丢失", res)[0]
loss = re.findall(r"\d+", loss)[0]
if int(latency)<THRESHOLD:
#logging.info("{}, 延迟:{}ms, 丢包:{}%".format(ip, latency, loss))
outcomes.append((ip, int(latency), int(loss)))
except Exception as e:
print(e)
finish += 1
# 判断是否为管理员
def is_admin():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
if __name__ == '__main__':
if is_admin() == False:
print("请以管理员权限运行")
os.system("pause")
sys.exit()
# 线程数:为200时候,我本地测试179秒。
# 不同配置和网络的电脑结果有差异。线程不是越大越好,设置成不超过300。
# 超过300后丢包测试的结果不准。
WORD_THREAD_NUM = int(input("线程数(一般设置为200)范围为0-2000:"))
assert 0<WORD_THREAD_NUM<2000
# 移动连接香港,一般设置100ms,电信联通连接美西,一般设置200
THRESHOLD = int(input("阈值(移动设置为100,电信联通设置200)范围为0-300:"))
assert 0<THRESHOLD<300
now = time.time()
# 初始化参数
set_logging_format()
hosts_list_path = "./input.txt"
ips = get_all_ips(hosts_list_path)
total = len(ips)
finish = 1
finish_temp = 1
outcomes = []
# 设置线程池
pool = ThreadPool(WORD_THREAD_NUM)
pool.map_async(ping_host,ips)
pool.close()
with tqdm(total=total) as pbar:
while(True):
# 更新bar
time.sleep(1.0)
pbar.update(finish-finish_temp)
finish_temp = finish
if (total<=finish or msvcrt.kbhit()):
outcomes.sort(key=lambda outcome_item: (outcome_item[2], outcome_item[1]))
if len(outcomes)==0:
logging.info("没有找到合适的IP,请调整阈值")
else:
for i in outcomes:
ip, latency, loss = i[0], i[1], i[2]
logging.info("{}, 延迟:{}ms, 丢包:{}%".format(ip, latency, loss))
pool.terminate()
print("正在退出")
break
print("总共耗时:",time.time()-now, 's')
os.system("pause")