Skip to content

Commit

Permalink
added packages as python snippets #65
Browse files Browse the repository at this point in the history
  • Loading branch information
akyriako committed Dec 5, 2024
1 parent 99361b4 commit f189df1
Showing 1 changed file with 196 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Configure a tracker, and the system immediately starts recording operations base
## Pushing Alarm Messages

* Create a topic named `cts\_test` on the *SMN console*. For details, see [Creating a Topic](https://docs.otc.t-systems.com/simple-message-notification/umn/topic_management/creating_a_topic.html).
* Add subscriptions to the **cts\_test** topic to push alarm messages. For details, see [Adding a Subscription](https://docs.otc.t-systems.com/simple-message-notification/umn/topic_management/adding_a_subscription_to_a_topic.html).
* Add subscriptions to the `cts\_test` topic to push alarm messages. For details, see [Adding a Subscription](https://docs.otc.t-systems.com/simple-message-notification/umn/topic_management/adding_a_subscription_to_a_topic.html).

:::note

Expand All @@ -72,13 +72,204 @@ Configure a tracker, and the system immediately starts recording operations base

## Building a Program

<!-- TODO: change the link to otc hosted zip file -->
This section describes how to download and use the program package [(index.zip)](https://functionstage-examples.obs.cn-north-1.myhuaweicloud.com/index.zip) for log alarms.
This section describes how use the following snippet for log alarms.

```python
import json
import base64
import sys
import os
import requests

def handler(event, context):
text = json.dumps(event["cts"])
ctsmsg = json.loads(text)

#Resource operator IP
source_ip = ctsmsg["source_ip"]
#trace_name
action = ctsmsg["trace_name"]

SMN_Topic = context.getUserData('SMN_Topic')
regionName = context.getUserData('RegionName')
ip = context.getUserData('IP')
projectId = context.getProjectID()

if (action not in ("login" or "logout")):
return "action is fail"

if ip.find(source_ip) < 0 :
return "reasonable " + action

logInfo = " IP:"+source_ip + ", Action:"+action

body = {"message" :"Illegal operation["+logInfo+"]", "subject":"CTSTrigger"}
headers = {'X-Auth-Token': context.getToken(), "accept": "application/json"}
url= "https://smn."+regionName+".myhuaweicloud.com/v2/"+projectId+"/notifications/topics/urn:smn:"+regionName+":"+projectId+":"+SMN_Topic+"/publish"

response = requests.post(url,headers=headers,data=json.dumps(body))
if response.status_code != 200:
print (response.status_code)
return "send alarm message failed."


return 'send email success'
```

### Creating a Function

<!-- TODO: change the link to otc hosted zip file -->
Create a function for extracting logs and upload the [sample code](https://functionstage-examples.obs.cn-north-1.myhuaweicloud.com/fss_examples_logstore_warning.zip) package. For details, see [Function Management](https://docs.otc.t-systems.com/function-graph/umn/function_management.html).
Create a function for extracting the logs (as shown in the Python snippet below) and upload the code package. For details, see [Function Management](https://docs.otc.t-systems.com/function-graph/umn/function_management.html).

```python
import json
import base64
import sys
import os
import requests

from obs import *

current_file_path = os.path.dirname(os.path.realpath(__file__))
# append current path to search paths, so that we can import some third party libraries.
sys.path.append(current_file_path)

TEMP_ROOT_PATH = "/tmp/"
region = 'eu-de'
secure = True
signature = 'v4'
port = 443
path_style = True


def GetObject(obsAddr, bucketName, objName, ak, sk):
TestObs = ObsClient(access_key_id=ak, secret_access_key=sk,
is_secure=secure, server=obsAddr, signature=signature, path_style=path_style, region=region,
ssl_verify=False, port=port,
max_retry_count=5, timeout=20)

LobjectRequest = GetObjectRequest(content_type='application/zip', content_language=None, expires=None,
cache_control=None, content_disposition=None, content_encoding=None,
versionId=None)

Lheaders = GetObjectHeader(range='', if_modified_since=None, if_unmodified_since=None, if_match=None,
if_none_match=None)

loadStreamInMemory = False
fileNamePath = TEMP_ROOT_PATH + objName
resp = TestObs.getObject(bucketName=bucketName, objectKey=objName, downloadPath=fileNamePath,
getObjectRequest=LobjectRequest, headers=Lheaders, loadStreamInMemory=loadStreamInMemory)

print('*** GetObject resp: ', resp)

return (int(resp.status))


def PostObject(obsAddr, bucket, objName, ak, sk):
obsClient = ObsClient(access_key_id=ak, secret_access_key=sk,
is_secure=secure, server=obsAddr, signature=signature, path_style=path_style, region=region,
ssl_verify=False, port=port,
max_retry_count=5, timeout=20)

Lheaders = PutObjectHeader(md5=None, acl='private', location=None, contentType='text/plain')

Lheaders.sseHeader = SseKmsHeader.getInstance()
h = PutObjectHeader()
Lmetadata = {'key': 'value'}

objPath = TEMP_ROOT_PATH + objName
resp = obsClient.putFile(bucketName=bucket, objectKey=objName, file_path=objPath,
metadata=Lmetadata, headers=h)
if isinstance(resp, list):
for k, v in resp:
print(
'PostObject, objectKey', k, 'common msg:status:', v.status, ',errorCode:', v.errorCode, ',errorMessage:',
v.errorMessage)
else:
print('*** PostObject, common msg: status:', resp.status, ',errorCode:', resp.errorCode, ',errorMessage:',
resp.errorMessage)

def checkAlarmLogs(logs):
alarmlogs = []
for log in logs:
logStr = json.dumps(log)
if "WARN" in logStr or "WRN" in logStr or "ERROR" in logStr or "ERR" in logStr:
alarmlogs.append(logStr)
return alarmlogs

def log_store(fileName, logs):
fileNamePath = TEMP_ROOT_PATH + fileName
with open(fileNamePath, 'a') as f:
print ("*** Open log file"+fileNamePath+" successfully")
for log in logs:
logStr = json.dumps(log).replace('\\','')
if "WARN" in logStr or "WRN" in logStr or "ERROR" in logStr or "ERR" in logStr:
f.write(logStr + '\n')
return fileName

def handler(event, context):
# Obtains the data of lts logs.
print ("*********the data of lts logs********")
encodingData = event["lts"]["data"]
data = base64.b64decode(encodingData) # Base64 decoding is required because the information of lts logs has been encoded.
text = json.loads(data)
# Output log group id
print ("*** log group id:"+text["log_group_id"])
# Output log topic id
print ("*** log topic id:"+text["log_topic_id"])
# Obtains lts log list
logs = text["logs"]
# Check whether the alarm log existed
alarmLogs = checkAlarmLogs(logs)
if len(alarmLogs) == 0:
print ("*** no need to send alarm message")
return "no need to send alarm message"

# send smn message
print ("***********send smn message **********")
SMN_Topic = context.getUserData('SMN_Topic')
if not SMN_Topic.strip():
print ('*** No SMN topic. Please configure "SMN_Topic" environment variable')
return "Environment variable configuration error"
projectId = context.getProjectID()
body = {"message" :"Get warning message.The content of message is:"+json.dumps(alarmLogs).replace('\\',''), "subject":"TimeTrigger"}
headers = {'X-Auth-Token': context.getToken(), "accept": "application/json"}
url= "https://smn." + region + ".myhuaweicloud.com/v2/"+projectId+"/notifications/topics/urn:smn:" + region + ":"+projectId+":"+SMN_Topic+"/publish"
response = requests.post(url,headers=headers,data=json.dumps(body))
if response.status_code != 200:
print ("*** send SMN message failed. resp statusCode:"+str(response.status_code))
return "send alarm message failed."
print ("*** send SMN message to terminal successfully")

print ("***********strore log file to obs bucket **********")
obs_address = context.getUserData('obs_address')
storeBucket = context.getUserData('obs_store_bucket')
storeObjName = context.getUserData('obs_store_objName')
if not obs_address.strip() or not storeBucket.strip() or not storeObjName.strip():
print ('*** No obs environment variable. Please configure obs environment variable')
return "Environment variable configuration error"

ak = context.getAccessKey()
sk = context.getSecretKey()
if not ak.strip() or not sk.strip():
print ("*** Can not get accessKey or secretKey. Please check authority of IAM service")
return "accessKey or secretKey error"

print ("*** obs_address: " + obs_address)
print ("*** store bucket: " + storeBucket)
print ("*** store objName: " + storeObjName)

# download file uploaded by user from obs
# if store object uploaded not exist, automatically create OBS object.
status = GetObject(obs_address, storeBucket, storeObjName, ak, sk)
if status != 200:
print ("*** get store object failed. Create a new OBS object:"+storeObjName)

outFile = log_store(storeObjName, alarmLogs)

# upload file to obs bucket
PostObject(obs_address, storeBucket, outFile, ak, sk)
return 'log store success'
```

This function analyzes received operation records, filters logins or logouts from unauthorized IP addresses using a whitelist, and sends alarms under a specified SMN topic.

Expand Down

0 comments on commit f189df1

Please sign in to comment.