From 13f060dde214adeada2f2687575721f4d899ae1d Mon Sep 17 00:00:00 2001 From: "mikhail.trifonov" Date: Sun, 9 May 2021 07:40:17 +0300 Subject: [PATCH] support for many connections --- setup.py | 1 - slack_notifications.py | 157 +++++++++++++++++++++++------------------ 2 files changed, 87 insertions(+), 71 deletions(-) diff --git a/setup.py b/setup.py index 1f12111..1fce522 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from os import path from setuptools import setup diff --git a/slack_notifications.py b/slack_notifications.py index dbc3993..2f9d624 100644 --- a/slack_notifications.py +++ b/slack_notifications.py @@ -1,23 +1,15 @@ -# -*- coding: utf8 -*- - import os import logging from typing import List, Union import requests - logger = logging.getLogger(__name__) +ACCESS_TOKEN = None ACCESS_TOKEN_ENV_NAME = 'SLACK_ACCESS_TOKEN' -SLACK_API_URL = 'https://slack.com/api' -ACCESS_TOKEN = os.getenv(ACCESS_TOKEN_ENV_NAME) - -DEFAULT_RECORDS_LIMIT = 100 -DEFAULT_REQUEST_TIMEOUT = 180 - COLOR_MAP = { 'green': '#008000', 'gray': '#808080', @@ -375,89 +367,114 @@ def init_color(name, code): COLOR_MAP[name] = code -def call_resource(resource: Resource, *, raise_exc: bool = False, **kwargs): - assert ACCESS_TOKEN is not None, f'Please export "{ACCESS_TOKEN_ENV_NAME}" environment variable' +class Slack(requests.Session): + API_URL = 'https://slack.com/api' + + DEFAULT_RECORDS_LIMIT = 100 + DEFAULT_REQUEST_TIMEOUT = 180 + + def __init__(self, token): + super(Slack, self).__init__() - kwargs.setdefault('timeout', DEFAULT_REQUEST_TIMEOUT) + self.headers['Authorization'] = f'Bearer {token}' + self.headers['Content-Type'] = 'application/json; charset=utf-8' - url = f'{SLACK_API_URL}/{resource.handle}' + @classmethod + def from_env(cls): + token = ACCESS_TOKEN or os.getenv(ACCESS_TOKEN_ENV_NAME) + assert token is not None, f'Please export "{ACCESS_TOKEN_ENV_NAME}" environment variable' + return cls(token) - kwargs['headers'] = { - 'Authorization': f'Bearer {ACCESS_TOKEN}', - 'Content-Type': 'application/json; charset=utf-8', - } + def call_resource(self, resource: Resource, *, raise_exc: bool = False, **kwargs): + kwargs.setdefault('timeout', self.DEFAULT_REQUEST_TIMEOUT) - response = requests.request(resource.method, url, **kwargs) + url = f'{self.API_URL}/{resource.handle}' + response = self.request(resource.method, url, **kwargs) - logger.debug(response.content) + logger.debug(response.content) - if raise_exc: - response.raise_for_status() + if raise_exc: + response.raise_for_status() - json = response.json() + json = response.json() - if not json['ok']: - logger.error(response.content) - raise SlackError(response=response) + if not json['ok']: + logger.error(response.content) + raise SlackError(response=response) - return response + return response + def resource_iterator(self, + resource: Resource, from_key: str, *, + cursor: str = None, + raise_exc: bool = False, + limit: int = None): + params = {'limit': limit} -def resource_iterator(resource: Resource, from_key: str, *, - cursor: str = None, - raise_exc: bool = False, - limit: int = DEFAULT_RECORDS_LIMIT): - params = {'limit': limit} + if cursor: + params['cursor'] = cursor - if cursor: - params['cursor'] = cursor + response = self.call_resource(resource, params=params, raise_exc=raise_exc) + data = response.json() + + for item in data[from_key]: + yield item + + cursor = data.get('response_metadata', {}).get('next_cursor') + + if cursor: + yield from self.resource_iterator( + resource, from_key, + limit=limit or self.DEFAULT_RECORDS_LIMIT, cursor=cursor, raise_exc=raise_exc, + ) + + def send_notify(self, + channel, *, + text: str = None, + username: str = None, + icon_url: str = None, + icon_emoji: str = None, + link_names: bool = True, + raise_exc: bool = False, + attachments: List[Attachment] = None, + blocks: List[BaseBlock] = None): + data = { + 'channel': channel, + 'link_names': link_names, + } - response = call_resource(resource, params=params, raise_exc=raise_exc) - data = response.json() + if username: + data['as_user'] = False + data['username'] = username + else: + data['as_user'] = True - for item in data[from_key]: - yield item + if text: + data['mrkdwn'] = True + data['text'] = text - cursor = data.get('response_metadata', {}).get('next_cursor') + if icon_url and not data['as_user']: + data['icon_url'] = icon_url - if cursor: - yield from resource_iterator(resource, from_key, limit=limit, cursor=cursor, raise_exc=raise_exc) + if icon_emoji and not data['as_user']: + data['icon_emoji'] = icon_emoji + if blocks: + data['blocks'] = [b.to_dict() for b in blocks] -def send_notify(channel, *, - text: str = None, - username: str = None, - icon_url: str = None, - icon_emoji: str = None, - link_names: bool = True, - raise_exc: bool = False, - attachments: List[Attachment] = None, - blocks: List[BaseBlock] = None): - data = { - 'channel': channel, - 'link_names': link_names, - } + if attachments: + data['attachments'] = [a.to_dict() for a in attachments] - if username: - data['as_user'] = False - data['username'] = username - else: - data['as_user'] = True + return self.call_resource(Resource('chat.postMessage', 'POST'), raise_exc=raise_exc, json=data) - if text: - data['mrkdwn'] = True - data['text'] = text - if icon_url and not data['as_user']: - data['icon_url'] = icon_url +def call_resource(*args, **kwargs): + return Slack.from_env().call_resource(*args, **kwargs) - if icon_emoji and not data['as_user']: - data['icon_emoji'] = icon_emoji - if blocks: - data['blocks'] = [b.to_dict() for b in blocks] +def resource_iterator(*args, **kwargs): + return Slack.from_env().resource_iterator(*args, **kwargs) - if attachments: - data['attachments'] = [a.to_dict() for a in attachments] - return call_resource(Resource('chat.postMessage', 'POST'), raise_exc=raise_exc, json=data) +def send_notify(*args, **kwargs): + return Slack.from_env().send_notify(*args, **kwargs)