Commit: Refactoring
luigiba committed Sep 6, 2019
1 parent 154d225 commit 29bf2db
Showing 15 changed files with 236 additions and 156 deletions.
10 changes: 10 additions & 0 deletions .idea/workspace.xml


2 changes: 1 addition & 1 deletion Config.py
@@ -25,7 +25,7 @@ def __init__(self, cpp_lib_path=None, init_new_entities=False):
if init_new_entities == False:
#C library
if cpp_lib_path == None:
cpp_lib_path = '/home/luigi/IdeaProjects/OpenKEonSpark/release/Base.so'
cpp_lib_path = './release/Base.so'
base_file = os.path.abspath(cpp_lib_path)
self.lib = ctypes.cdll.LoadLibrary(base_file)
self.lib.sampling.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int64, ctypes.c_int64, ctypes.c_int64]
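The Config.py change above replaces a hard-coded absolute path with a relative default for Base.so, which is then loaded through ctypes with the argument types of sampling declared explicitly. As a minimal sketch (not the toolkit's own code), this is how the library could be loaded and sampling invoked from Python once the training files have been imported; the buffer sizes, the negative-sampling rates, and the float32 choice for REAL below are assumptions.

# Hedged sketch: loading Base.so and calling sampling() as declared in Config.py.
# Buffer sizes, negative-sampling rates, and float32 for REAL are assumptions;
# the training files must have been imported by the library before sampling is called.
import ctypes
import os
import numpy as np

lib = ctypes.cdll.LoadLibrary(os.path.abspath('./release/Base.so'))
lib.sampling.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
                         ctypes.c_int64, ctypes.c_int64, ctypes.c_int64]

batch_size, neg_rate, neg_rel_rate = 128, 1, 0       # illustrative values
total = batch_size * (1 + neg_rate + neg_rel_rate)   # positives plus corrupted triples

# pre-allocated buffers that the C library fills in place
batch_h = np.zeros(total, dtype=np.int64)
batch_t = np.zeros(total, dtype=np.int64)
batch_r = np.zeros(total, dtype=np.int64)
batch_y = np.zeros(total, dtype=np.float32)

lib.sampling(batch_h.ctypes.data_as(ctypes.c_void_p),
             batch_t.ctypes.data_as(ctypes.c_void_p),
             batch_r.ctypes.data_as(ctypes.c_void_p),
             batch_y.ctypes.data_as(ctypes.c_void_p),
             batch_size, neg_rate, neg_rel_rate)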
Binary file modified __pycache__/Config.cpython-36.pyc
Binary file modified __pycache__/distribute_training.cpython-36.pyc
11 changes: 7 additions & 4 deletions base/Base.cpp
@@ -68,7 +68,9 @@ struct Parameter {
INT negRelRate;
};


/*
* fill the batch with training triples and their corrupted (negative) counterparts
*/
void* getBatch(void* con) {
Parameter *para = (Parameter *)(con);
INT id = para -> id;
@@ -94,7 +96,7 @@ void* getBatch(void* con) {
INT i;

/**
* select batch triple / train triple
* select new batch triple / train triple
**/
if (newBatchTotal > 0){
i = rand_max_range(id, trainTotal_ - newBatchTotal, trainTotal_);
@@ -137,12 +139,13 @@
}
}


pthread_exit(NULL);
}



/*
* Sample a batch to use during training
*/
extern "C"
void sampling(INT *batch_h, INT *batch_t, INT *batch_r, REAL *batch_y, INT batchSize, INT negRate = 1, INT negRelRate = 0) {
pthread_t *pt = (pthread_t *)malloc(workThreads * sizeof(pthread_t));
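The getBatch change samples from the tail of the training list whenever a new batch of triples is present (newBatchTotal > 0), i.e. from indices in [trainTotal_ - newBatchTotal, trainTotal_); otherwise it samples from the whole list. A minimal Python sketch of that index-selection idea, using illustrative names rather than the C API:

# Hedged sketch of the index-selection logic in getBatch(): new triples are
# assumed to sit at the end of the training list, so when a new batch exists
# the sampled index is drawn from that tail segment only.
import random

def pick_train_index(train_total, new_batch_total):
    if new_batch_total > 0:
        return random.randrange(train_total - new_batch_total, train_total)
    return random.randrange(0, train_total)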
12 changes: 5 additions & 7 deletions base/Reader.h
@@ -17,8 +17,6 @@ Triple *trainHead;
Triple *trainTail;
Triple *trainRel;


//EDIT
Triple *trainList_no; //data structure containing training triples not ordered


@@ -30,7 +28,7 @@ void importTrainFiles() {

printf("The toolkit is importing datasets.\n");
FILE *fin;
FILE *fin_; //EDIT
FILE *fin_;
int tmp;

//read relations
@@ -58,7 +56,7 @@ void importTrainFiles() {
/**
* EDIT
* check if new batch file exists
* if it exists get only new batch size
* if it exists get only the new batch size (another script has already incorporated the new triples into train2id.txt)
**/
fin_ = fopen((inPath + "batch2id.txt").c_str(), "r");
if (fin_ != nullptr){
@@ -76,7 +74,7 @@ void importTrainFiles() {
return;
}
tmp = fscanf(fin, "%ld", &trainTotal);
trainTotal_ = trainTotal; //traingTotal_ contains # training triples counting duplicates, too
trainTotal_ = trainTotal; //trainTotal_ contains # training triples, counting duplicates too
printf("The total of train triples is %ld.\n", trainTotal);


@@ -87,15 +85,15 @@ void importTrainFiles() {
trainRel = (Triple *)calloc(trainTotal, sizeof(Triple));
freqRel = (INT *)calloc(relationTotal, sizeof(INT));
freqEnt = (INT *)calloc(entityTotal, sizeof(INT));
//EDIT
//not ordered
trainList_no = (Triple *)calloc(trainTotal, sizeof(Triple)); //not ordered

for (INT i = 0; i < trainTotal; i++) {
tmp = fscanf(fin, "%ld", &trainList[i].h);
tmp = fscanf(fin, "%ld", &trainList[i].t);
tmp = fscanf(fin, "%ld", &trainList[i].r);

//EDIT

trainList_no[i].h = trainList[i].h;
trainList_no[i].t = trainList[i].t;
trainList_no[i].r = trainList[i].r;
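The Reader.h edit looks for an optional batch2id.txt next to train2id.txt and, when it exists, reads only the size of the new batch, on the assumption that another script has already appended the new triples to train2id.txt. A hedged Python sketch of that assumed layout (a count on the first line, followed in train2id.txt by one "h t r" triple per line); the helper name is illustrative:

# Hedged sketch of the file layout that importTrainFiles() expects:
# train2id.txt starts with a count line followed by "h t r" triples, and an
# optional batch2id.txt holds the number of newly appended triples (only that
# count is read here, matching the C code's behaviour).
import os

def read_train_files(in_path):
    with open(os.path.join(in_path, "train2id.txt")) as f:
        train_total = int(f.readline())
        triples = [tuple(map(int, f.readline().split())) for _ in range(train_total)]

    new_batch_total = 0
    batch_file = os.path.join(in_path, "batch2id.txt")
    if os.path.exists(batch_file):
        with open(batch_file) as f:
            new_batch_total = int(f.readline())
    return triples, new_batch_total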
3 changes: 2 additions & 1 deletion base/Setting.h
@@ -114,7 +114,8 @@ void setBern(INT con) {
/*
============================================================
*/
REAL interval = 0.01;

REAL interval = 0.01; //used to compute thresholds during the triple classification task


#endif
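The new comment on interval says it is used to compute thresholds during the triple classification task. A common recipe, assumed here rather than taken from the toolkit's evaluation code, is to sweep candidate thresholds over the validation score range in steps of interval and keep the one that maximizes accuracy, classifying a triple as positive when its score falls below the threshold (lower scores mean more plausible triples for translational models):

# Hedged sketch of a fixed-step threshold search for triple classification
# (interval = 0.01). Illustrative only; the toolkit's exact procedure may differ.
def find_threshold(scores, labels, interval=0.01):
    lo, hi = min(scores), max(scores)
    best_threshold, best_acc = lo, 0.0
    t = lo
    while t <= hi:
        # count triples whose "score below threshold" decision matches the label
        correct = sum((s < t) == (y == 1) for s, y in zip(scores, labels))
        acc = correct / len(scores)
        if acc > best_acc:
            best_acc, best_threshold = acc, t
        t += interval
    return best_threshold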
10 changes: 4 additions & 6 deletions commands.txt → colab/commands.txt
@@ -1,4 +1,6 @@
#commands to run OpenKEonSpark on the Google Colaboratory platform
#before running, create a folder in your Google Drive named DBpedia
#upload to that folder the zip archive containing the dataset produced by generate.py

#install
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
@@ -15,12 +17,10 @@ drive.mount('/content/drive')

#clone project
!git clone https://github.com/luigiba/OpenKEonSpark.git
!mv OpenKEonSpark/gpu_info.py /usr/local/lib/python3.6/dist-packages/tensorflowonspark/gpu_info.py
!mv OpenKEonSpark/colab/gpu_info.py /usr/local/lib/python3.6/dist-packages/tensorflowonspark/gpu_info.py

#unzip dataset
!unzip /content/drive/My\ Drive/DBpedia/5.zip -d /content/drive/My\ Drive/DBpedia/
!unzip /content/drive/My\ Drive/DBpedia/10.zip -d /content/drive/My\ Drive/DBpedia/
!unzip /content/drive/My\ Drive/DBpedia/15.zip -d /content/drive/My\ Drive/DBpedia/

#set env vars
import os
@@ -35,9 +35,7 @@ os.environ["WORK_DIR_PREFIX"] = "/content/OpenKEonSpark"
os.environ["SPARK_HOME"] = "/content/spark-2.1.1-bin-hadoop2.7"

#execute
!bash $WORK_DIR_PREFIX/run_dbpedia.sh 5 64 "TransE" 0.0001
!bash $WORK_DIR_PREFIX/run_dbpedia.sh 10 64 "TransE" 0.0001
!bash $WORK_DIR_PREFIX/run_dbpedia.sh 15 64 "TransE" 0.0001
!bash $WORK_DIR_PREFIX/colab/run_dbpedia.sh 10 64 "TransE" 0.00001



File renamed without changes: gpu_info.py → colab/gpu_info.py
125 changes: 125 additions & 0 deletions colab/run_dbpedia.sh
@@ -0,0 +1,125 @@
#this is an example script that can be used on Google Colab to train the embeddings and evaluate them
# link prediction evaluation is performed only for the last batch
# for the other batches only triple classification evaluation is performed
#before starting the script:
# run split.py to put the dataset into the proper format
# set environment variables, e.g.:
# $JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"
# $SPARK_WORKER_INSTANCES = '3'
# $PYSPARK_PYTHON = '/usr/bin/python3'
# $CUDA_VISIBLE_DEVICES = "0"
# $CORES_PER_WORKER = "1"
# $MEMORY_PER_WORKER = "4g"
# $LIB_CUDA = "/usr/local/cuda-10.0/lib64"
# $WORK_DIR_PREFIX = "/content/OpenKEonSpark"
# $SPARK_HOME = "/content/spark-2.1.1-bin-hadoop2.7"


echo "====================================== Parameters ======================================"
echo "Number of batches: $1"
echo "Embedding dimensionality: $2"
echo "Model to use: $3"
echo "Learning rate: $4"

echo "====================================== Clearning res_spark directory ======================================"
rm $WORK_DIR_PREFIX/res_spark/*

echo "====================================== Stopping Spark Master & slaves ======================================"
$SPARK_HOME/sbin/stop-slave.sh
$SPARK_HOME/sbin/stop-master.sh


echo "====================================== Starting Spark Master & slaves ======================================"
$SPARK_HOME/sbin/start-master.sh; $SPARK_HOME/sbin/start-slave.sh -c $CORES_PER_WORKER -m $MEMORY_PER_WORKER spark://$(hostname):7077
n=$1
m=$((n-1))

#iterate over batches
for i in `seq 0 $m`
do
#resume link prediction evaluation if evaluation checkpoints have been found
if [ -f /content/drive/My\ Drive/DBpedia/$n/$i/model/thread0 ]; then
echo "====================================== Test for batch $i ======================================"
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 \
--py-files $WORK_DIR_PREFIX/distribute_training.py,$WORK_DIR_PREFIX/Config.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransE.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransH.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransR.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransD.py \
--driver-library-path=$LIB_CUDA --conf spark.dynamicAllocation.enabled=false --conf spark.task.cpus=$CORES_PER_WORKER --executor-memory $MEMORY_PER_WORKER --num-executors $SPARK_WORKER_INSTANCES \
$WORK_DIR_PREFIX/main_spark.py \
--cluster_size $SPARK_WORKER_INSTANCES --num_ps 1 --num_gpus 1 --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--input_path /content/drive/My\ Drive/DBpedia/$n/$i/ --output_path $WORK_DIR_PREFIX/res_spark --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--embedding_dimension $2 --model $3 --mode evaluation | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt

continue
fi


if [ -f /content/drive/My\ Drive/DBpedia/$n/$i/res.txt ]; then
echo "Batch $i already done; Skipping batch $i"
continue
fi


if [ -f /content/drive/My\ Drive/DBpedia/$n/$i/model/checkpoint ]; then
echo "====================================== Test for batch $i ======================================"
if [ $i -eq $m ]; then
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 \
--py-files $WORK_DIR_PREFIX/distribute_training.py,$WORK_DIR_PREFIX/Config.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransE.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransH.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransR.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransD.py \
--driver-library-path=$LIB_CUDA --conf spark.dynamicAllocation.enabled=false --conf spark.task.cpus=$CORES_PER_WORKER --executor-memory $MEMORY_PER_WORKER --num-executors $SPARK_WORKER_INSTANCES \
$WORK_DIR_PREFIX/main_spark.py \
--cluster_size $SPARK_WORKER_INSTANCES --num_ps 1 --num_gpus 1 --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--input_path /content/drive/My\ Drive/DBpedia/$n/$i/ --output_path $WORK_DIR_PREFIX/res_spark --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--embedding_dimension $2 --model $3 --mode evaluation | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt
else
python3 $WORK_DIR_PREFIX/test.py /content/drive/My\ Drive/DBpedia/$n/$i/ /content/drive/My\ Drive/DBpedia/$n/$i/model/ $WORK_DIR_PREFIX/release/Base.so $2 $3 | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt
fi
fi


if [ $i != 0 ]; then
k=$((i-1))
cd $WORK_DIR_PREFIX/res_spark
if [ "$(ls -1A | wc -l)" -eq 0 ] ; then
echo "Copying model into res_spark dir"
cp /content/drive/My\ Drive/DBpedia/$n/$k/model/* $WORK_DIR_PREFIX/res_spark/
fi

cd /content/drive/My\ Drive/DBpedia/$n/$i
if [ "$(ls -1A | wc -l)" -le 10 ] ; then
echo "Copying data into new batch dir"
cp /content/drive/My\ Drive/DBpedia/$n/$k/entity2id.txt /content/drive/My\ Drive/DBpedia/$n/$k/relation2id.txt /content/drive/My\ Drive/DBpedia/$n/$k/test2id.txt /content/drive/My\ Drive/DBpedia/$n/$k/valid2id.txt /content/drive/My\ Drive/DBpedia/$n/$k/train2id.txt /content/drive/My\ Drive/DBpedia/$n/$i/
fi

cd /content
fi


echo "====================================== Starting Training for batch $i ======================================"
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 \
--py-files $WORK_DIR_PREFIX/distribute_training.py,$WORK_DIR_PREFIX/Config.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransE.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransH.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransR.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransD.py \
--driver-library-path=$LIB_CUDA --conf spark.dynamicAllocation.enabled=false --conf spark.task.cpus=$CORES_PER_WORKER --executor-memory $MEMORY_PER_WORKER \
--num-executors $SPARK_WORKER_INSTANCES \
$WORK_DIR_PREFIX/main_spark.py \
--cluster_size $SPARK_WORKER_INSTANCES --num_ps 1 --num_gpus 1 --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--input_path /content/drive/My\ Drive/DBpedia/$n/$i/ \
--output_path $WORK_DIR_PREFIX/res_spark \
--alpha $4 --optimizer SGD --train_times 50 --ent_neg_rate 1 --embedding_dimension $2 --margin 1.0 --model $3


echo "====================================== Copying model for batch $i ======================================"
cp $WORK_DIR_PREFIX/res_spark/* /content/drive/My\ Drive/DBpedia/$n/$i/model/


echo "====================================== Test for batch $i ======================================"
if [ $i -eq $m ]; then
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 \
--py-files $WORK_DIR_PREFIX/distribute_training.py,$WORK_DIR_PREFIX/Config.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransE.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransH.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransR.py,$WORK_DIR_PREFIX/Model.py,$WORK_DIR_PREFIX/TransD.py \
--driver-library-path=$LIB_CUDA --conf spark.dynamicAllocation.enabled=false --conf spark.task.cpus=$CORES_PER_WORKER --executor-memory $MEMORY_PER_WORKER --num-executors $SPARK_WORKER_INSTANCES \
$WORK_DIR_PREFIX/main_spark.py \
--cluster_size $SPARK_WORKER_INSTANCES --num_ps 1 --num_gpus 1 --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--input_path /content/drive/My\ Drive/DBpedia/$n/$i/ --output_path $WORK_DIR_PREFIX/res_spark --cpp_lib_path $WORK_DIR_PREFIX/release/Base.so \
--embedding_dimension $2 --model $3 --mode evaluation | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt
else
python3 $WORK_DIR_PREFIX/test.py /content/drive/My\ Drive/DBpedia/$n/$i/ /content/drive/My\ Drive/DBpedia/$n/$i/model/ $WORK_DIR_PREFIX/release/Base.so $2 $3 | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt
fi

done

9 changes: 4 additions & 5 deletions distribute_training.py
@@ -43,12 +43,10 @@ def get_conf(argv=None):
else: con.set_test_link_prediction(True)

con.set_train_times(argv.train_times)
con.set_nbatches(argv.n_batches)
con.set_nbatches(argv.n_mini_batches)
con.set_alpha(argv.alpha)
con.set_margin(argv.margin)

if argv.bern_flag:
con.set_bern(1)
con.set_bern(argv.bern_flag)

if argv.ent_dimension != 0 and argv.rel_dimension != 0:
con.set_ent_dimension(argv.ent_dimension)
@@ -109,7 +107,8 @@ def create_model(con):

def create_model_evaluation(con):
'''
create the model using the Config parameters
create the model using the Config parameters for evaluation mode
creates only the model and the global step
'''
with tf.variable_scope("", reuse=None, initializer = tf.contrib.layers.xavier_initializer(uniform = True)):
trainModel = con.model(config = con)
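The refactor renames the mini-batch flag to n_mini_batches and passes bern_flag straight through to set_bern. A hedged sketch of how the corresponding command-line options might be declared with argparse; the flag names follow this diff and run_dbpedia.sh, but the defaults and help strings are assumptions, and the real definitions live in the training entry point:

# Hedged sketch of the argparse options implied by the refactored get_conf();
# defaults and help texts are illustrative, not the repository's actual values.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--n_mini_batches", type=int, default=100,
                    help="number of mini-batches per epoch (fed to con.set_nbatches)")
parser.add_argument("--bern_flag", type=int, default=0,
                    help="1 for Bernoulli negative sampling, 0 for uniform (fed to con.set_bern)")
parser.add_argument("--train_times", type=int, default=50,
                    help="number of training epochs")
parser.add_argument("--alpha", type=float, default=0.00001,
                    help="learning rate")
parser.add_argument("--margin", type=float, default=1.0,
                    help="margin for the ranking loss")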