SurfSense/surfsense_backend/app/services/streaming_service.py
DESKTOP-RTLN3BA\$punk 5045b7433a refactor: Move utility services to a dedicated 'services' module
- Updated import paths for LLM, connector, query, and streaming services to reflect their new location in the 'services' module.
- Removed obsolete utility service files that have been migrated.
2025-07-06 17:51:24 -07:00

72 lines
No EOL
2.1 KiB
Python

import json
from typing import Any, Dict, List
class StreamingService:
def __init__(self):
self.terminal_idx = 1
self.message_annotations = [
{
"type": "TERMINAL_INFO",
"content": []
},
{
"type": "SOURCES",
"content": []
},
{
"type": "ANSWER",
"content": []
}
]
# It is used to send annotations to the frontend
def _format_annotations(self) -> str:
"""
Format the annotations as a string
Returns:
str: The formatted annotations string
"""
return f'8:{json.dumps(self.message_annotations)}\n'
# It is used to end Streaming
def format_completion(self, prompt_tokens: int = 156, completion_tokens: int = 204) -> str:
"""
Format a completion message
Args:
prompt_tokens: Number of prompt tokens
completion_tokens: Number of completion tokens
Returns:
str: The formatted completion string
"""
total_tokens = prompt_tokens + completion_tokens
completion_data = {
"finishReason": "stop",
"usage": {
"promptTokens": prompt_tokens,
"completionTokens": completion_tokens,
"totalTokens": total_tokens
}
}
return f'd:{json.dumps(completion_data)}\n'
def only_update_terminal(self, text: str, message_type: str = "info") -> str:
self.message_annotations[0]["content"].append({
"id": self.terminal_idx,
"text": text,
"type": message_type
})
self.terminal_idx += 1
return self.message_annotations
def only_update_sources(self, sources: List[Dict[str, Any]]) -> str:
self.message_annotations[1]["content"] = sources
return self.message_annotations
def only_update_answer(self, answer: List[str]) -> str:
self.message_annotations[2]["content"] = answer
return self.message_annotations