Skip to content

Commit

Permalink
Add auto-join functionality (#3)
Browse files Browse the repository at this point in the history
* Add join_stream logic to Channels stream

* Update state partitioning keys and add more fields to post_process
  • Loading branch information
stkbailey authored Oct 26, 2021
1 parent 7892c39 commit b5e3a09
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
36 changes: 32 additions & 4 deletions tap_slack/streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Stream type classes for tap-slack."""
import requests

from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional, Iterable
Expand Down Expand Up @@ -26,6 +27,25 @@ def get_url_params(self, context, next_page_token):
params["types"] = ",".join(self.config["channel_types"])
return params

def post_process(self, row, context):
"Join the channel if not a member, but emit no data."
row = super().post_process(row, context)
if not row["is_member"]:
self._join_channel(row["id"])
return row

def _join_channel(self, channel_id: str) -> requests.Response:
url = f"{self.url_base}/conversations.join"
params = {"channel": channel_id}
response = self.requests_session.post(
url=url,
params=params,
headers=self.authenticator.auth_headers
)
if not response.json().get("ok"):
self.logger.warning("Error joining channel %s: %s", response.json().get("error"))
self.logger.info("Successfully joined channel: %s", channel_id)


class ChannelMembersStream(SlackStream):
name = "channel_members"
Expand All @@ -36,11 +56,17 @@ class ChannelMembersStream(SlackStream):
schema = schemas.channel_members

ignore_parent_replication_keys = True
state_partitioning_keys = []

def parse_response(self, response):
user_list = extract_jsonpath(self.records_jsonpath, input=response.json())
yield from ({"member_id": ii} for ii in user_list)

def post_process(self, row, context=None):
row = super().post_process(row, context=context)
row["channel_id"] = context.get("channel_id")
return row


class MessagesStream(SlackStream):
name = "messages"
Expand Down Expand Up @@ -115,10 +141,12 @@ class ThreadsStream(SlackStream):
max_requests_per_minute = 50
schema = schemas.threads

@property
def state_partitioning_keys(self):
"Remove thread_ts to prevent state logging for individual threads."
return ["channel_id"]
state_partitioning_keys = []

def post_process(self, row, context=None):
row = super().post_process(row, context=context)
row["channel_id"] = context.get("channel_id")
return row


class UsersStream(SlackStream):
Expand Down
6 changes: 6 additions & 0 deletions tap_slack/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class TapSlack(Tap):
default=["public_channel"],
description="The types of conversations the tap will attempt to extract data from. Must be one of 'public_channel', 'mpim', 'private_channel', or 'im'. Note that the Slack app must have the appropriate privileges and be a member of the conversations to sync messages.",
),
th.Property(
"auto_join_channels",
th.BooleanType,
default=True,
description="Whether the bot user should attempt to join channels that it has not yet joined. The bot user must be a member of the channel to retrieve messages.",
),
).to_dict()

def discover_streams(self) -> List[Stream]:
Expand Down

0 comments on commit b5e3a09

Please sign in to comment.