diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py index dd8143f6..8fdff973 100644 --- a/src/instructlab/sdg/generate_data.py +++ b/src/instructlab/sdg/generate_data.py @@ -33,6 +33,7 @@ from instructlab.sdg.utils.taxonomy import ( leaf_node_to_samples, read_taxonomy_leaf_nodes, + kprintds, ) logger = logging.getLogger(__name__) @@ -299,7 +300,6 @@ def generate_data( "freeform_skills.yaml", and "grounded_skills.yaml". """ generate_start = time.time() - print(f"THIS IS KHALED: {model_name=}") # FIXME: remove this when ilab knows to pass batch_size=0 with llama.cpp if batch_size is None: @@ -373,12 +373,14 @@ def generate_data( if not samples: raise GenerateException("Error: No samples found in leaf node.") + + kprintds(samples, extra_info="in function generate_data, printing samples") - if samples[0].get("document"): + if "document" in samples.column_names: pipe = knowledge_pipe is_knowledge = True - elif samples[0].get("seed_context"): + elif "seed_context" in samples.column_names: pipe = grounded_skills_pipe else: @@ -386,10 +388,8 @@ def generate_data( logger.debug("Samples: %s", samples) - # TODO will already be a dataset at this point so refactor as needed - ds = Dataset.from_list(samples) - logger.debug("Dataset: %s", ds) - new_generated_data = pipe.generate(ds, leaf_node_path) + print('THIS IS KHALED WE GENERATING NOW') + new_generated_data = pipe.generate(samples, leaf_node_path) if len(new_generated_data) == 0: raise EmptyDatasetError( "Pipeline stopped: Empty dataset after running pipe" @@ -407,7 +407,7 @@ def generate_data( generate_eval_task_data( mmlu_bench_pipe, leaf_node_path, - ds, + samples, output_dir, date_suffix, ) diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py index f349e5d5..52033e9d 100644 --- a/src/instructlab/sdg/pipeline.py +++ b/src/instructlab/sdg/pipeline.py @@ -145,9 +145,13 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset: # Separate checkpoints with sub directories checkpoint_dir = os.path.join(self.ctx.checkpoint_dir, checkpoint_name) + print("THIS IS KHALED IN PIPELINE GENERATE ABT TO CHECKPOINT") + checkpointer = Checkpointer(checkpoint_dir, self.ctx.save_freq) dataset, pre_generated_data = checkpointer.load(dataset) + print("THIS IS KHALED IN PIPELINE GENERATE SUCCESSFULLY CHECKPOINTED(?)") + # If not batching, simply delegate to _generate_single if not self.ctx.batching_enabled: logger.info("Running pipeline single-threaded") @@ -195,7 +199,6 @@ def _generate_single(self, dataset) -> Dataset: drop_duplicates_cols = block_prop.get("drop_duplicates", False) block = block_type(self.ctx, self, block_name, **block_config) logger.info("Running block: %s", block_name) - logger.info(dataset) # Execute the block and wrap errors with the block name/type dataset = block.generate(dataset) diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index 00d1e32a..42819028 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -129,7 +129,9 @@ def generate(self, samples: Dataset) -> Dataset: value_name=self.value_name, var_name=self.var_name, ) - return pandas.dataset_from_pandas_dataframe(flatten_df) + ds = pandas.dataset_from_pandas_dataframe(flatten_df) + logger.info(f"THIS IS KHALED: {ds=}") + return ds class DuplicateColumnsBlock(Block): @@ -162,6 +164,7 @@ def __init__(self, ctx, pipe, block_name: str, columns_map: dict) -> None: def generate(self, samples: Dataset): samples = samples.rename_columns(self.columns_map) + logger.info(f"THIS IS KHALED: {samples=}") return samples diff --git a/src/instructlab/sdg/utils/chunkers.py b/src/instructlab/sdg/utils/chunkers.py index e73aad53..1d4236af 100644 --- a/src/instructlab/sdg/utils/chunkers.py +++ b/src/instructlab/sdg/utils/chunkers.py @@ -135,7 +135,7 @@ def chunk_documents(self) -> Dataset: Returns: List[str]: List of chunked documents. """ - num_tokens_per_doc = self._num_tokens_from_words() + num_tokens_per_doc = self._num_tokens_from_words(self.chunk_word_count) if num_tokens_per_doc > int(self.server_ctx_size - 1024): raise ValueError( "Error: {}".format( @@ -144,7 +144,7 @@ def chunk_documents(self) -> Dataset: ) ) ) - if self.documents == []: + if self.document_contents == []: return [] # Placeholder for params @@ -160,7 +160,7 @@ def chunk_documents(self) -> Dataset: ) # Determine file type for heuristics, default with markdown - for doc in self.documents: + for doc in self.document_contents: # Use regex to remove unnecessary dashes in front of pipe characters in a markdown table. doc = re.sub(r"-{2,}\|", "-|", doc) # Remove unnecessary spaces in front of pipe characters in a markdown table. @@ -171,9 +171,11 @@ def chunk_documents(self) -> Dataset: return content - def _num_tokens_from_words(self) -> int: - return int(self.num_words * 1.3) # 1 word ~ 1.3 token + @staticmethod + def _num_tokens_from_words(num_words) -> int: + return int(num_words * 1.3) # 1 word ~ 1.3 token + @staticmethod def _num_chars_from_tokens(num_tokens) -> int: return int(num_tokens * 4) # 1 token ~ 4 English character @@ -187,7 +189,6 @@ def __init__( output_dir: Path, tokenizer_model_name=None, ): - print(f"THIS IS KHALED IN SEMANTIC CHUNKER: {leaf_node_path=}") self.document_paths = document_paths self.filepaths = filepaths self.leaf_node_path = leaf_node_path @@ -208,14 +209,11 @@ def chunk_documents(self) -> Dataset: """Semantically chunk PDF documents. Returns: - Dataset: A Dataset object containing the chunked documents + List: a list of chunks from the documents """ if self.document_paths == []: return [] - print(f"""THIS IS KHALED: CHUNKING PDF DOCS - {self.document_paths[0]=} - """) model_artifacts_path = DocumentConverter.download_models_hf() converter = DocumentConverter(artifacts_path=model_artifacts_path) inputs = DocumentConversionInput.from_paths(self.filepaths) @@ -224,12 +222,10 @@ def chunk_documents(self) -> Dataset: docling_artifacts_path = self.export_documents(parsed_documents) docling_json_paths = list(docling_artifacts_path.glob("*.json")) - print(f"THIS IS KHALED: {docling_json_paths=}") # TODO export to global function - chunk_datasets_lst = [self._process_parsed_docling_json(p) for p in docling_json_paths] - print(f"THIS IS KHALED: {chunk_datasets_lst=}") - chunks: Dataset = _safe_concatenate_datasets(chunk_datasets_lst) - print(f"THIS IS KHALED: {chunks=}") + chunks = [] + for p in docling_json_paths: + chunks.extend(self._process_parsed_docling_json(p)) return chunks @@ -269,7 +265,6 @@ def _process_parsed_docling_json(self, json_fp: Path) -> Dataset: List: a list of chunks built from the provided json file """ logger.info(f"Processing parsed docling json file: {json_fp}") - print(f"THIS IS KHALED: {json_fp=}") with open(json_fp, "r", encoding="utf-8") as f: data = json.load(f) @@ -279,6 +274,9 @@ def _process_parsed_docling_json(self, json_fp: Path) -> Dataset: max_token_per_chunk=500, tokenizer=self.tokenizer, ) + return self.fuse_texts(chunks, 200) + + # TODO remove chunks = self.fuse_texts(chunks, 200) return Dataset.from_dict( { @@ -552,10 +550,8 @@ def export_documents(self, converted_docs: Iterable[ConvertedDocument]): Returns: Path: path to directory with docling json artifacts """ - print(f"THIS IS KHALED IN EXPORT DOCUMENTS: {self.output_dir=}") docling_artifacts_path = self.output_dir / "docling-artifacts" docling_artifacts_path.mkdir(parents=True, exist_ok=True) - print(f"THIS IS KHALED IN EXPORT DOCUMENTS: {docling_artifacts_path=}") success_count = 0 failure_count = 0 @@ -583,6 +579,7 @@ def export_documents(self, converted_docs: Iterable[ConvertedDocument]): return docling_artifacts_path +# TODO move this somewhere def _safe_concatenate_datasets(datasets: List[Dataset]) -> Dataset | None: """ Concatenate datasets safely, ignoring any datasets that are None or empty. diff --git a/src/instructlab/sdg/utils/taxonomy.py b/src/instructlab/sdg/utils/taxonomy.py index 7c8a28cd..10e9dc72 100644 --- a/src/instructlab/sdg/utils/taxonomy.py +++ b/src/instructlab/sdg/utils/taxonomy.py @@ -378,47 +378,32 @@ def read_taxonomy_leaf_nodes(taxonomy, taxonomy_base, yaml_rules, document_outpu return leaf_nodes -def icl_mapping(chunked_dataset): - samples = [] - for record in chunked_dataset: - sample = { - "icl_document": record.get("icl_document", ""), - "document": record.get("document", ""), - "document_outline": record.get("document_outline", ""), - "domain": record.get("domain", ""), - "icl_query_1": record.get("icl_query_1", ""), - "icl_response_1": record.get("icl_response_1", ""), - "icl_query_2": record.get("icl_query_2", ""), - "icl_response_2": record.get("icl_response_2", ""), - "icl_query_3": record.get("icl_query_3", ""), - "icl_response_3": record.get("icl_response_3", ""), - } - samples.append(sample) - return samples - -def map_chunk_to_icls(chunk, leaf_node): +def map_chunks_to_icls(chunks: List, leaf_node: Dict) -> Dataset: chunked_dataset = [] # domain is the same for the whole leaf node - for icl_ in leaf_node: - qna_pairs = icl_.get("questions_and_answers", []) - for i, qna in enumerate(qna_pairs): - # TODO GET THESE FROM CHUNK DATASETS - # or just append icls to end of each entry - # TODO figure out if this is the right way to access a - # dataset object + domain = leaf_node[0].get("domain") + for chunk in chunks: + for icl_ in leaf_node: record = { + "document": chunk, "icl_document": icl_.get("context", ""), - "document": chunk.get("document", []), - "document_outline": chunk.get("document_outline", ""), - "domain": chunk.get("domain", ""), - f"icl_query_{i+1}": qna.get("question", ""), - f"icl_response_{i+1}": qna.get("answer", ""), + "document_outline": icl_.get("document_outline", ""), + "domain": domain, } - print(f"THIS IS KHALED IN map_chunk_to_icls: {record=}") + + qna_pairs = icl_.get("questions_and_answers", []) + for i, qna in enumerate(qna_pairs): + record.update({ + f"icl_query_{i+1}": qna.get("question", ""), + f"icl_response_{i+1}": qna.get("answer", ""), + }) + chunked_dataset.append(record) - return chunked_dataset + print(f"THIS IS KHALED IN map_chunk_to_icls: {record=}") + + return Dataset.from_list(chunked_dataset) def _knowledge_leaf_node_to_samples( @@ -434,11 +419,8 @@ def _knowledge_leaf_node_to_samples( chunks = chunker.chunk_documents() # TODO find a native datasets way of doing this - samples = [] - for chunk in chunks: - chunk_icl_mapping = map_chunk_to_icls(chunk, leaf_node) - print(f"THIS IS KHALED: {chunk_icl_mapping=}") - samples.extend(chunk_icl_mapping) + samples = map_chunks_to_icls(chunks, leaf_node) + kprintds(samples, extra_info="in function _knowledge_leaf_node_to_samples, printing samples") return samples @@ -455,7 +437,7 @@ def _skill_leaf_node_to_samples(leaf_node): samples[-1]["seed_question"] = leaf_node[i]["instruction"] samples[-1]["seed_response"] = leaf_node[i]["output"] - return samples + return Dataset.from_list(samples) def leaf_node_to_samples( @@ -468,3 +450,28 @@ def leaf_node_to_samples( leaf_node, server_ctx_size, chunk_word_count, document_output_dir, model_name ) return _skill_leaf_node_to_samples(leaf_node) + + +def kprintds(ds: Dataset, extra_info=None): + def truncate(example, max_str_length=35, max_list_length=None): + for key, value in example.items(): + if isinstance(value, str): + return {key: (value[:max_str_length] + '...') if len(value) > max_str_length else value} + elif isinstance(value, list): + return {key: [(v[:max_str_length] + '...') if len(v) > max_str_length else v for v in value]} + else: + return {key: value} + + # Get the first 5 rows and truncate long text fields + first_five_rows = ds.select(range(5)).map(truncate) + + # Print the truncated rows + s = f"THIS IS KHALED: {extra_info if extra_info is not None else ''}\n\n" + s += f"DATASET HAS COLUMNS {ds.column_names}\n\n" + s += f"COLUMNS HAVE DTYPES {ds.features}\n\n" + for row in first_five_rows: + for colname, entry in row.items(): + s += f"{colname} ({type(entry)}): {entry}\n\n" + s += "\n" + + return s \ No newline at end of file