-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNetworks.py
More file actions
executable file
·282 lines (280 loc) · 10.3 KB
/
Networks.py
File metadata and controls
executable file
·282 lines (280 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#!/usr/bin/python3 -O
"""
File: Networks.py
Author: Leon McClatchey
Date: 2025-09-23
Description: Uses SNMP to determine which interfaces are active and
gathers the pertinent statistics for those interfaces.
This script uses functions similar to those in the Storage.py script,
except that it focuses on network interfaces instead of storage devices.
It has also been set up to run as a standalone script.
Commands: snmpwalk -v2c -c???? host oid (1.3.6.1.2.1.2.2.1) or (ifEntry)
SNMP States for AdminStatus and OperStatus: 1 = up, 2 = down, 3 = testing
SNMP Definition for Type: ethernetCsmacd = 6, softwareLoopback = 24
"""
# System Libraries
import argparse
import inspect
import logging
import os
import shlex
import socket
import subprocess
import sys
import zipfile
from datetime import datetime
from errno import ECONNREFUSED

from IPy import IP
from tabulate import tabulate
# Initialize Global Strings
# Static SNMP lookup data plus the mutable stores filled in at runtime
# ("Elements" by BuildTable; "Args", "Devices" and OID["Mibs"] are added later).
Config = {
    "OID": {
        # ifEntry subtree of the interfaces table that snmpwalk is pointed at
        "Base": "1.3.6.1.2.1.2.2.1",
        # ifAdminStatus / ifOperStatus codes; 4-7 only occur for ifOperStatus
        "States": {
            1: "up",
            2: "down",
            3: "testing",
            4: "unknown",
            5: "dormant",
            6: "notPresent",
            7: "lowerLayerDown",
        },
        # ifType codes as assigned in the IANAifType-MIB; the previous table
        # attached several labels to the wrong numbers (e.g. 117 is
        # gigabitEthernet, 135 is l2vlan, 160 is usb).
        "Types": {
            1: "other",
            6: "ethernetCsmacd",
            23: "ppp",
            24: "softwareLoopback",
            53: "propVirtual",
            62: "fastEther",
            69: "fastEtherFX",
            71: "ieee80211",
            117: "gigabitEthernet",
            129: "docsCableUpstream",
            131: "tunnel",
            135: "l2vlan",
            136: "l3ipvlan",
            150: "mplsTunnel",
            160: "usb",
            161: "ieee8023adLag",
            166: "mpls",
        },
    },
    "Elements": {},
}
# Script name without directory or extension; used to name the log file
MyScript = os.path.splitext(os.path.basename(sys.argv[0]))[0]
# Class MyParser used to get the command line Arguments
class MyParser(argparse.ArgumentParser):
    """ArgumentParser that prints the full help text when parsing fails."""

    def error(self, message):
        """Report a parsing error, show the help text and exit with status 2.

        argparse invokes the lowercase ``error`` hook; the previous ``Error``
        spelling was never called by the parser.

        Args:
            message: Description of the parsing problem supplied by argparse.
        """
        # Log settings are stored under Config['Args'] only after parsing has
        # succeeded once, so logging is best-effort here.
        log = (Config.get('Args') or {}).get('log')
        if log:
            WriteLog(f"Argument Parsing Error: {message}", log)
        self.print_help()
        sys.stderr.write(f"error: {message}\n")
        sys.exit(2)

    # Keep the old spelling available for any caller that used it directly
    Error = error
# Function to Archive the Logfile
def ArchiveLog(dic):
    """Zip and remove the log file once it has reached the configured size.

    The archive is written next to the log file and is named after the log's
    base name followed by today's date (YYYYMMDD) and ".zip".

    Args:
        dic: Log settings with 'path' (full path of the log file) and 'max'
            (size limit; the file is archived when its size in bytes is at
            least ``max * 10e6``).
    """
    log_path = dic['path']
    log_name = os.path.basename(log_path)
    stamp = datetime.today().strftime("%Y%m%d")
    archive = f"{os.path.splitext(log_name)[0]}{stamp}.zip"
    archive_path = os.path.join(os.path.dirname(log_path), archive)
    # NOTE(review): 10e6 is 10,000,000, so 'max' counts units of 10 MB;
    # if megabytes were intended this should be 1e6 - confirm before changing.
    limit = dic['max'] * 10e6
    try:
        # Nothing to do when the log does not exist yet or is still small
        if not os.path.isfile(log_path) or os.path.getsize(log_path) < limit:
            return
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as bundle:
            bundle.write(log_path, arcname=log_name)
        os.remove(log_path)
    except (OSError, zipfile.BadZipFile, zipfile.LargeZipFile) as err:
        WriteLog(f"Error Compressing Log: {err}", dic)
        return
    WriteLog(f"Successfully compressed {log_name} to {archive} and removed {log_name}", dic)
# Function to Build the Table
def BuildTable(entry):
    """Parse one snmpwalk result line and store its value in Config['Elements'].

    The column name before the first '.' is matched against
    Config['OID']['Mibs']; the position of the match selects how the value is
    decoded, and the decoded value is appended to the list kept for that
    column.

    Args:
        entry: One result line with the MIB module prefix already removed,
            presumably of the form "ifDescr.1 = STRING: lo" - verify against
            the caller.
    """
    Mibs = Config['OID']['Mibs']
    Elements = Config['Elements']
    # Positions in Mibs whose values are plain numbers
    Integers = {0, 3, 4, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
    # Split on the first '=' only so a value containing '=' stays intact
    name, found, raw = entry.partition("=")
    if not found:
        return
    column = name.strip().split('.')[0]
    # Drop the "TYPE:" prefix; keep everything after it, including later ':'
    _, typed, remainder = raw.partition(":")
    value = remainder.strip() if typed else raw.strip()
    for i, mib in enumerate(Mibs):
        if column != f"if{mib}":
            continue
        bucket = Elements.setdefault(mib, [])
        if i in Integers:
            bucket.append(int(value.split(":")[0].strip()))
        elif i in (1, 5):
            # Description / physical address: free text that may contain ':'
            bucket.append(value)
        elif i in (2, 6, 7):
            # Enumerations are printed as "label(number)"; accept a bare number too
            number = value.split('(')[1].strip(')') if '(' in value else value
            bucket.append(int(number))
        elif i == 8:
            # Timeticks are printed as "(ticks) d:hh:mm:ss"; keep the readable part
            bucket.append(raw.split(')')[1].strip() if ')' in raw else value)
        elif i == 21:
            bucket.append(raw)
        else:
            print("Unknown Mib Found")
        break
# Function to convert Bytes
def ConvertBytes(data):
    """Format a number with a decimal (power-of-1000) size suffix.

    Args:
        data: Numeric value to format.

    Returns:
        The value divided by the largest unit it reaches (KB, MB, GB or TB),
        rounded to three decimals and followed by that unit, or the raw value
        followed by "b" when it is below 1000.
    """
    # Largest unit first so that an exact boundary (e.g. 1e6) uses the larger unit
    units = ((1e12, "TB"), (1e9, "GB"), (1e6, "MB"), (1e3, "KB"))
    for factor, suffix in units:
        if data >= factor:
            return f"{round(data / factor, 3)} {suffix}"
    return f"{data}b"
# Function to Create a Folder if it does not exist
def CreateFolder(pth):
    """Create the directory for *pth* if it does not exist yet.

    A path that ends in a file extension is treated as a file, and its parent
    directory is created instead of the path itself.

    Args:
        pth: Directory path, or file path whose parent directory is wanted.
    """
    if not pth:
        print("Path must be specified")
        return
    has_extension = bool(os.path.splitext(pth)[1])
    target = os.path.dirname(pth) if has_extension else pth
    # A bare file name has no directory part: nothing to create
    if target and not os.path.exists(target):
        # exist_ok guards against another process creating it in between
        os.makedirs(target, exist_ok=True)
# Function to create Mib Table
def CreateMibTable():
    """Walk the interface table and record the column names it contains.

    Runs snmpwalk against Config['OID']['Base'] on the configured host and
    stores the distinct column names, in order of first appearance and
    without their "if" prefix, in Config['OID']['Mibs'].  Exits with status 2
    when the snmpwalk command fails.
    """
    Args = Config['Args']
    OID = Config['OID']
    # Quote the user-supplied pieces because LocalCommand runs through a shell
    cmd = (
        f"snmpwalk -v{shlex.quote(Args['version'])} -c{shlex.quote(Args['community'])} "
        f"{shlex.quote(Args['host'])} {OID['Base']}"
    )
    WriteLog(f"Obtaining SNMP Mibs from {Args['host']}!", Args['log'])
    rc = LocalCommand(cmd)
    if rc['code'] != 0:
        dsp = f"Unable to Connect via SNMP with {Args['host']}"
        WriteLog(dsp, Args['log'])
        print(dsp)
        sys.exit(2)
    text = rc['out'].decode('utf-8', errors='replace')
    records = [line.strip() for line in text.split('\n') if line.strip()]
    WriteLog(f"Query returned {len(records)} records from {Args['host']}", Args['log'])
    Mibs = []
    for record in records:
        # Lines without a "MODULE::" prefix are not table entries
        _, found, rest = record.partition("::")
        if not found:
            continue
        # removeprefix drops exactly "if"; strip('if') would strip any
        # leading/trailing 'i' or 'f' characters
        name = rest.split('.')[0].removeprefix('if')
        if name and name not in Mibs:
            Mibs.append(name)
    Config['OID']['Mibs'] = Mibs
# Function to Display Table
def DisplayTable(title, dic):
    """Print the collected values of one interface as "key: value" lines.

    Entries that are None or blank are skipped.  'Type' and any key containing
    'Status' are translated through the lookup tables in Config['OID'];
    other integer values are formatted with ConvertBytes.

    Args:
        title: Label printed in the heading line.
        dic: Mapping of column name to collected value for that interface.
    """
    Types = Config['OID']['Types']
    States = Config['OID']['States']
    print(f"Network Results for {title}")
    for key, value in dic.items():
        if value is None or not str(value).strip():
            continue
        if key == 'Type':
            # Codes missing from the table are shown rather than raising KeyError
            shown = Types.get(value, f"unknown({value})")
        elif isinstance(key, str) and 'Status' in key:
            shown = States.get(value, f"unknown({value})")
        elif isinstance(value, int):
            shown = ConvertBytes(value)
        else:
            shown = value
        print(f"{key}: {shown}")
    # NOTE: a tabulate-based grid layout was sketched here previously:
    #table = [[k,v] for k,v in data.items()]
    #print(tabulate(table, headers=["Key","Value"],tablefmt="grid"))
# Function to get the Command Line Arguments
def GetArguments():
    """Parse the command line and return the options as a dictionary.

    Returns:
        dict of option values.  'version' is normalised to lower case without
        surrounding whitespace, and 'log' is replaced by a dictionary with
        'path' (full log file path, named after this script) and 'max'
        (integer size limit).

    Exits:
        Via sys.exit when the SNMP version is not one of 1, 2c or 3, and via
        the parser's error handler when the --log value is malformed.
    """
    parser = MyParser(description='SNMP Network Manager Version 1.0.0', usage='%(prog)s [options]', formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("-c", '--community', dest='community', type=str, default='public', help="The snmp community: defaults to 'public'")
    parser.add_argument("-e", '--engine',dest='engine',type=str,help="Engine ID if SNMP Version 3 has been specified")
    # Help text now states the default that is actually configured
    parser.add_argument("-f", '--flags',dest='flags',type=str,default="c 10:w 20",help="Colon deliminated string for critical and warning percentage settings, defaults to 'c 10:w 20'")
    parser.add_argument("-H", '--host', dest='host', type=str, required=True, help="Host (Required): Name of the host running the target service")
    parser.add_argument("-l", '--log', dest='log',default="path=/usr/local/logs:max=500", help="colon deliminated Location and Max size of Log file\nDefaults to 'path=/usr/local/logs:max=500'\nUnless Nagios is defined, then it will be in the Nagios Log directory")
    parser.add_argument("-n", '--nagios', dest='nagios', help='location of NMS config file, complete path and filename')
    parser.add_argument("-p", '--port', dest='port', type=int, default=161, help="The snmp port: Defaults to '161'")
    parser.add_argument("-P", '--password',dest='password',type=str,help="Password if SNMP Version 3 has been specified")
    parser.add_argument("-v", '--version', dest='version', type=str, default='2c', help="The snmp version: must be '1,2c, or 3': defaults to '2c'")
    parser.add_argument("-u", '--user',dest='user',type=str,help="User if SNMP Version 3 has been specified")
    args = vars(parser.parse_args())
    version = args['version'].strip().lower()
    if version not in ['1','2c','3']:
        sys.exit(f"Invalid SNMP Version '{args['version']}' Specified\nAborting Mission!")
    # Store the validated form so later commands use exactly what was checked
    args['version'] = version
    try:
        # Expected shape: "<key>=<directory>:<key>=<size>"
        path_part, max_part = args['log'].split(':')
        log_dir = path_part.split('=', 1)[1]
        max_size = int(max_part.split('=', 1)[1])
    except (ValueError, IndexError):
        parser.error(f"Invalid log specification '{args['log']}': expected 'path=<dir>:max=<size>'")
    if args['nagios'] is not None:
        log_dir = getNagiosLogPath(args['nagios'])
    args['log'] = {'path': f"{log_dir}/{MyScript}.log", 'max': max_size}
    return args
# Function to retrieve the Nagios Log Folder
def getNagiosLogPath(path):
    """Return the directory of the ``log_file`` setting in a config file.

    Args:
        path: Path of the configuration file to read.

    Returns:
        Directory part of the first active ``log_file=<file>`` setting.

    Raises:
        ValueError: If the file contains no ``log_file`` setting.
        OSError: If the file cannot be opened.
    """
    with open(path, 'r') as file:
        for line in file:
            setting = line.strip()
            # Commented-out settings must not be picked up
            if not setting or setting.startswith('#'):
                continue
            key, found, value = setting.partition('=')
            if found and key.strip() == "log_file":
                return os.path.dirname(value.strip())
    raise ValueError(f"No 'log_file' setting found in {path}")
# Function to execute the local query
def LocalCommand(cmd):
    """Run a shell command and capture its output.

    Args:
        cmd: Command line as a single string; it is executed through the
            shell, so callers must quote any untrusted parts themselves.

    Returns:
        dict with 'out' (stdout bytes), 'err' (stderr bytes) and 'code'
        (exit status of the command).

    Exits:
        With status 3 when the command could not be started at all.
    """
    try:
        proc = subprocess.run(
            cmd,
            env=os.environ.copy(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as error:
        # Frame 1 is the function that called LocalCommand
        caller = inspect.stack()[1].function
        print(f"An {type(error).__name__} occurred in {caller} function!", file=sys.stderr)
        print(f"Exception: {str(error)}", file=sys.stderr)
        sys.exit(3)
    return {'out': proc.stdout, 'err': proc.stderr, 'code': proc.returncode}
# Main Function directs how the script is executed
def main():
    """Run the script: parse arguments, query the host and print the results.

    Stores the parsed arguments in Config['Args'], sets up logging, rotates
    the log if needed, and - when the host validates - builds the column
    list, reads the interface table and prints one section per interface.
    Exits with status 2 when the host cannot be reached.
    """
    global Config
    Config["Args"] = GetArguments()
    log_settings = Config['Args']['log']
    # The log directory must exist before logging opens the file
    CreateFolder(log_settings['path'])
    logging.basicConfig(filename=log_settings['path'], filemode='a', level=logging.WARN)
    ArchiveLog(log_settings)
    if not ValidateHost(Config['Args']['host']):
        dsp = f"Unable to connect to {Config['Args']['host']}"
        WriteLog(dsp, log_settings)
        print(dsp)
        sys.exit(2)
    CreateMibTable()
    ReadSnmp()
    SortTable()
    Devices = Config['Devices']
    # An empty walk leaves no 'Descr' column; then there is nothing to show
    for label in Config['Elements'].get('Descr', []):
        DisplayTable(label, Devices[label])
# Function to Read the SNMP network table
def ReadSnmp():
    """Walk the interface table and feed every result line to BuildTable.

    Runs snmpwalk against Config['OID']['Base'] on the configured host,
    removes the leading "MODULE::" prefix from each line and passes the rest
    to BuildTable.  Exits with status 2 when the snmpwalk command fails.
    """
    Args = Config['Args']
    Base = Config['OID']['Base']
    # Quote the user-supplied pieces because LocalCommand runs through a shell
    cmd = (
        f"snmpwalk -v{shlex.quote(Args['version'])} -c{shlex.quote(Args['community'])} "
        f"{shlex.quote(Args['host'])} {Base}"
    )
    WriteLog(f"Querying SNMP version {Args['version']} on {Args['host']}", Args['log'])
    rc = LocalCommand(cmd)
    if rc['code'] != 0:
        dsp = f"Unable to Connect via SNMP with {Args['host']}"
        WriteLog(dsp, Args['log'])
        print(dsp)
        sys.exit(2)
    text = rc['out'].decode('utf-8', errors='replace')
    records = [line.strip() for line in text.split('\n') if line.strip()]
    WriteLog(f"Query returned {len(records)} records from {Args['host']}", Args['log'])
    for record in records:
        # Cut at the first "::" only: values such as OIDs may contain "::" too
        _, found, entry = record.partition("::")
        if found:
            BuildTable(entry)
# Function to Sort the Table and match it to a specific IF device
def SortTable():
    """Regroup the per-column lists into one dictionary per interface.

    Reads the columns in Config['Elements'] and stores, in Config['Devices'],
    a mapping from each interface description to a dictionary of its other
    column values.  The first entry of Config['OID']['Mibs'] and the
    description column itself are left out of the per-interface dictionary.
    """
    Elements = Config['Elements']
    Mibs = Config['OID']['Mibs']
    Devices = {}
    for row, descr in enumerate(Elements.get('Descr', [])):
        parms = {}
        for mib in Mibs[1:]:
            # removeprefix drops exactly "if"; strip('if') would eat any
            # leading/trailing 'i' or 'f' characters
            label = mib.removeprefix('if')
            if label == 'Descr':
                continue
            column = Elements.get(label, [])
            # A column that is missing or shorter than the description list
            # yields None for this interface
            parms[label] = column[row] if row < len(column) else None
        Devices[descr] = parms
    Config['Devices'] = Devices
# Validate the Host
def ValidateHost(host):
    """Check that *host* resolves to an address and answers a single ping.

    Args:
        host: Host name or IP address to check.

    Returns:
        True when the name resolves and ``ping -c 1 -w 5`` exits with 0,
        otherwise False.
    """
    try:
        IP(socket.gethostbyname(host))
    except (ValueError, OSError):
        # socket.gaierror (unresolvable name) is an OSError subclass
        print("Invalid IP Address Specified")
        return False
    # Quote the host because LocalCommand runs the string through a shell
    rc = LocalCommand(f"ping -c 1 -w 5 {shlex.quote(host)}")
    return rc['code'] == 0
# Function to Write to a log file
def WriteLog(msg, log):
    """Append a timestamped message to the log file.

    Args:
        msg: Text to record.
        log: Either the log settings dictionary (its 'path' entry is used) or
            the path itself as a string.  A path with a file extension is
            used as the log file; otherwise it is treated as a directory and
            the file "<script name>.log" inside it is used.
    """
    stamp = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    # Callers in this file pass both the settings dict and a bare path string
    logpath = log['path'] if isinstance(log, dict) else log
    CreateFolder(logpath)
    # Same file-versus-directory rule that CreateFolder applies
    is_file = bool(os.path.splitext(logpath)[1])
    target = logpath if is_file else f"{logpath}/{MyScript}.log"
    with open(target, "a") as handle:
        handle.write(f"{stamp}; {msg}\n")
# Executes the Script by calling the main function
# Run main() only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()