Commit 5e155b1: ubuntu hadoop
luweizheng committed Jun 1, 2024
1 parent e5872a3 · commit 5e155b1
Showing 2 changed files with 16 additions and 21 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/python.yaml
@@ -311,11 +311,6 @@ jobs:
 export WITH_HADOOP="1"
 export HADOOP_HOME="/usr/local/hadoop"
 export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
-echo "=====JAVA HOME===="
-echo $JAVA_HOME
-echo $HADOOP_HOME
-echo $CLASSPATH
-which java
 export HADOOP_INSTALL=$HADOOP_HOME
 export HADOOP_MAPRED_HOME=$HADOOP_HOME
 export HADOOP_COMMON_HOME=$HADOOP_HOME
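The CLASSPATH export retained above is what PyArrow's HDFS binding depends on at connect time: the Hadoop jars come from CLASSPATH (the output of `hadoop classpath --glob`), and libhdfs is located through HADOOP_HOME (or ARROW_LIBHDFS_DIR). A minimal sketch of the environment the tests assume, using the localhost namenode that appears throughout this commit:

# Minimal sketch, assuming the environment exported by the CI step above.
# PyArrow locates libhdfs via HADOOP_HOME (or ARROW_LIBHDFS_DIR) and the
# Hadoop jars via CLASSPATH, so both must be set before connecting.
import os

from pyarrow import fs

assert "HADOOP_HOME" in os.environ  # e.g. /usr/local/hadoop, as in the workflow
assert "CLASSPATH" in os.environ    # output of `hadoop classpath --glob`

hdfs = fs.HadoopFileSystem(host="localhost", port=8020)
print(hdfs.get_file_info("/"))  # smoke test: the namenode is reachable

The diff for the second changed file, the HDFS test module, follows.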
@@ -33,21 +33,23 @@ def setup_hdfs():
     from pyarrow import fs
 
     hdfs = fs.HadoopFileSystem(host="localhost", port=8020)
-    if hdfs.get_file_info(TEST_DIR):
+    file = hdfs.get_file_info(TEST_DIR)
+    if file.type != fs.FileType.NotFound:
         hdfs.delete_dir(TEST_DIR)
     try:
         yield hdfs
     finally:
-        if hdfs.get_file_info(TEST_DIR):
+        file = hdfs.get_file_info(TEST_DIR)
+        if file.type != fs.FileType.NotFound:
             hdfs.delete_dir(TEST_DIR)
 
 
 @require_hadoop
 def test_read_csv_execution(setup, setup_hdfs):
     hdfs = setup_hdfs
 
-    with hdfs.open_output_stream(f"{TEST_DIR}/simple_test.csv") as file:
-        file.write(b"name,amount,id\nAlice,100,1\nBob,200,2")
+    with hdfs.open_output_stream(f"{TEST_DIR}/simple_test.csv") as f:
+        f.write(b"name,amount,id\nAlice,100,1\nBob,200,2")
 
     df = md.read_csv(f"hdfs://localhost:8020{TEST_DIR}/simple_test.csv")
     expected = pd.read_csv(BytesIO(b"name,amount,id\nAlice,100,1\nBob,200,2"))
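The fixture change above works around a pitfall in the pyarrow fs API: FileSystem.get_file_info() returns a FileInfo object even when the path does not exist, so truth-testing the result, as the old code did, always takes the branch. An existence check has to compare the FileInfo's type field against fs.FileType.NotFound. A standalone sketch, assuming a reachable namenode and a hypothetical /tmp/demo path:

# Sketch of the existence check the fixture now uses; /tmp/demo is a
# hypothetical path, and localhost:8020 mirrors the test setup.
from pyarrow import fs

hdfs = fs.HadoopFileSystem(host="localhost", port=8020)
info = hdfs.get_file_info("/tmp/demo")
# get_file_info() always returns a FileInfo, so `if info:` is not an
# existence test; inspect info.type instead.
if info.type != fs.FileType.NotFound:
    hdfs.delete_dir("/tmp/demo")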
@@ -74,7 +76,7 @@ def test_read_csv_execution(setup, setup_hdfs):
     test_df[10:].to_csv(buf)
     csv_content2 = buf.getvalue().encode()
 
-    with hdfs.open(f"{TEST_DIR}/chunk_test.csv", "wb", replication=1) as f:
+    with hdfs.open_output_stream(f"{TEST_DIR}/chunk_test.csv") as f:
         f.write(csv_content)
 
     df = md.read_csv(f"hdfs://localhost:8020{TEST_DIR}/chunk_test.csv", chunk_bytes=50)
@@ -85,10 +87,10 @@ def test_read_csv_execution(setup, setup_hdfs):
     )
 
     test_read_dir = f"{TEST_DIR}/test_read_csv_directory"
-    hdfs.mkdir(test_read_dir)
-    with hdfs.open(f"{test_read_dir}/part.csv", "wb", replication=1) as f:
+    hdfs.create_dir(test_read_dir)
+    with hdfs.open_output_stream(f"{test_read_dir}/part.csv") as f:
         f.write(csv_content)
-    with hdfs.open(f"{test_read_dir}/part2.csv", "wb", replication=1) as f:
+    with hdfs.open_output_stream(f"{test_read_dir}/part2.csv") as f:
         f.write(csv_content2)
 
     df = md.read_csv(f"hdfs://localhost:8020{test_read_dir}", chunk_bytes=50)
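The pattern repeated through this test is the move from the legacy pyarrow.hdfs-style calls, open(path, "wb", replication=1) and mkdir(), to the pyarrow.fs.HadoopFileSystem methods open_output_stream() and create_dir(). open_output_stream() takes no replication argument; in the fs API, replication is a filesystem-level option on the constructor. A sketch under those assumptions, with an illustrative path:

# Sketch of the migrated write path; /tmp/demo_dir is an illustrative path.
# Per-file replication moves from the open() call to the constructor.
from pyarrow import fs

hdfs = fs.HadoopFileSystem(host="localhost", port=8020, replication=1)
hdfs.create_dir("/tmp/demo_dir")  # replaces the legacy mkdir()
with hdfs.open_output_stream("/tmp/demo_dir/part.csv") as f:
    f.write(b"name,amount,id\nAlice,100,1\n")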
@@ -120,7 +122,9 @@ def test_read_parquet_execution(setup, setup_hdfs):
         }
     )
 
-    with hdfs.open(f"{TEST_DIR}/test.parquet", "wb", replication=1) as f:
+    with hdfs.open_output_stream(f"{TEST_DIR}/test.parquet") as f:
+        f.write(b"name,amount,id\nAlice,100,1\nBob,200,2")
+    with hdfs.open_output_stream(f"{TEST_DIR}/test.parquet") as f:
         test_df.to_parquet(f, row_group_size=3)
 
     df = md.read_parquet(f"hdfs://localhost:8020{TEST_DIR}/test.parquet")
@@ -129,15 +133,11 @@ def test_read_parquet_execution(setup, setup_hdfs):
     expected = test_df.convert_dtypes(dtype_backend="pyarrow")
     pd.testing.assert_frame_equal(res, expected)
 
-    hdfs.mkdir(f"{TEST_DIR}/test_partitioned")
+    hdfs.create_dir(f"{TEST_DIR}/test_partitioned")
 
-    with hdfs.open(
-        f"{TEST_DIR}/test_partitioned/file1.parquet", "wb", replication=1
-    ) as f:
+    with hdfs.open_output_stream(f"{TEST_DIR}/test_partitioned/file1.parquet") as f:
         test_df.to_parquet(f, row_group_size=3)
-    with hdfs.open(
-        f"{TEST_DIR}/test_partitioned/file2.parquet", "wb", replication=1
-    ) as f:
+    with hdfs.open_output_stream(f"{TEST_DIR}/test_partitioned/file2.parquet") as f:
         test_df2.to_parquet(f, row_group_size=3)
 
     df = md.read_parquet(f"hdfs://localhost:8020{TEST_DIR}/test_partitioned")
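Because open_output_stream() returns a file-like object, pandas DataFrame.to_parquet can write into it directly, which is what the parquet tests above rely on; extra keyword arguments such as row_group_size are forwarded to the parquet engine. A sketch with illustrative data and an illustrative path:

# Sketch of the parquet round trip used above; data and path are illustrative.
import pandas as pd
from pyarrow import fs

hdfs = fs.HadoopFileSystem(host="localhost", port=8020)
df = pd.DataFrame({"name": ["Alice", "Bob"], "amount": [100, 200], "id": [1, 2]})
with hdfs.open_output_stream("/tmp/demo.parquet") as f:
    df.to_parquet(f, row_group_size=3)  # forwarded to the pyarrow engine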
