Commit

Update chunkers.py with minor cleanup
Signed-off-by: Aakanksha Duggal <[email protected]>
aakankshaduggal committed Jan 7, 2025
1 parent 309fd11 commit 0b9a2fd
Showing 1 changed file with 1 addition and 143 deletions.
src/instructlab/sdg/utils/chunkers.py
@@ -80,148 +80,6 @@ def split_docs_by_filetype(document_paths: List[Path]) -> Dict[str, List[Path]]:
     return dict(document_dict)


-# class DocumentChunker:
-#     """A factory chunker class that instantiates the applicable chunker
-
-#     Currently, only Markdown and PDF are supported. For Markdown, returns
-#     TextSplitChunker, and for PDF, returns ContextAwareChunker"""
-
-#     def __new__(
-#         cls,
-#         doc_filepaths: List[Path],
-#         output_dir: Path,
-#         server_ctx_size=4096,
-#         chunk_word_count=1024,
-#         tokenizer_model_name: Optional[str] = None,
-#         docling_model_path: Optional[str] = None,
-#     ):
-#         """Insantiate the appropriate chunker for the provided document
-
-#         Args:
-#             leaf_node: a leaf node dict containing "documents",
-#                 "filepaths", and "taxonomy_path" keys
-#             output_dir (Path): directory where artifacts should be stored
-#             server_ctx_size (int): Context window size of server
-#             chunk_word_count (int): Maximum number of words to chunk a document
-#             tokenizer_model_name (Optional[str]): name of huggingface model to get
-#                 tokenizer from
-#         Returns:
-#             TextSplitChunker | ContextAwareChunker: Object of the appropriate
-#                 chunker class for the provided filetype
-#         """
-#         documents = leaf_node[0]["documents"]
-
-#         if not isinstance(taxonomy_path, Path):
-#             taxonomy_path = Path(taxonomy_path)
-
-#         if isinstance(documents, str):
-#             documents = [documents]
-#             logger.info(
-#                 "Converted single string into a list of string. Assumed the string passed in is the document. Normally, chunk_document() should take a list as input."
-#             )
-#         elif not isinstance(documents, list):
-#             raise TypeError(
-#                 "Expected: documents to be a list, but got {}".format(type(documents))
-#             )
-
-#         filepaths = leaf_node[0]["filepaths"]
-
-#         doc_dict = cls._split_docs_by_filetype(documents, filepaths)
-#         if len(doc_dict.keys()) > 1:
-#             raise ValueError("Received multiple document types")
-#         if len(doc_dict.keys()) < 1:
-#             raise ValueError("Received no document types")
-
-#         if SupportedFileTypes.MD in doc_dict:
-#             doc_contents = [d for d, _ in doc_dict[SupportedFileTypes.MD]]
-#             # return TextSplitChunker(
-#             #     doc_contents,
-#             #     server_ctx_size,
-#             #     chunk_word_count,
-#             #     output_dir,
-#             # )
-
-#             # TODO CHUNK AS MARKDOWN
-#             pass
-
-#         if SupportedFileTypes.PDF in doc_dict:
-#             doc_paths = [p for _, p in doc_dict[SupportedFileTypes.PDF]]
-#             # return ContextAwareChunker(
-#             #     doc_paths,
-#             #     filepaths,
-#             #     output_dir,
-#             #     chunk_word_count,
-#             #     tokenizer_model_name,
-#             #     docling_model_path=docling_model_path,
-#             # )
-
-#             # TODO CHUNK AS PDF
-#             pass
-
-#     @staticmethod
-#     def _split_docs_by_filetype(
-#         documents: List[str], filepaths: List[Path]
-#     ) -> DefaultDict[SupportedFileTypes, List[Tuple[str, Path]]]:
-#         """Separate documents into lists based on their filetype.
-
-#         Currently, only Markdown and PDF are supported.
-#         Args:
-#             documents (List[str]): A list of the document contents as strings
-#             filepaths (List[Path]): Corresponding document filepaths
-#         Returns:
-#             DefaultDict: Dictionary with either ".md" or ".pdf" as a key.
-#                 Markdown items contain document contents, PDF items contain
-#                 paths to documents.
-#         """
-#         doc_dict = defaultdict(list)
-#         for doc, path in zip(documents, filepaths):
-#             if path.suffix == ".md":
-#                 # append doc contents
-#                 doc_dict[SupportedFileTypes.MD].append((doc, path))
-#             elif path.suffix == ".pdf":
-#                 # append doc paths
-#                 doc_dict[SupportedFileTypes.PDF].append((doc, path))
-#             else:
-#                 raise ValueError(
-#                     f"Received document of type .{path.suffix}, which is not a supported filetype"
-#                 )
-#         return doc_dict
-
-
-# class TextSplitChunker(ChunkerBase):
-#     def __init__(
-#         self,
-#         document_contents: List | str,
-#         server_ctx_size: int,
-#         chunk_word_count: int,
-#         output_dir: Path,
-#     ):
-#         self.document_contents = document_contents
-#         self.server_ctx_size = server_ctx_size
-#         self.chunk_word_count = chunk_word_count
-#         self.output_dir = output_dir
-
-#     def chunk_documents(self) -> List:
-#         """Naively chunk markdown documents based on the word count provided by the user.
-#         Returns:
-#             List[str]: List of chunked documents.
-#         """
-#         num_tokens_per_doc = _num_tokens_from_words(self.chunk_word_count)
-#         if num_tokens_per_doc > int(self.server_ctx_size - 1024):
-#             raise ValueError(
-#                 "Error: {}".format(
-#                     str(
-#                         f"Given word count ({self.chunk_word_count}) per doc will exceed the server context window size ({self.server_ctx_size})"
-#                     )
-#                 )
-#             )
-#         if self.document_contents == []:
-#             return []
-
-#         chunk_size = _num_chars_from_tokens(num_tokens_per_doc)
-#         return chunk_markdowns(self.document_contents, chunk_size)
-
-
 class DocumentChunker:  # pylint: disable=too-many-instance-attributes
     # def __new__(
     #     cls,
@@ -243,7 +101,7 @@ def __init__(
         server_ctx_size: int = 4096,
         chunk_word_count: int = 1024,
     ):
-        if len(document_paths) == 0:
+        if not document_paths:
            raise ValueError("Provided empty list of documents")

        document_dict = split_docs_by_filetype(document_paths)
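For orientation, a minimal usage sketch of the surviving DocumentChunker class follows. Only document_paths, server_ctx_size, and chunk_word_count are visible in the hunks above; the output_dir parameter and the chunk_documents() method are assumptions carried over from the deleted factory and TextSplitChunker code, so the real signature may differ.

from pathlib import Path

from instructlab.sdg.utils.chunkers import DocumentChunker

# Hypothetical invocation: document_paths, server_ctx_size, and
# chunk_word_count appear in the diff above; output_dir and
# chunk_documents() are assumed from the deleted code and may not
# match the actual class.
chunker = DocumentChunker(
    document_paths=[Path("docs/knowledge.pdf")],  # must be non-empty, per the new check
    output_dir=Path("chunker-output"),
    server_ctx_size=4096,
    chunk_word_count=1024,
)
chunks = chunker.chunk_documents()

Note the design choice in the one-line change: the truthiness check "if not document_paths:" is the idiomatic Python way to reject an empty list, and unlike "len(document_paths) == 0" it also raises the ValueError for other falsy inputs such as None.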
