mirror of
https://github.com/MODSetter/SurfSense.git
synced 2025-09-01 10:09:08 +00:00
Fix: Handle Slack API rate limiting for conversations.list
The `get_all_channels` method in `slack_history.py` was making paginated requests to `conversations.list` without any delay, leading to HTTP 429 errors when fetching channels from large Slack workspaces. This commit introduces the following changes: - Adds a 3-second delay between paginated calls to `conversations.list` to comply with Slack's Tier 2 rate limits (approx. 20 requests/minute). - Implements handling for the `Retry-After` header when a 429 error is received. The system will wait for the specified duration before retrying. If the header is missing or invalid, a default of 60 seconds is used. - Adds comprehensive unit tests to verify the new delay and retry logic, covering scenarios with and without the `Retry-After` header, as well as other API errors.
This commit is contained in:
parent
5fef58bdf9
commit
807c83b2f6
2 changed files with 250 additions and 27 deletions
|
@ -6,6 +6,7 @@ Allows fetching channel lists and message history with date range filtering.
|
|||
"""
|
||||
|
||||
import os
|
||||
import time # Added import
|
||||
from slack_sdk import WebClient
|
||||
from slack_sdk.errors import SlackApiError
|
||||
from datetime import datetime, timedelta
|
||||
|
@ -35,7 +36,7 @@ class SlackHistory:
|
|||
|
||||
def get_all_channels(self, include_private: bool = True) -> Dict[str, str]:
|
||||
"""
|
||||
Fetch all channels that the bot has access to.
|
||||
Fetch all channels that the bot has access to, with rate limit handling.
|
||||
|
||||
Args:
|
||||
include_private: Whether to include private channels
|
||||
|
@ -45,7 +46,8 @@ class SlackHistory:
|
|||
|
||||
Raises:
|
||||
ValueError: If no Slack client has been initialized
|
||||
SlackApiError: If there's an error calling the Slack API
|
||||
SlackApiError: If there's an unrecoverable error calling the Slack API
|
||||
RuntimeError: For unexpected errors during channel fetching.
|
||||
"""
|
||||
if not self.client:
|
||||
raise ValueError("Slack client not initialized. Call set_token() first.")
|
||||
|
@ -54,35 +56,58 @@ class SlackHistory:
|
|||
types = "public_channel"
|
||||
if include_private:
|
||||
types += ",private_channel"
|
||||
|
||||
try:
|
||||
# Call the conversations.list method
|
||||
result = self.client.conversations_list(
|
||||
types=types,
|
||||
limit=1000 # Maximum allowed by API
|
||||
)
|
||||
channels = result["channels"]
|
||||
|
||||
# Handle pagination for workspaces with many channels
|
||||
while result.get("response_metadata", {}).get("next_cursor"):
|
||||
next_cursor = result["response_metadata"]["next_cursor"]
|
||||
|
||||
# Get the next batch of channels
|
||||
result = self.client.conversations_list(
|
||||
|
||||
next_cursor = None
|
||||
is_first_request = True
|
||||
|
||||
while is_first_request or next_cursor:
|
||||
try:
|
||||
if not is_first_request: # Add delay only for paginated requests
|
||||
print(f"Paginating for channels, waiting 3 seconds before next call. Cursor: {next_cursor}")
|
||||
time.sleep(3)
|
||||
|
||||
current_limit = 1000 # Max limit
|
||||
api_result = self.client.conversations_list(
|
||||
types=types,
|
||||
cursor=next_cursor,
|
||||
limit=1000
|
||||
limit=current_limit
|
||||
)
|
||||
channels.extend(result["channels"])
|
||||
|
||||
# Create a dictionary mapping channel names to IDs
|
||||
for channel in channels:
|
||||
channels_dict[channel["name"]] = channel["id"]
|
||||
|
||||
return channels_dict
|
||||
|
||||
channels_on_page = api_result["channels"]
|
||||
for channel in channels_on_page:
|
||||
# Ensure channel name and id exist, as per observed Slack API behavior
|
||||
if "name" in channel and "id" in channel:
|
||||
channels_dict[channel["name"]] = channel["id"]
|
||||
else:
|
||||
# Handle cases where a channel might be missing a name or ID
|
||||
# This could be logged or handled as per specific requirements
|
||||
print(f"Warning: Channel found with missing name or id. Data: {channel}")
|
||||
|
||||
|
||||
next_cursor = api_result.get("response_metadata", {}).get("next_cursor")
|
||||
is_first_request = False # Subsequent requests are not the first
|
||||
|
||||
if not next_cursor: # All pages processed
|
||||
break
|
||||
|
||||
except SlackApiError as e:
|
||||
if e.response is not None and e.response.status_code == 429:
|
||||
retry_after_header = e.response.headers.get('Retry-After')
|
||||
wait_duration = 60 # Default wait time
|
||||
if retry_after_header and retry_after_header.isdigit():
|
||||
wait_duration = int(retry_after_header)
|
||||
|
||||
print(f"Slack API rate limit hit. Waiting for {wait_duration} seconds. Cursor: {next_cursor}")
|
||||
time.sleep(wait_duration)
|
||||
# The loop will continue, retrying with the same cursor
|
||||
else:
|
||||
# Not a 429 error, or no response object, re-raise
|
||||
raise SlackApiError(f"Error retrieving channels: {e}", e.response)
|
||||
except Exception as general_error:
|
||||
# Handle other potential errors like network issues if necessary, or re-raise
|
||||
raise RuntimeError(f"An unexpected error occurred during channel fetching: {general_error}")
|
||||
|
||||
except SlackApiError as e:
|
||||
raise SlackApiError(f"Error retrieving channels: {e}", e.response)
|
||||
return channels_dict
|
||||
|
||||
def get_conversation_history(
|
||||
self,
|
||||
|
|
198
surfsense_backend/app/connectors/test_slack_history.py
Normal file
198
surfsense_backend/app/connectors/test_slack_history.py
Normal file
|
@ -0,0 +1,198 @@
|
|||
import unittest
|
||||
import time # Imported to be available for patching target module
|
||||
from unittest.mock import patch, Mock, call
|
||||
from slack_sdk.errors import SlackApiError
|
||||
|
||||
# Since test_slack_history.py is in the same directory as slack_history.py
|
||||
from .slack_history import SlackHistory
|
||||
|
||||
class TestSlackHistoryGetAllChannels(unittest.TestCase):
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient') # Patches where WebClient is looked up when SlackHistory instantiates it
|
||||
def test_get_all_channels_pagination_with_delay(self, MockWebClient, mock_sleep):
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
page1_response = {
|
||||
"channels": [{"name": "general", "id": "C1"}, {"name": "dev", "id": "C0"}], # Added one more channel
|
||||
"response_metadata": {"next_cursor": "cursor123"}
|
||||
}
|
||||
page2_response = {
|
||||
"channels": [{"name": "random", "id": "C2"}],
|
||||
"response_metadata": {"next_cursor": ""}
|
||||
}
|
||||
|
||||
mock_client_instance.conversations_list.side_effect = [
|
||||
page1_response,
|
||||
page2_response
|
||||
]
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
channels = slack_history.get_all_channels(include_private=True) # Explicitly True
|
||||
|
||||
self.assertEqual(len(channels), 3) # Adjusted for 3 channels
|
||||
self.assertEqual(channels["general"], "C1")
|
||||
self.assertEqual(channels["dev"], "C0")
|
||||
self.assertEqual(channels["random"], "C2")
|
||||
|
||||
expected_calls = [
|
||||
call(types="public_channel,private_channel", cursor=None, limit=1000),
|
||||
call(types="public_channel,private_channel", cursor="cursor123", limit=1000)
|
||||
]
|
||||
mock_client_instance.conversations_list.assert_has_calls(expected_calls)
|
||||
self.assertEqual(mock_client_instance.conversations_list.call_count, 2)
|
||||
|
||||
mock_sleep.assert_called_once_with(3)
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient')
|
||||
def test_get_all_channels_rate_limit_with_retry_after(self, MockWebClient, mock_sleep):
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
mock_error_response = Mock()
|
||||
mock_error_response.status_code = 429
|
||||
mock_error_response.headers = {'Retry-After': '5'}
|
||||
|
||||
successful_response = {
|
||||
"channels": [{"name": "general", "id": "C1"}],
|
||||
"response_metadata": {"next_cursor": ""}
|
||||
}
|
||||
|
||||
mock_client_instance.conversations_list.side_effect = [
|
||||
SlackApiError(message="ratelimited", response=mock_error_response),
|
||||
successful_response
|
||||
]
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
channels = slack_history.get_all_channels(include_private=True)
|
||||
|
||||
self.assertEqual(len(channels), 1)
|
||||
self.assertEqual(channels["general"], "C1")
|
||||
mock_sleep.assert_called_once_with(5)
|
||||
|
||||
expected_calls = [
|
||||
call(types="public_channel,private_channel", cursor=None, limit=1000), # First attempt
|
||||
call(types="public_channel,private_channel", cursor=None, limit=1000) # Retry attempt
|
||||
]
|
||||
mock_client_instance.conversations_list.assert_has_calls(expected_calls)
|
||||
self.assertEqual(mock_client_instance.conversations_list.call_count, 2)
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient')
|
||||
def test_get_all_channels_rate_limit_no_retry_after_valid_header(self, MockWebClient, mock_sleep):
|
||||
# Test case for when Retry-After is not a digit
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
mock_error_response = Mock()
|
||||
mock_error_response.status_code = 429
|
||||
mock_error_response.headers = {'Retry-After': 'invalid_value'} # Non-digit value
|
||||
|
||||
successful_response = {
|
||||
"channels": [{"name": "general", "id": "C1"}],
|
||||
"response_metadata": {"next_cursor": ""}
|
||||
}
|
||||
|
||||
mock_client_instance.conversations_list.side_effect = [
|
||||
SlackApiError(message="ratelimited", response=mock_error_response),
|
||||
successful_response
|
||||
]
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
channels = slack_history.get_all_channels(include_private=True)
|
||||
|
||||
self.assertEqual(channels["general"], "C1")
|
||||
mock_sleep.assert_called_once_with(60) # Default fallback
|
||||
self.assertEqual(mock_client_instance.conversations_list.call_count, 2)
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient')
|
||||
def test_get_all_channels_rate_limit_no_retry_after_header(self, MockWebClient, mock_sleep):
|
||||
# Test case for when Retry-After header is missing
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
mock_error_response = Mock()
|
||||
mock_error_response.status_code = 429
|
||||
mock_error_response.headers = {} # No Retry-After header
|
||||
|
||||
successful_response = {
|
||||
"channels": [{"name": "general", "id": "C1"}],
|
||||
"response_metadata": {"next_cursor": ""}
|
||||
}
|
||||
|
||||
mock_client_instance.conversations_list.side_effect = [
|
||||
SlackApiError(message="ratelimited", response=mock_error_response),
|
||||
successful_response
|
||||
]
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
channels = slack_history.get_all_channels(include_private=True)
|
||||
|
||||
self.assertEqual(channels["general"], "C1")
|
||||
mock_sleep.assert_called_once_with(60) # Default fallback
|
||||
self.assertEqual(mock_client_instance.conversations_list.call_count, 2)
|
||||
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient')
|
||||
def test_get_all_channels_other_slack_api_error(self, MockWebClient, mock_sleep):
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
mock_error_response = Mock()
|
||||
mock_error_response.status_code = 500
|
||||
mock_error_response.headers = {}
|
||||
mock_error_response.data = {"ok": False, "error": "internal_error"} # Mocking response.data
|
||||
|
||||
original_error = SlackApiError(message="server error", response=mock_error_response)
|
||||
mock_client_instance.conversations_list.side_effect = original_error
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
|
||||
with self.assertRaises(SlackApiError) as context:
|
||||
slack_history.get_all_channels(include_private=True)
|
||||
|
||||
# Check if the raised exception is the same one or has the same properties
|
||||
self.assertEqual(context.exception.response.status_code, 500)
|
||||
self.assertIn("server error", str(context.exception))
|
||||
mock_sleep.assert_not_called()
|
||||
mock_client_instance.conversations_list.assert_called_once_with(
|
||||
types="public_channel,private_channel", cursor=None, limit=1000
|
||||
)
|
||||
|
||||
@patch('surfsense_backend.app.connectors.slack_history.time.sleep')
|
||||
@patch('slack_sdk.WebClient')
|
||||
def test_get_all_channels_handles_missing_name_id_gracefully(self, MockWebClient, mock_sleep):
|
||||
mock_client_instance = MockWebClient.return_value
|
||||
|
||||
# Channel missing 'name', channel missing 'id', valid channel
|
||||
response_with_malformed_data = {
|
||||
"channels": [
|
||||
{"id": "C1_missing_name"},
|
||||
{"name": "channel_missing_id"},
|
||||
{"name": "general", "id": "C2_valid"}
|
||||
],
|
||||
"response_metadata": {"next_cursor": ""}
|
||||
}
|
||||
|
||||
mock_client_instance.conversations_list.return_value = response_with_malformed_data
|
||||
|
||||
slack_history = SlackHistory(token="fake_token")
|
||||
# Patch print to check for warning messages
|
||||
with patch('builtins.print') as mock_print:
|
||||
channels = slack_history.get_all_channels(include_private=True)
|
||||
|
||||
self.assertEqual(len(channels), 1) # Only the valid channel should be included
|
||||
self.assertIn("general", channels)
|
||||
self.assertEqual(channels["general"], "C2_valid")
|
||||
|
||||
# Assert that warnings were printed for malformed channel data
|
||||
self.assertGreaterEqual(mock_print.call_count, 2) # At least two warnings
|
||||
mock_print.assert_any_call("Warning: Channel found with missing name or id. Data: {'id': 'C1_missing_name'}")
|
||||
mock_print.assert_any_call("Warning: Channel found with missing name or id. Data: {'name': 'channel_missing_id'}")
|
||||
|
||||
mock_sleep.assert_not_called() # No pagination, so no sleep
|
||||
mock_client_instance.conversations_list.assert_called_once_with(
|
||||
types="public_channel,private_channel", cursor=None, limit=1000
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Add table
Reference in a new issue