Merge pull request #20 from OObasuyi/working
new feature for time-based reporting
OObasuyi authored Nov 6, 2024
2 parents 325c9dc + a0021a6 commit ee63b16
Showing 5 changed files with 64 additions and 5 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -21,6 +21,7 @@ ColdClarity is a tool designed to see data gathered by Cisco ISE from your network
- **Automated Email Notifications**: Sends reports via email to specified recipients.
- **Customizable Profiles and Buckets**: Allows for logical organization of endpoints into profiles and buckets.
- **Specialized Reporting Options**: Option to focus reports on hardware details or other endpoint specifics.
- **Time-Based Options**: Pull a report for a specific date range or for just the past N days.

## Configuration

@@ -74,6 +75,7 @@ logs are created and placed in the logging directory on run, you can also use a

```yaml
# DIAG
test_endpoint_pull: 1 # return only this many endpoints; useful for testing with a small subset when you have a lot of endpoints
test_messaging_svc: True # test pulling data without sending an email
debug_console_login: ~ # outputs debug and higher to the console
```
24 changes: 22 additions & 2 deletions ise_control.py
@@ -139,6 +139,26 @@ def dataconnect_engine(self, sql_string) -> pd.DataFrame:
db_ssl_context.verify_mode = CERT_NONE
self.logger.debug('Connecting to ISE DB :)')

# FLAG FOR DATES
if self.config.get('time_window'):
# determine which datetime column the query uses; if none matches, skip this section
dt_sql_str = None
if 'LOGGED_AT' in sql_string.upper():
dt_sql_str = 'LOGGED_AT'
self.logger.debug('changing sql string for "POSTURE_ASSESSMENT_BY_CONDITION"')
elif 'TIMESTAMP' in sql_string.upper():
dt_sql_str = 'TIMESTAMP'
self.logger.debug('changing sql string for "RADIUS_AUTHENTICATIONS"')

if dt_sql_str:
ep_dt_str, ep_dt_end = self.UTILS.get_time_range(self.config['time_window'], self)
if ep_dt_str and ep_dt_end:
sql_string = (f"{sql_string} WHERE {dt_sql_str} BETWEEN "
f"TO_TIMESTAMP('{ep_dt_str} 00:00:00.000000', 'DD-MM-YYYY HH24:MI:SS.FF6') AND "
f"TO_TIMESTAMP('{ep_dt_end} 23:59:59.999999', 'DD-MM-YYYY HH24:MI:SS.FF6')")
else:
self.logger.info('could not parse the time window config, SKIPPING time filter')

# DIAG FLAG for test
ep_amount = self.config.get('test_endpoint_pull')
if isinstance(ep_amount, int):
@@ -294,7 +314,7 @@ def get_endpoint_software_info(self) -> pd.DataFrame:
endpoints += ep_data
step_page += 1
else:
self.logger.critical(f'GESI: no HW data for endpoints on page {step_page}')
self.logger.debug(f'GESI: no HW data for endpoints on page {step_page}')
break
else:
self.logger.debug(f'GESI: received back response code {response.status_code} on data retrieval')
@@ -354,7 +374,7 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame:
endpoints += ep_data
step_page += 1
else:
self.logger.critical(f'GEHI: no HW data for endpoints on page {step_page}')
self.logger.debug(f'GEHI: no HW data for endpoints on page {step_page}')
break
else:
self.logger.debug(f'GEHI: received back response code {response.status_code} on data retrieval')
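For context, here is a minimal sketch of the query string the new time-window branch in `dataconnect_engine` produces. The base query is the one from report_data.py below; the sample dates and variable names are illustrative stand-ins, not from the repo.

```python
# Illustrative sketch of the WHERE clause the time-window branch appends to a DataConnect query.
base_query = ("select TIMESTAMP,ORIG_CALLING_STATION_ID,AUTHENTICATION_METHOD,"
              "AUTHENTICATION_PROTOCOL,POSTURE_STATUS,ENDPOINT_PROFILE from RADIUS_AUTHENTICATIONS")
dt_col = 'TIMESTAMP'                     # chosen because 'TIMESTAMP' appears in the query text
start, end = '08-08-2024', '06-11-2024'  # assumed output of get_time_range for a 90-day window

filtered_query = (f"{base_query} WHERE {dt_col} BETWEEN "
                  f"TO_TIMESTAMP('{start} 00:00:00.000000', 'DD-MM-YYYY HH24:MI:SS.FF6') AND "
                  f"TO_TIMESTAMP('{end} 23:59:59.999999', 'DD-MM-YYYY HH24:MI:SS.FF6')")
print(filtered_query)
```

Bounding the range at 00:00:00 and 23:59:59.999999 keeps both the start and end dates fully inclusive.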
4 changes: 2 additions & 2 deletions report_data.py
@@ -95,9 +95,9 @@ def ise_step_2(self, writer):
common_computing_profiles = 'server|red hat| hel|workstation|OSX'
# db queries
get_all_posture_endpoints = "select LOGGED_AT,POLICY,ENDPOINT_ID,POLICY_STATUS, CONDITION_NAME, CONDITION_STATUS from posture_assessment_by_condition"
get_all_auths = "select ORIG_CALLING_STATION_ID,AUTHENTICATION_METHOD,AUTHENTICATION_PROTOCOL,POSTURE_STATUS,ENDPOINT_PROFILE from RADIUS_AUTHENTICATIONS"
get_all_auths = "select TIMESTAMP,ORIG_CALLING_STATION_ID,AUTHENTICATION_METHOD,AUTHENTICATION_PROTOCOL,POSTURE_STATUS,ENDPOINT_PROFILE from RADIUS_AUTHENTICATIONS"
get_all_endpoints ="select B.LOGICAL_PROFILE, B.ASSIGNED_POLICIES, A.MAC_ADDRESS from ENDPOINTS_DATA A, LOGICAL_PROFILES B where A.ENDPOINT_POLICY = B.ASSIGNED_POLICIES"
get_portal_endpoints ="select GUEST_LAST_NAME,GUEST_FIRST_NAME,SSID,NAD_ADDRESS,MAC_ADDRESS from PRIMARY_GUEST"
get_portal_endpoints ="select LOGGED_AT,GUEST_LAST_NAME,GUEST_FIRST_NAME,SSID,NAD_ADDRESS,MAC_ADDRESS from PRIMARY_GUEST"

ep_postured = self.ise.dataconnect_engine(get_all_posture_endpoints)
ep_auths = self.ise.dataconnect_engine(get_all_auths)
3 changes: 2 additions & 1 deletion templates/config_templete.yaml
@@ -93,7 +93,8 @@ EndpointData:
dataconnect:
password: dcpassword1234


# limit records to a time window: either the last N days (as an integer) or a date range in the format "DD-MM-YYYY:DD-MM-YYYY"
time_window: 90

# DIAG
test_endpoint_pull: 0
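As a quick illustration of the two value shapes `time_window` accepts (per the parsing added to utilities.py below), a standalone sketch; the inline YAML strings and dates are examples only:

```python
# Illustrative only: the two shapes of time_window that get_time_range understands.
import yaml

cfg_last_n_days = yaml.safe_load('time_window: 90')                      # integer -> the last 90 days
cfg_date_range = yaml.safe_load('time_window: "01-08-2024:06-11-2024"')  # string -> explicit DD-MM-YYYY:DD-MM-YYYY range
print(cfg_last_n_days['time_window'], cfg_date_range['time_window'])
```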
36 changes: 36 additions & 0 deletions utilities.py
@@ -6,6 +6,8 @@
from io import StringIO
import yaml
from re import sub
from pandas import to_datetime, isna
from datetime import datetime, timedelta

import logging

@@ -95,6 +97,40 @@ def mac_normalizer(df_str):
else:
return df_str

@staticmethod
def qualify_dt_str(dt_object:str):
potential_dt = to_datetime(dt_object,errors='coerce',format='%d-%m-%Y')
return potential_dt

@staticmethod
def get_last_n_date_from_now(n_date:int) -> tuple:
today = datetime.now()
n_date = today - timedelta(days=n_date)
today = today.strftime('%d-%m-%Y')
n_date = n_date.strftime('%d-%m-%Y')
return n_date, today

def get_time_range(self,time_range,self_instance) -> tuple:
ep_dt_str, ep_dt_end = None, None
ep_dt_info = time_range
if isinstance(ep_dt_info, int):
# make sure it's a positive value
ep_dt_info = abs(ep_dt_info)
ep_dt_str, ep_dt_end = self.get_last_n_date_from_now(ep_dt_info)
else:
# coerce to a string and split it into start/end dates
ep_dt_info = str(ep_dt_info)
ep_dt_info = ep_dt_info.split(':')
if len(ep_dt_info) > 1:
ep_dt_str = self.qualify_dt_str(ep_dt_info[0])
ep_dt_end = self.qualify_dt_str(ep_dt_info[1])
if isna(ep_dt_str) or isna(ep_dt_end):
self_instance.logger.critical(f'Time window {ep_dt_info} is not a valid time window! ')
return None,None
else:
ep_dt_str = ep_dt_str.strftime('%d-%m-%Y')
ep_dt_end = ep_dt_end.strftime('%d-%m-%Y')
return ep_dt_str, ep_dt_end

def log_collector(log_all=False,file_name='ise_reporting_logs.log'):
fName = Rutils().create_file_path('logging', file_name)
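A usage sketch of the new helpers, assuming they are methods of the Rutils class used elsewhere in this file and that the second argument only needs to expose a `.logger` attribute; the `_LogHolder` stand-in is hypothetical:

```python
# Hypothetical usage of the new time-window helpers; _LogHolder stands in for the calling ISE object.
import logging
from utilities import Rutils

class _LogHolder:
    logger = logging.getLogger('demo')

utils = Rutils()
print(utils.get_time_range(90, _LogHolder()))                       # last 90 days -> ('DD-MM-YYYY', 'DD-MM-YYYY') pair
print(utils.get_time_range('01-08-2024:06-11-2024', _LogHolder()))  # explicit range -> ('01-08-2024', '06-11-2024')
print(utils.get_time_range('not-a-range', _LogHolder()))            # no ':' separator -> (None, None)
```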
