import logging
import random
import re
import string
from collections.abc import MutableMapping
from typing import Any
from typing import cast

from retry import retry
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.models.blocks import Block
from slack_sdk.models.metadata import Metadata

from danswer.configs.constants import ID_SEPARATOR
from danswer.configs.danswerbot_configs import DANSWER_BOT_NUM_RETRIES
from danswer.connectors.slack.utils import make_slack_api_rate_limited
from danswer.connectors.slack.utils import SlackTextCleaner
from danswer.danswerbot.slack.constants import SLACK_CHANNEL_ID
from danswer.danswerbot.slack.tokens import fetch_tokens
from danswer.utils.logger import setup_logger
from danswer.utils.text_processing import replace_whitespaces_w_space

logger = setup_logger()

# Slack introduces "Show More" after 300 characters on desktop, which is ugly.
_SLACK_SHOW_MORE_CHAR_LIMIT = 300

# A highlight that opens directly after a non-whitespace character, i.e. in
# the middle of a word. NOTE(review): the `<hi>` / `</hi>` markers are the
# Vespa bolding tags; confirm they match the summary format in use.
_MID_WORD_HIGHLIGHT_RE = re.compile(r"(?<=[^\s])<hi>(.*?)</hi>")


class ChannelIdAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes messages with the Slack channel ID.

    The channel ID is read from the adapter's ``extra`` mapping under the
    ``SLACK_CHANNEL_ID`` key; messages pass through untouched when it is
    absent or empty.
    """

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        """Return ``(msg, kwargs)`` with ``msg`` prefixed when an ID is set."""
        channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None
        if channel_id:
            return f"[Channel ID: {channel_id}] {msg}", kwargs
        return msg, kwargs


def get_web_client() -> WebClient:
    """Build a Slack ``WebClient`` authenticated with the fetched bot token."""
    slack_tokens = fetch_tokens()
    return WebClient(token=slack_tokens.bot_token)


@retry(
    tries=DANSWER_BOT_NUM_RETRIES,
    delay=0.25,
    backoff=2,
    logger=cast(logging.Logger, logger),
)
def respond_in_thread(
    client: WebClient,
    channel: str,
    thread_ts: str | None,
    text: str | None = None,
    blocks: list[Block] | None = None,
    receiver_ids: list[str] | None = None,
    metadata: Metadata | None = None,
    unfurl: bool = True,
) -> None:
    """Post a message to a channel/thread, publicly or ephemerally.

    Without ``receiver_ids`` a single regular message is posted. With
    ``receiver_ids`` one ephemeral message is posted per receiver.

    Args:
        client: Slack client used to make the calls.
        channel: ID of the channel to post in.
        thread_ts: Timestamp of the thread to reply in, or None.
        text: Plain text of the message.
        blocks: Block content of the message.
        receiver_ids: Users who should receive an ephemeral copy.
        metadata: Metadata forwarded to the Slack call.
        unfurl: Used for both ``unfurl_links`` and ``unfurl_media``.

    Raises:
        ValueError: If neither ``text`` nor ``blocks`` is provided.
        RuntimeError: If a Slack response is not ``ok``.
    """
    if not text and not blocks:
        raise ValueError("One of `text` or `blocks` must be provided")

    if not receiver_ids:
        slack_call = make_slack_api_rate_limited(client.chat_postMessage)
        response = slack_call(
            channel=channel,
            text=text,
            blocks=blocks,
            thread_ts=thread_ts,
            metadata=metadata,
            unfurl_links=unfurl,
            unfurl_media=unfurl,
        )
        if not response.get("ok"):
            raise RuntimeError(f"Failed to post message: {response}")
        return

    # NOTE(review): the whole function is retried on failure, so a failure
    # part-way through this loop re-sends to the receivers already served.
    slack_call = make_slack_api_rate_limited(client.chat_postEphemeral)
    for receiver in receiver_ids:
        response = slack_call(
            channel=channel,
            user=receiver,
            text=text,
            blocks=blocks,
            thread_ts=thread_ts,
            metadata=metadata,
            unfurl_links=unfurl,
            unfurl_media=unfurl,
        )
        if not response.get("ok"):
            raise RuntimeError(f"Failed to post message: {response}")


def build_feedback_block_id(
    query_event_id: int,
    document_id: str | None = None,
    document_rank: int | None = None,
) -> str:
    """Build a feedback block ID, optionally scoped to a document.

    The result is ``<prefix><SEP><query_event_id>`` or, when a document is
    given, ``<prefix><SEP><query_event_id><SEP><document_id><SEP><rank>``,
    where ``<prefix>`` is 10 random ASCII letters and ``<SEP>`` is
    ``ID_SEPARATOR``.

    Raises:
        ValueError: If ``document_id`` is empty, is given without a
            ``document_rank``, or already contains ``ID_SEPARATOR``.
    """
    unique_prefix = "".join(random.choice(string.ascii_letters) for _ in range(10))
    if document_id is not None:
        if not document_id or document_rank is None:
            raise ValueError("Invalid document, missing information")
        # The separator must stay unambiguous so the ID can be split again.
        if ID_SEPARATOR in document_id:
            raise ValueError(
                "Separator pattern should not already exist in document id"
            )
        block_id = ID_SEPARATOR.join(
            [str(query_event_id), document_id, str(document_rank)]
        )
    else:
        block_id = str(query_event_id)

    return unique_prefix + ID_SEPARATOR + block_id


def decompose_block_id(block_id: str) -> tuple[int, str | None, int | None]:
    """Decompose into query_id, document_id, document_rank, see above function

    Raises:
        ValueError: If the ID does not split into 2 or 4 components, or a
            numeric component cannot be parsed. The underlying error is
            logged and chained as the cause.
    """
    try:
        components = block_id.split(ID_SEPARATOR)
        if len(components) not in (2, 4):
            raise ValueError("Block ID does not contain right number of elements")

        if len(components) == 2:
            return int(components[-1]), None, None

        return int(components[1]), components[2], int(components[3])
    except Exception as e:
        logger.error(e)
        raise ValueError("Received invalid Feedback Block Identifier") from e


def translate_vespa_highlight_to_slack(match_strs: list[str], used_chars: int) -> str:
    """Convert highlighted snippets into one bold-marked string.

    Each non-empty snippet has its ``<hi>...</hi>`` highlights turned into
    ``*...*``, its whitespace normalised and stripped; the snippets are then
    joined with ``"... "``. The result is truncated to fit the space left
    before the 300-character mark, unless a highlight lies beyond that mark.

    Args:
        match_strs: Snippets containing ``<hi>`` / ``</hi>`` markers.
        used_chars: Characters already consumed out of the 300 budget.

    Returns:
        The combined, possibly truncated, string.
    """

    def _replace_highlight(s: str) -> str:
        # Drop highlights that start mid-word, keeping only their text;
        # presumably because Slack does not render bold there (TODO confirm).
        s = _MID_WORD_HIGHLIGHT_RE.sub(r"\1", s)
        return s.replace("</hi>", "*").replace("<hi>", "*")

    final_matches = [
        replace_whitespaces_w_space(_replace_highlight(match_str)).strip()
        for match_str in match_strs
        if match_str
    ]
    combined = "... ".join(final_matches)

    # But don't trim the message if there is still a highlight after the limit.
    # Clamp at zero: a negative value would slice from the END of the string.
    remaining = max(_SLACK_SHOW_MORE_CHAR_LIMIT - used_chars, 0)
    if len(combined) > remaining and "*" not in combined[remaining:]:
        combined = combined[: max(remaining - 3, 0)] + "..."

    return combined


def remove_slack_text_interactions(slack_str: str) -> str:
    """Run ``slack_str`` through the ``SlackTextCleaner`` steps, in order."""
    slack_str = SlackTextCleaner.replace_tags_basic(slack_str)
    slack_str = SlackTextCleaner.replace_channels_basic(slack_str)
    slack_str = SlackTextCleaner.replace_special_mentions(slack_str)
    slack_str = SlackTextCleaner.replace_links(slack_str)
    slack_str = SlackTextCleaner.replace_special_catchall(slack_str)
    slack_str = SlackTextCleaner.add_zero_width_whitespace_after_tag(slack_str)
    return slack_str


def get_channel_from_id(client: WebClient, channel_id: str) -> dict[str, Any]:
    """Return the ``channel`` entry of a validated ``conversations_info`` call."""
    response = client.conversations_info(channel=channel_id)
    response.validate()
    return response["channel"]


def get_channel_name_from_id(
    client: WebClient, channel_id: str
) -> tuple[str | None, bool]:
    """Return ``(name, is_dm)`` for a channel.

    ``is_dm`` is True when the channel info has a truthy ``is_im`` or
    ``is_mpim`` entry.

    Raises:
        SlackApiError: Logged with traceback and re-raised unchanged.
    """
    try:
        channel_info = get_channel_from_id(client, channel_id)
        name = channel_info.get("name")
        is_dm = any([channel_info.get("is_im"), channel_info.get("is_mpim")])
        return name, is_dm
    except SlackApiError:
        logger.exception(f"Couldn't fetch channel name from id: {channel_id}")
        raise


def fetch_userids_from_emails(user_emails: list[str], client: WebClient) -> list[str]:
    """Look up the Slack user ID for each email, skipping failed lookups.

    A failed lookup is logged and skipped (best effort); the call only fails
    when no email at all could be resolved.

    Raises:
        RuntimeError: If no user ID was found for any of the emails.
    """
    user_ids: list[str] = []
    for email in user_emails:
        try:
            user = client.users_lookupByEmail(email=email)
            user_ids.append(user.data["user"]["id"])  # type: ignore
        except Exception:
            logger.error(f"Was not able to find slack user by email: {email}")

    if not user_ids:
        raise RuntimeError(
            "Was not able to find any Slack users to respond to. "
            "No email was parsed into a valid slack account."
        )

    return user_ids