From 887f09abac0858c9f45896e033d27c9561f505d2 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Tue, 12 Mar 2024 00:17:56 -0500 Subject: [PATCH 1/8] Add tests for mixing, no sampling. --- .../datapreprocess/ray/tokenize_shuffle.py | 2 +- tests/assets/test_sample.yaml | 13 +++++ tests/test_tokenize_shuffle.py | 53 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 tests/assets/test_sample.yaml diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index f0e350f2..b9d987d2 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -234,7 +234,7 @@ def _flush_buffer(self, folder, counter): tokens = [int(x) for x in self.buffer[i]["tokens"]] token_count += len(tokens) json_string = json.dumps(tokens) - uid = hashlib.md5(json_string.encode()).hexdigest() + uid = f"{tar_index_str}_{i:0{digits}}" sample = {"__key__": uid, "json.gz": json_string} sink.write(sample) bio.seek(0) diff --git a/tests/assets/test_sample.yaml b/tests/assets/test_sample.yaml new file mode 100644 index 00000000..5c09b650 --- /dev/null +++ b/tests/assets/test_sample.yaml @@ -0,0 +1,13 @@ +sources: + - source: "SOURCE_A" + markers: ["source_a"] + - source: "SOURCE_B" + markers: ["source_b"] + - source: "UNKNOWN" + markers: [] # No specific markers for UNKNOWN + +sampling_frequencies: + SOURCE_A: 2.0 + SOURCE_B: 0.5 + UNKNOWN: 0 + diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index 97d73169..a3ee0dee 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -115,3 +115,56 @@ def test_tokenize_shuffle_local_read_local_write(): total += len(x["json.gz"]) assert total == NUM_TOKENS assert exit_value == 0 + + +def test_mixing_no_sampling(): + content_len = 2048 + docs_a = 1000 + docs_b = 500 + + # Tokens for gpt-neox tokenizer (default) + token_a = 247 + token_b = 270 + + # Store some fake sources in ./test_input + os.system("mkdir test_input") + os.system("mkdir test_input/source_a/") + os.system("mkdir test_input/source_b/") + os.system("mkdir test_output") + + with open("test_input/source_a/input.jsonl", "w") as f: + # This will create 2048 copies of the " a" string + data = {"text": " " + " ".join(["a" for _ in range(content_len)])} + json_string = json.dumps(data) + for _ in range(docs_a): + f.write(json_string) + f.write("\n") + + with open("test_input/source_b/input.jsonl", "w") as f: + data = {"text": " " + " ".join(["b" for _ in range(content_len)])} + json_string = json.dumps(data) + for _ in range(docs_b): + f.write(json_string) + f.write("\n") + + # run tokenize script + exit_value = os.system( + f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input --content_key text --seqlen {content_len} --output ./test_output/" + ) + + tars = [os.path.join("test_output", fname) for fname in os.listdir("test_output") if fname.endswith(".tar")] + total_a = total_b = 0 + for tar in tars: + ds = wds.WebDataset(tar).decode() + for x in ds: + assert len(x["json.gz"]) == content_len + 1 + if x["json.gz"][0] == token_a: + total_a += len(x["json.gz"]) + elif x["json.gz"][0] == token_b: + total_b += len(x["json.gz"]) + else: + assert False, f"Unrecognized tokens {x['json.gz'][0]} - probably using a different tokenizer?" 
+ + assert total_a == docs_a * (content_len + 1) + assert total_b == docs_b * (content_len + 1) + assert exit_value == 0 From ff2ed85b19e1013d61a732b9b62c9cbbd78463d6 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Tue, 12 Mar 2024 01:13:33 -0500 Subject: [PATCH 2/8] Add sampling test. --- tests/test_tokenize_shuffle.py | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index a3ee0dee..cd64e286 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -168,3 +168,64 @@ def test_mixing_no_sampling(): assert total_a == docs_a * (content_len + 1) assert total_b == docs_b * (content_len + 1) assert exit_value == 0 + + +def test_mixing_sampling(): + content_len = 2048 + docs_a = 10000 + docs_b = 10000 + + # Tokens for gpt-neox tokenizer (default) + token_a = 247 + token_b = 270 + + # Store some fake sources in ./test_input + os.system("mkdir test_input") + os.system("mkdir test_input/source_a/") + os.system("mkdir test_input/source_b/") + os.system("mkdir test_output") + + with open("test_input/source_a/input.jsonl", "w") as f: + # This will create 2048 copies of the " a" string + data = {"text": " " + " ".join(["a" for _ in range(content_len)])} + json_string = json.dumps(data) + for _ in range(docs_a): + f.write(json_string) + f.write("\n") + + with open("test_input/source_b/input.jsonl", "w") as f: + data = {"text": " " + " ".join(["b" for _ in range(content_len)])} + json_string = json.dumps(data) + for _ in range(docs_b): + f.write(json_string) + f.write("\n") + + # run tokenize script + exit_value = os.system( + f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input --content_key text --seqlen {content_len} --output ./test_output/ --do_sample --default_dataset_yaml ./tests/assets/test_sample.yaml" + ) + assert exit_value == 0 + + tars = [os.path.join("test_output", fname) for fname in os.listdir("test_output") if fname.endswith(".tar")] + total_a = total_b = 0 + for tar in tars: + ds = wds.WebDataset(tar).decode() + for x in ds: + assert len(x["json.gz"]) == content_len + 1 + if x["json.gz"][0] == token_a: + total_a += len(x["json.gz"]) + elif x["json.gz"][0] == token_b: + total_b += len(x["json.gz"]) + else: + assert False, f"Unrecognized tokens {x['json.gz'][0]} - probably using a different tokenizer?" + + # Sampling for source a should be 2.0, so it should be exactly 2 + assert total_a == 2 * docs_a * (content_len + 1) + + # Source b is sampled with probability 0.5, so the number of documents from source b follows Bin(10000, 0.5). + # Via (multiplicative) Chernoff bounds, for margin delta the error probability is 2 * exp(-delta**2 * mu / 3) + # In this case for error probability <= 1e-4, we need delta * mu = sqrt(-3 * ln(0.5e-10) / mu) * mu ~= 386 + # TODO (gsmyrnis): I think you can get a better bound here. + mixing_error = 386 + assert total_b <= (0.5 * docs_b + mixing_error) * (content_len + 1) + assert total_b >= (0.5 * docs_b - mixing_error) * (content_len + 1) From a638f2ab33189207480eae914111becdabe336dd Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Tue, 12 Mar 2024 22:48:52 -0500 Subject: [PATCH 3/8] Fix small bug in tokenization + version bump. 
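Previously, when --do_sample was set, the trailing buffer (the final
partial sequence left over after a document is split into seqlen-sized
chunks) was always emitted exactly once, ignoring the source's sampling
frequency. The trailing buffer is now sampled the same way as full
sequences; in essence (source_counter bookkeeping omitted from this
sketch):

    local_sample_freq = sample_freq
    # Emit one copy per whole unit of the sampling frequency ...
    while local_sample_freq > 1:
        yield buffer + [PAD] * (seqlen - len(buffer))
        local_sample_freq -= 1
    # ... then one more copy with probability equal to the remainder.
    if rng.random() < local_sample_freq:
        yield buffer + [PAD] * (seqlen - len(buffer))

The mixing tests are also reworked to count per-source tokens with
numpy and are parametrized over the number of sources and the document
length.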
--- .../datapreprocess/ray/tokenize_shuffle.py | 18 ++- tests/test_tokenize_shuffle.py | 116 ++++++++++-------- 2 files changed, 82 insertions(+), 52 deletions(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index b9d987d2..aeac1de8 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -304,9 +304,21 @@ def preprocess( buffer = buffer[idx:] if len(buffer) > 0: - if source_counter is not None: - ray.get(source_counter.increment_token_count.remote(len(buffer))) - yield buffer + [PAD] * (seqlen - len(buffer)) + if do_sample: + local_sample_freq = sample_freq + while local_sample_freq > 1: + if source_counter is not None: + ray.get(source_counter.increment_token_count.remote(len(buffer))) + yield buffer + [PAD] * (seqlen - len(buffer)) + local_sample_freq -= 1 + if rng.random() < local_sample_freq: + if source_counter is not None: + ray.get(source_counter.increment_token_count.remote(len(buffer))) + yield buffer + [PAD] * (seqlen - len(buffer)) + else: + if source_counter is not None: + ray.get(source_counter.increment_token_count.remote(len(buffer))) + yield buffer + [PAD] * (seqlen - len(buffer)) except (IncompleteReadError, ReadTimeoutError, ResponseStreamingError) as e: logger.error(f"There was an incomplete read error: {e} for key {key}") diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index cd64e286..0bbe4fc1 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -2,6 +2,7 @@ import os import pytest import webdataset as wds +import numpy as np @pytest.fixture(autouse=True) @@ -117,35 +118,51 @@ def test_tokenize_shuffle_local_read_local_write(): assert exit_value == 0 -def test_mixing_no_sampling(): +@pytest.mark.parametrize("num_sources", [2, 3]) +@pytest.mark.parametrize("generation_length", [1024, 2048, 2500]) +def test_mixing_no_sampling(num_sources, generation_length): content_len = 2048 docs_a = 1000 docs_b = 500 + docs_c = 2000 # Tokens for gpt-neox tokenizer (default) token_a = 247 token_b = 270 + token_c = 260 # Store some fake sources in ./test_input os.system("mkdir test_input") os.system("mkdir test_input/source_a/") os.system("mkdir test_input/source_b/") + os.system("mkdir test_input/source_c/") os.system("mkdir test_output") - with open("test_input/source_a/input.jsonl", "w") as f: - # This will create 2048 copies of the " a" string - data = {"text": " " + " ".join(["a" for _ in range(content_len)])} - json_string = json.dumps(data) - for _ in range(docs_a): - f.write(json_string) - f.write("\n") - - with open("test_input/source_b/input.jsonl", "w") as f: - data = {"text": " " + " ".join(["b" for _ in range(content_len)])} - json_string = json.dumps(data) - for _ in range(docs_b): - f.write(json_string) - f.write("\n") + for i in range(docs_a // 100): + with open(f"test_input/source_a/input_{i:08d}.jsonl", "w") as f: + # This will create 2048 copies of the " a" string + data = {"text": " " + " ".join(["a" for _ in range(generation_length)])} + json_string = json.dumps(data) + for _ in range(100): + f.write(json_string) + f.write("\n") + + for i in range(docs_b // 100): + with open(f"test_input/source_b/input_{i:08d}.jsonl", "w") as f: + data = {"text": " " + " ".join(["b" for _ in range(generation_length)])} + json_string = json.dumps(data) + for _ in range(100): + f.write(json_string) + f.write("\n") + + if num_sources == 3: + for i in range(docs_c // 100): + with 
open(f"test_input/source_c/input_{i:08d}.jsonl", "w") as f: + data = {"text": " " + " ".join(["c" for _ in range(generation_length)])} + json_string = json.dumps(data) + for _ in range(100): + f.write(json_string) + f.write("\n") # run tokenize script exit_value = os.system( @@ -153,24 +170,26 @@ def test_mixing_no_sampling(): ) tars = [os.path.join("test_output", fname) for fname in os.listdir("test_output") if fname.endswith(".tar")] - total_a = total_b = 0 + total_a = total_b = total_c = 0 for tar in tars: ds = wds.WebDataset(tar).decode() for x in ds: assert len(x["json.gz"]) == content_len + 1 - if x["json.gz"][0] == token_a: - total_a += len(x["json.gz"]) - elif x["json.gz"][0] == token_b: - total_b += len(x["json.gz"]) - else: - assert False, f"Unrecognized tokens {x['json.gz'][0]} - probably using a different tokenizer?" - - assert total_a == docs_a * (content_len + 1) - assert total_b == docs_b * (content_len + 1) + tokens = np.array(x["json.gz"]) + total_a += np.sum(tokens == token_a) + total_b += np.sum(tokens == token_b) + total_c += np.sum(tokens == token_c) + + assert total_a == docs_a * generation_length + assert total_b == docs_b * generation_length + if num_sources == 3: + assert total_c == docs_c * generation_length + assert exit_value == 0 -def test_mixing_sampling(): +@pytest.mark.parametrize("generation_length", [1024, 2048, 2500]) +def test_mixing_sampling(generation_length): content_len = 2048 docs_a = 10000 docs_b = 10000 @@ -185,20 +204,22 @@ def test_mixing_sampling(): os.system("mkdir test_input/source_b/") os.system("mkdir test_output") - with open("test_input/source_a/input.jsonl", "w") as f: - # This will create 2048 copies of the " a" string - data = {"text": " " + " ".join(["a" for _ in range(content_len)])} - json_string = json.dumps(data) - for _ in range(docs_a): - f.write(json_string) - f.write("\n") - - with open("test_input/source_b/input.jsonl", "w") as f: - data = {"text": " " + " ".join(["b" for _ in range(content_len)])} - json_string = json.dumps(data) - for _ in range(docs_b): - f.write(json_string) - f.write("\n") + for i in range(docs_a // 100): + with open(f"test_input/source_a/input_{i:08d}.jsonl", "w") as f: + # This will create 2048 copies of the " a" string + data = {"text": " " + " ".join(["a" for _ in range(generation_length)])} + json_string = json.dumps(data) + for _ in range(100): + f.write(json_string) + f.write("\n") + + for i in range(docs_b // 100): + with open(f"test_input/source_b/input_{i:08d}.jsonl", "w") as f: + data = {"text": " " + " ".join(["b" for _ in range(generation_length)])} + json_string = json.dumps(data) + for _ in range(100): + f.write(json_string) + f.write("\n") # run tokenize script exit_value = os.system( @@ -212,20 +233,17 @@ def test_mixing_sampling(): ds = wds.WebDataset(tar).decode() for x in ds: assert len(x["json.gz"]) == content_len + 1 - if x["json.gz"][0] == token_a: - total_a += len(x["json.gz"]) - elif x["json.gz"][0] == token_b: - total_b += len(x["json.gz"]) - else: - assert False, f"Unrecognized tokens {x['json.gz'][0]} - probably using a different tokenizer?" + tokens = np.array(x["json.gz"]) + total_a += np.sum(tokens == token_a) + total_b += np.sum(tokens == token_b) # Sampling for source a should be 2.0, so it should be exactly 2 - assert total_a == 2 * docs_a * (content_len + 1) + assert total_a == 2 * docs_a * generation_length # Source b is sampled with probability 0.5, so the number of documents from source b follows Bin(10000, 0.5). 
# Via (multiplicative) Chernoff bounds, for margin delta the error probability is 2 * exp(-delta**2 * mu / 3) # In this case for error probability <= 1e-4, we need delta * mu = sqrt(-3 * ln(0.5e-10) / mu) * mu ~= 386 # TODO (gsmyrnis): I think you can get a better bound here. mixing_error = 386 - assert total_b <= (0.5 * docs_b + mixing_error) * (content_len + 1) - assert total_b >= (0.5 * docs_b - mixing_error) * (content_len + 1) + assert total_b <= (0.5 * docs_b + mixing_error) * generation_length + assert total_b >= (0.5 * docs_b - mixing_error) * generation_length From 624f8ee05f1213bf3c3a8b8482f575f555d1228e Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Tue, 12 Mar 2024 23:29:59 -0500 Subject: [PATCH 4/8] Make markers case insensitive. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index aeac1de8..3959da8e 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -68,7 +68,7 @@ def load_from_yaml(filename): # Add get_source and get_sampling_frequency methods to Sources def get_source_dynamic(self, key): for item in data["sources"]: - if any(marker in key for marker in item["markers"]): + if any(marker in key.lower() for marker in item["markers"]): return Sources[item["source"]] return Sources.UNKNOWN From 4913019f47ecf11ff60af5d28bdebbc27527aa8b Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Tue, 12 Mar 2024 23:45:37 -0500 Subject: [PATCH 5/8] Fix some old tests. --- tests/test_tokenize_shuffle.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index 0bbe4fc1..fc9648a9 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -25,7 +25,7 @@ def test_tokenize_shuffle_simple(): for x in ds: assert len(x["json.gz"]) == content_len + 1 total += len(x["json.gz"]) - # assert total == NUM_TOKENS + assert total == NUM_TOKENS with open("test_output/manifest.jsonl", "rb") as f: out = f.read() @@ -57,7 +57,7 @@ def test_tokenize_shuffle_tar(content_key, NUM_TOKENS): def test_tokenize_shuffle_simple_do_sample(): content_len = 2048 - NUM_TOKENS = 32784 + NUM_TOKENS = 86058 exit_value = os.system( f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input s3://dcnlp-west-test/tokenize_shuffle_test/C4_V3_tiny/ --content_key content --output test_output/ --seqlen {content_len} --do_sample" ) @@ -67,7 +67,12 @@ def test_tokenize_shuffle_simple_do_sample(): for x in ds: assert len(x["json.gz"]) == content_len + 1 total += len(x["json.gz"]) - assert total == NUM_TOKENS + + # The sampling prob is 1.037142857 for the C4 source. This means that we will see all tokens at least once. For + # error at most 1e-4, we will need an error of 13950 tokens (by Chernoff bounds). + # TODO(gsmyrnis): Improve this. + assert total <= 1.037142857 * NUM_TOKENS + 13950 + assert total >= 1.037142857 * NUM_TOKENS - 13950 @pytest.mark.s3 @@ -242,7 +247,7 @@ def test_mixing_sampling(generation_length): # Source b is sampled with probability 0.5, so the number of documents from source b follows Bin(10000, 0.5). 
# Via (multiplicative) Chernoff bounds, for margin delta the error probability is 2 * exp(-delta**2 * mu / 3) - # In this case for error probability <= 1e-4, we need delta * mu = sqrt(-3 * ln(0.5e-10) / mu) * mu ~= 386 + # In this case for error probability <= 1e-4, we need delta * mu = sqrt(-3 * ln(0.5e-4) / mu) * mu ~= 386 # TODO (gsmyrnis): I think you can get a better bound here. mixing_error = 386 assert total_b <= (0.5 * docs_b + mixing_error) * generation_length From 490c842e36affe46ba41089892fd62463f4ab4b3 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Fri, 15 Mar 2024 03:36:51 +0000 Subject: [PATCH 6/8] Update lowercase. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index 3959da8e..79bdea79 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -68,7 +68,7 @@ def load_from_yaml(filename): # Add get_source and get_sampling_frequency methods to Sources def get_source_dynamic(self, key): for item in data["sources"]: - if any(marker in key.lower() for marker in item["markers"]): + if any(marker.lower() in key.lower() for marker in item["markers"]): return Sources[item["source"]] return Sources.UNKNOWN From a4758cb707f54f36accafe6c6dae4e63c10172cd Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Mon, 20 May 2024 23:02:48 +0200 Subject: [PATCH 7/8] Revert naming change. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index 79bdea79..86a3c959 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -234,7 +234,7 @@ def _flush_buffer(self, folder, counter): tokens = [int(x) for x in self.buffer[i]["tokens"]] token_count += len(tokens) json_string = json.dumps(tokens) - uid = f"{tar_index_str}_{i:0{digits}}" + uid = hashlib.md5(json_string.encode()).hexdigest() sample = {"__key__": uid, "json.gz": json_string} sink.write(sample) bio.seek(0) From 15a8c717ff3fa249e223e6f4e69539a61a0aa76d Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Wed, 22 May 2024 07:02:31 +0200 Subject: [PATCH 8/8] One more update on naming. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index 86a3c959..1783c89e 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -234,7 +234,7 @@ def _flush_buffer(self, folder, counter): tokens = [int(x) for x in self.buffer[i]["tokens"]] token_count += len(tokens) json_string = json.dumps(tokens) - uid = hashlib.md5(json_string.encode()).hexdigest() + uid = f"{hashlib.md5(json_string.encode()).hexdigest()}_{tar_index:{digits}}_{i:0{digits}}" sample = {"__key__": uid, "json.gz": json_string} sink.write(sample) bio.seek(0)
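
A note on the sampling margin used in test_mixing_sampling: the value
mixing_error = 386 follows from the Chernoff bound quoted in the test
comment. The number of documents kept from source b is Bin(docs_b, 0.5)
with mean mu = 5000, and solving the two-sided bound
P(|X - mu| >= delta * mu) <= 2 * exp(-delta**2 * mu / 3) for an error
probability of 1e-4 gives the margin. A standalone Python sketch of the
calculation (illustrative only, not part of the patches):

    import math

    docs_b, p, max_err = 10000, 0.5, 1e-4
    mu = p * docs_b                              # 5000 expected documents from source b

    # Solve 2 * exp(-delta**2 * mu / 3) = max_err for the margin delta * mu.
    delta = math.sqrt(-3 * math.log(max_err / 2) / mu)
    margin_docs = delta * mu                     # ~385.4 documents
    print(math.ceil(margin_docs))                # 386, the value used in the test

Each kept document from source b contributes generation_length
occurrences of the b token, so the margin in documents is multiplied by
the sequence length in the assertions on total_b.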