mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-25 06:27:38 +00:00
187 lines
6.7 KiB
Python
187 lines
6.7 KiB
Python
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
import re
|
|
from typing import TYPE_CHECKING, List, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
from unstructured.documents.elements import Element
|
|
|
|
from camel.utils import get_model_encoding
|
|
|
|
from .base import BaseChunker
|
|
|
|
|
|
class CodeChunker(BaseChunker):
    r"""A class for chunking code or text while respecting structure
    and token limits.

    Lines are grouped so that a structural element (a line matching
    ``struct_pattern``, e.g. a function, class, or region marker) and the
    lines that follow it stay in the same chunk whenever they fit within
    the token limit. Groups and lines that exceed the limit are split by
    token count. Lines that start with a Markdown image can be skipped.

    Attributes:
        chunk_size (int, optional): The maximum token size per chunk.
            (default: :obj:`8192`)
        remove_image (bool, optional): If the chunker should skip the
            images. (default: :obj:`True`)
        model_name (str, optional): The tokenizer model name used
            for token counting. (default: :obj:`"cl100k_base"`)
    """

    def __init__(
        self,
        chunk_size: int = 8192,
        model_name: str = "cl100k_base",
        remove_image: Optional[bool] = True,
    ):
        self.chunk_size = chunk_size
        self.tokenizer = get_model_encoding(model_name)
        self.remove_image = remove_image
        # Lines that open a new structural unit: def/class/function
        # declarations, access-modified method signatures,
        # interface/enum/namespace declarations, and #region/#endregion.
        self.struct_pattern = re.compile(
            r'^\s*(?:(def|class|function)\s+\w+|'
            r'(public|private|protected)\s+[\w<>]+\s+\w+\s*\(|'
            r'\b(interface|enum|namespace)\s+\w+|'
            r'#\s*(region|endregion)\b)'
        )
        # Markdown image syntax, either an inline Base64 data URI or any
        # other link target.
        self.image_pattern = re.compile(
            r'!\[.*?\]\((?:data:image/[^;]+;base64,[a-zA-Z0-9+/]+=*|[^)]+)\)'
        )

    def count_tokens(self, text: str) -> int:
        r"""Counts the number of tokens in the given text.

        Args:
            text (str): The input text to be tokenized.

        Returns:
            int: The number of tokens in the input text.
        """
        return len(self.tokenizer.encode(text, disallowed_special=()))

    def _split_oversized(self, line: str) -> List[str]:
        r"""Splits an oversized line into multiple chunks based on token
        limits.

        Args:
            line (str): The oversized line to be split.

        Returns:
            List[str]: A list of smaller chunks after splitting the
                oversized line. Pieces that reach ``chunk_size`` tokens
                are whitespace-stripped; a trailing shorter piece is
                returned as decoded.
        """
        tokens = self.tokenizer.encode(line, disallowed_special=())
        pieces = []
        buffer = []

        for token in tokens:
            buffer.append(token)
            if len(buffer) >= self.chunk_size:
                pieces.append(self.tokenizer.decode(buffer).strip())
                buffer = []

        # Whatever is left is shorter than a full chunk.
        if buffer:
            pieces.append(self.tokenizer.decode(buffer))
        return pieces

    def _flush_struct(
        self,
        chunks: List[str],
        current_chunk: List[str],
        current_tokens: int,
        struct_buffer: List[str],
        struct_tokens: int,
    ) -> int:
        r"""Moves the buffered structure lines into the current chunk.

        If the buffered lines fit next to the current chunk they are
        appended to it. Otherwise the current chunk is emitted to
        ``chunks`` and the buffered lines start a new current chunk.
        ``chunks`` and ``current_chunk`` are modified in place;
        ``struct_buffer`` is only read.

        Args:
            chunks (List[str]): The chunks emitted so far.
            current_chunk (List[str]): Lines of the chunk being built.
            current_tokens (int): Token count of ``current_chunk``.
            struct_buffer (List[str]): Lines of the pending structure.
            struct_tokens (int): Token count of ``struct_buffer``.

        Returns:
            int: The token count of ``current_chunk`` after the merge.
        """
        if current_tokens + struct_tokens <= self.chunk_size:
            current_chunk.extend(struct_buffer)
            return current_tokens + struct_tokens

        if current_chunk:
            chunks.append("".join(current_chunk))
        # Slice assignment copies the lines, so the caller may reset
        # ``struct_buffer`` afterwards without affecting the chunk.
        current_chunk[:] = struct_buffer
        return struct_tokens

    def chunk(self, content: List[str]) -> List["Element"]:
        r"""Splits the content into smaller chunks while preserving
        structure and adhering to token constraints.

        Args:
            content (List[str]): The content to be chunked. The items are
                converted with ``str`` and joined with newlines before
                being processed line by line.

        Returns:
            List[Element]: One element per chunk, in source order, with
                the chunk text stored in its ``text`` attribute.
        """
        from unstructured.documents.elements import Element, ElementMetadata

        content_str = "\n".join(map(str, content))
        chunks: List[str] = []
        current_chunk: List[str] = []
        current_tokens = 0
        struct_buffer: List[str] = []
        struct_tokens = 0

        for line in content_str.splitlines(keepends=True):
            # Only lines that *start* with an image are skipped.
            if self.remove_image and self.image_pattern.match(line):
                continue

            line_tokens = self.count_tokens(line)

            if line_tokens > self.chunk_size:
                # Everything collected so far precedes this line in the
                # input, so it has to be emitted first. That includes
                # the pending structure buffer; leaving it pending would
                # place its text after the oversized line's pieces.
                if struct_buffer:
                    current_tokens = self._flush_struct(
                        chunks,
                        current_chunk,
                        current_tokens,
                        struct_buffer,
                        struct_tokens,
                    )
                    struct_buffer = []
                    struct_tokens = 0
                if current_chunk:
                    chunks.append("".join(current_chunk))
                    current_chunk = []
                    current_tokens = 0
                chunks.extend(self._split_oversized(line))
                continue

            if self.struct_pattern.match(line):
                # A new structure begins: settle the previous one, then
                # start buffering from this line.
                if struct_buffer:
                    current_tokens = self._flush_struct(
                        chunks,
                        current_chunk,
                        current_tokens,
                        struct_buffer,
                        struct_tokens,
                    )
                struct_buffer = [line]
                struct_tokens = line_tokens
            elif struct_buffer:
                # Lines following a structure line belong to it.
                struct_buffer.append(line)
                struct_tokens += line_tokens
            elif current_tokens + line_tokens > self.chunk_size:
                if current_chunk:
                    chunks.append("".join(current_chunk))
                current_chunk = [line]
                current_tokens = line_tokens
            else:
                current_chunk.append(line)
                current_tokens += line_tokens

        if struct_buffer:
            self._flush_struct(
                chunks,
                current_chunk,
                current_tokens,
                struct_buffer,
                struct_tokens,
            )

        if current_chunk:
            chunks.append("".join(current_chunk))

        # A buffered structure can exceed the limit as a whole even
        # though none of its lines does, so re-check every chunk.
        final_chunks: List[str] = []
        for text in chunks:
            if self.count_tokens(text) > self.chunk_size:
                final_chunks.extend(self._split_oversized(text))
            else:
                final_chunks.append(text)

        # TODO: need to reconsider how to correctly form metadata (maybe need
        # to decouple the connection with unstructuredIO)
        chunked_elements = []
        for text in final_chunks:
            element = Element(metadata=ElementMetadata())
            element.text = text
            chunked_elements.append(element)
        return chunked_elements
|