Skip to content

Commit

Permalink
Fully export ICL mapping and Dataset construction to taxonomy.py
Browse files Browse the repository at this point in the history
chunkers now return lists of chunks

Signed-off-by: Khaled Sulayman <[email protected]>
  • Loading branch information
khaledsulayman committed Oct 24, 2024
1 parent 95d134a commit fed4d47
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 62 deletions.
16 changes: 8 additions & 8 deletions src/instructlab/sdg/generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from instructlab.sdg.utils.taxonomy import (
leaf_node_to_samples,
read_taxonomy_leaf_nodes,
kprintds,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -299,7 +300,6 @@ def generate_data(
"freeform_skills.yaml", and "grounded_skills.yaml".
"""
generate_start = time.time()
print(f"THIS IS KHALED: {model_name=}")

# FIXME: remove this when ilab knows to pass batch_size=0 with llama.cpp
if batch_size is None:
Expand Down Expand Up @@ -373,23 +373,23 @@ def generate_data(

if not samples:
raise GenerateException("Error: No samples found in leaf node.")

Check warning on line 376 in src/instructlab/sdg/generate_data.py

View workflow job for this annotation

GitHub Actions / pylint

C0303: Trailing whitespace (trailing-whitespace)
kprintds(samples, extra_info="in function generate_data, printing samples")

if samples[0].get("document"):
if "document" in samples.column_names:
pipe = knowledge_pipe
is_knowledge = True

elif samples[0].get("seed_context"):
elif "seed_context" in samples.column_names:
pipe = grounded_skills_pipe

else:
pipe = freeform_skills_pipe

logger.debug("Samples: %s", samples)

# TODO will already be a dataset at this point so refactor as needed
ds = Dataset.from_list(samples)
logger.debug("Dataset: %s", ds)
new_generated_data = pipe.generate(ds, leaf_node_path)
print('THIS IS KHALED WE GENERATING NOW')
new_generated_data = pipe.generate(samples, leaf_node_path)
if len(new_generated_data) == 0:
raise EmptyDatasetError(
"Pipeline stopped: Empty dataset after running pipe"
Expand All @@ -407,7 +407,7 @@ def generate_data(
generate_eval_task_data(
mmlu_bench_pipe,
leaf_node_path,
ds,
samples,
output_dir,
date_suffix,
)
Expand Down
7 changes: 6 additions & 1 deletion src/instructlab/sdg/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# First Party
from instructlab.sdg.checkpointing import Checkpointer
from instructlab.sdg.utils import pandas
from instructlab.sdg.utils.taxonomy import kprintds

# Local
from . import filterblock, importblock, llmblock, utilblocks
Expand Down Expand Up @@ -145,9 +146,13 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset:
# Separate checkpoints with sub directories
checkpoint_dir = os.path.join(self.ctx.checkpoint_dir, checkpoint_name)

print("THIS IS KHALED IN PIPELINE GENERATE ABT TO CHECKPOINT")

checkpointer = Checkpointer(checkpoint_dir, self.ctx.save_freq)
dataset, pre_generated_data = checkpointer.load(dataset)

print("THIS IS KHALED IN PIPELINE GENERATE SUCCESSFULLY CHECKPOINTED(?)")

# If not batching, simply delegate to _generate_single
if not self.ctx.batching_enabled:
logger.info("Running pipeline single-threaded")
Expand Down Expand Up @@ -195,7 +200,7 @@ def _generate_single(self, dataset) -> Dataset:
drop_duplicates_cols = block_prop.get("drop_duplicates", False)
block = block_type(self.ctx, self, block_name, **block_config)
logger.info("Running block: %s", block_name)
logger.info(dataset)
logger.info(kprintds(dataset, extra_info=f"{block_name=}, {block_config=}"))

# Execute the block and wrap errors with the block name/type
dataset = block.generate(dataset)
Expand Down
5 changes: 4 additions & 1 deletion src/instructlab/sdg/utilblocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def generate(self, samples: Dataset) -> Dataset:
value_name=self.value_name,
var_name=self.var_name,
)
return pandas.dataset_from_pandas_dataframe(flatten_df)
ds = pandas.dataset_from_pandas_dataframe(flatten_df)
logger.info(f"THIS IS KHALED: {ds=}")
return ds


class DuplicateColumnsBlock(Block):
Expand Down Expand Up @@ -162,6 +164,7 @@ def __init__(self, ctx, pipe, block_name: str, columns_map: dict) -> None:

def generate(self, samples: Dataset):
samples = samples.rename_columns(self.columns_map)
logger.info(f"THIS IS KHALED: {samples=}")
return samples


Expand Down
19 changes: 6 additions & 13 deletions src/instructlab/sdg/utils/chunkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ def __init__(
output_dir: Path,
tokenizer_model_name=None,
):
print(f"THIS IS KHALED IN SEMANTIC CHUNKER: {leaf_node_path=}")
self.document_paths = document_paths
self.filepaths = filepaths
self.leaf_node_path = leaf_node_path
Expand All @@ -208,14 +207,11 @@ def chunk_documents(self) -> Dataset:
"""Semantically chunk PDF documents.
Returns:
Dataset: A Dataset object containing the chunked documents
List: a list of chunks from the documents
"""
if self.document_paths == []:
return []

print(f"""THIS IS KHALED: CHUNKING PDF DOCS
{self.document_paths[0]=}
""")
model_artifacts_path = DocumentConverter.download_models_hf()
converter = DocumentConverter(artifacts_path=model_artifacts_path)
inputs = DocumentConversionInput.from_paths(self.filepaths)
Expand All @@ -224,12 +220,8 @@ def chunk_documents(self) -> Dataset:
docling_artifacts_path = self.export_documents(parsed_documents)

docling_json_paths = list(docling_artifacts_path.glob("*.json"))
print(f"THIS IS KHALED: {docling_json_paths=}")
# TODO export to global function
chunk_datasets_lst = [self._process_parsed_docling_json(p) for p in docling_json_paths]
print(f"THIS IS KHALED: {chunk_datasets_lst=}")
chunks: Dataset = _safe_concatenate_datasets(chunk_datasets_lst)
print(f"THIS IS KHALED: {chunks=}")
chunks = [self._process_parsed_docling_json(p) for p in docling_json_paths]

return chunks

Expand Down Expand Up @@ -269,7 +261,6 @@ def _process_parsed_docling_json(self, json_fp: Path) -> Dataset:
List: a list of chunks built from the provided json file
"""
logger.info(f"Processing parsed docling json file: {json_fp}")
print(f"THIS IS KHALED: {json_fp=}")
with open(json_fp, "r", encoding="utf-8") as f:
data = json.load(f)

Expand All @@ -279,6 +270,9 @@ def _process_parsed_docling_json(self, json_fp: Path) -> Dataset:
max_token_per_chunk=500,
tokenizer=self.tokenizer,
)
return self.fuse_texts(chunks, 200)

# TODO remove
chunks = self.fuse_texts(chunks, 200)
return Dataset.from_dict(
{
Expand Down Expand Up @@ -552,10 +546,8 @@ def export_documents(self, converted_docs: Iterable[ConvertedDocument]):
Returns:
Path: path to directory with docling json artifacts
"""
print(f"THIS IS KHALED IN EXPORT DOCUMENTS: {self.output_dir=}")
docling_artifacts_path = self.output_dir / "docling-artifacts"
docling_artifacts_path.mkdir(parents=True, exist_ok=True)
print(f"THIS IS KHALED IN EXPORT DOCUMENTS: {docling_artifacts_path=}")

success_count = 0
failure_count = 0
Expand Down Expand Up @@ -583,6 +575,7 @@ def export_documents(self, converted_docs: Iterable[ConvertedDocument]):
return docling_artifacts_path


# TODO move this somewhere
def _safe_concatenate_datasets(datasets: List[Dataset]) -> Dataset | None:
"""
Concatenate datasets safely, ignoring any datasets that are None or empty.
Expand Down
85 changes: 46 additions & 39 deletions src/instructlab/sdg/utils/taxonomy.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,47 +378,32 @@ def read_taxonomy_leaf_nodes(taxonomy, taxonomy_base, yaml_rules, document_outpu

return leaf_nodes

def icl_mapping(chunked_dataset):
samples = []
for record in chunked_dataset:
sample = {
"icl_document": record.get("icl_document", ""),
"document": record.get("document", ""),
"document_outline": record.get("document_outline", ""),
"domain": record.get("domain", ""),
"icl_query_1": record.get("icl_query_1", ""),
"icl_response_1": record.get("icl_response_1", ""),
"icl_query_2": record.get("icl_query_2", ""),
"icl_response_2": record.get("icl_response_2", ""),
"icl_query_3": record.get("icl_query_3", ""),
"icl_response_3": record.get("icl_response_3", ""),
}
samples.append(sample)
return samples


def map_chunk_to_icls(chunk, leaf_node):
def map_chunks_to_icls(chunks: List, leaf_node: Dict) -> Dataset:
chunked_dataset = []

# domain is the same for the whole leaf node
for icl_ in leaf_node:
qna_pairs = icl_.get("questions_and_answers", [])
for i, qna in enumerate(qna_pairs):
# TODO GET THESE FROM CHUNK DATASETS
# or just append icls to end of each entry
# TODO figure out if this is the right way to access a
# dataset object
domain = leaf_node[0].get("domain")
for chunk in chunks:
for icl_ in leaf_node:
record = {
"document": chunk,
"icl_document": icl_.get("context", ""),
"document": chunk.get("document", []),
"document_outline": chunk.get("document_outline", ""),
"domain": chunk.get("domain", ""),
f"icl_query_{i+1}": qna.get("question", ""),
f"icl_response_{i+1}": qna.get("answer", ""),
"document_outline": icl_.get("document_outline", ""),
"domain": domain,
}
print(f"THIS IS KHALED IN map_chunk_to_icls: {record=}")

qna_pairs = icl_.get("questions_and_answers", [])
for i, qna in enumerate(qna_pairs):
record.update({
f"icl_query_{i+1}": qna.get("question", ""),
f"icl_response_{i+1}": qna.get("answer", ""),
})

chunked_dataset.append(record)
return chunked_dataset
print(f"THIS IS KHALED IN map_chunk_to_icls: {record=}")

return Dataset.from_list(chunked_dataset)


def _knowledge_leaf_node_to_samples(
Expand All @@ -434,11 +419,8 @@ def _knowledge_leaf_node_to_samples(
chunks = chunker.chunk_documents()

# TODO find a native datasets way of doing this
samples = []
for chunk in chunks:
chunk_icl_mapping = map_chunk_to_icls(chunk, leaf_node)
print(f"THIS IS KHALED: {chunk_icl_mapping=}")
samples.extend(chunk_icl_mapping)
samples = map_chunks_to_icls(chunks, leaf_node)
kprintds(samples, extra_info="in function _knowledge_leaf_node_to_samples, printing samples")

return samples

Expand All @@ -455,7 +437,7 @@ def _skill_leaf_node_to_samples(leaf_node):
samples[-1]["seed_question"] = leaf_node[i]["instruction"]
samples[-1]["seed_response"] = leaf_node[i]["output"]

return samples
return Dataset.from_list(samples)


def leaf_node_to_samples(
Expand All @@ -468,3 +450,28 @@ def leaf_node_to_samples(
leaf_node, server_ctx_size, chunk_word_count, document_output_dir, model_name
)
return _skill_leaf_node_to_samples(leaf_node)


def kprintds(ds: Dataset, extra_info=None):
def truncate(example, max_str_length=35, max_list_length=None):
for key, value in example.items():
if isinstance(value, str):

Check warning on line 458 in src/instructlab/sdg/utils/taxonomy.py

View workflow job for this annotation

GitHub Actions / pylint

R1705: Unnecessary "elif" after "return", remove the leading "el" from "elif" (no-else-return)
return {key: (value[:max_str_length] + '...') if len(value) > max_str_length else value}
elif isinstance(value, list):
return {key: [(v[:max_str_length] + '...') if len(v) > max_str_length else v for v in value]}
else:
return {key: value}

# Get the first 5 rows and truncate long text fields
first_five_rows = ds.select(range(5)).map(truncate)

# Print the truncated rows
s = f"THIS IS KHALED: {extra_info if extra_info is not None else ''}\n\n"
s += f"DATASET HAS COLUMNS {ds.column_names}\n\n"
s += f"COLUMNS HAVE DTYPES {ds.features}\n\n"
for row in first_five_rows:
for colname, entry in row.items():
s += f"{colname} ({type(entry)}): {entry}\n\n"
s += "\n"

return s

Check warning on line 477 in src/instructlab/sdg/utils/taxonomy.py

View workflow job for this annotation

GitHub Actions / pylint

C0304: Final newline missing (missing-final-newline)

0 comments on commit fed4d47

Please sign in to comment.