From b5e3a091dd32686d7c1652e9d8495e5ff1c1186e Mon Sep 17 00:00:00 2001 From: Stephen Bailey <7850417+stkbailey@users.noreply.github.com> Date: Mon, 25 Oct 2021 21:59:30 -0400 Subject: [PATCH] Add auto-join functionality (#3) * Add join_stream logic to Channels stream * Update state partitioning keys and add more fields to post_process --- tap_slack/streams.py | 36 ++++++++++++++++++++++++++++++++---- tap_slack/tap.py | 6 ++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/tap_slack/streams.py b/tap_slack/streams.py index 52ebd1e..c130a0a 100644 --- a/tap_slack/streams.py +++ b/tap_slack/streams.py @@ -1,4 +1,5 @@ """Stream type classes for tap-slack.""" +import requests from datetime import datetime, timezone, timedelta from typing import Any, Dict, Optional, Iterable @@ -26,6 +27,25 @@ def get_url_params(self, context, next_page_token): params["types"] = ",".join(self.config["channel_types"]) return params + def post_process(self, row, context): + "Join the channel if not a member, but emit no data." + row = super().post_process(row, context) + if not row["is_member"]: + self._join_channel(row["id"]) + return row + + def _join_channel(self, channel_id: str) -> requests.Response: + url = f"{self.url_base}/conversations.join" + params = {"channel": channel_id} + response = self.requests_session.post( + url=url, + params=params, + headers=self.authenticator.auth_headers + ) + if not response.json().get("ok"): + self.logger.warning("Error joining channel %s: %s", response.json().get("error")) + self.logger.info("Successfully joined channel: %s", channel_id) + class ChannelMembersStream(SlackStream): name = "channel_members" @@ -36,11 +56,17 @@ class ChannelMembersStream(SlackStream): schema = schemas.channel_members ignore_parent_replication_keys = True + state_partitioning_keys = [] def parse_response(self, response): user_list = extract_jsonpath(self.records_jsonpath, input=response.json()) yield from ({"member_id": ii} for ii in user_list) + def post_process(self, row, context=None): + row = super().post_process(row, context=context) + row["channel_id"] = context.get("channel_id") + return row + class MessagesStream(SlackStream): name = "messages" @@ -115,10 +141,12 @@ class ThreadsStream(SlackStream): max_requests_per_minute = 50 schema = schemas.threads - @property - def state_partitioning_keys(self): - "Remove thread_ts to prevent state logging for individual threads." - return ["channel_id"] + state_partitioning_keys = [] + + def post_process(self, row, context=None): + row = super().post_process(row, context=context) + row["channel_id"] = context.get("channel_id") + return row class UsersStream(SlackStream): diff --git a/tap_slack/tap.py b/tap_slack/tap.py index c1eae78..f38a5ce 100644 --- a/tap_slack/tap.py +++ b/tap_slack/tap.py @@ -52,6 +52,12 @@ class TapSlack(Tap): default=["public_channel"], description="The types of conversations the tap will attempt to extract data from. Must be one of 'public_channel', 'mpim', 'private_channel', or 'im'. Note that the Slack app must have the appropriate privileges and be a member of the conversations to sync messages.", ), + th.Property( + "auto_join_channels", + th.BooleanType, + default=True, + description="Whether the bot user should attempt to join channels that it has not yet joined. The bot user must be a member of the channel to retrieve messages.", + ), ).to_dict() def discover_streams(self) -> List[Stream]: