diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 855e1df..b31c1d1 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,13 +4,15 @@ - - + + + - + + @@ -134,4 +136,15 @@ + + + + + file://$PROJECT_DIR$/TransE.py + 1 + + + + \ No newline at end of file diff --git a/Config.py b/Config.py index ef6ea5a..5dd4e06 100644 --- a/Config.py +++ b/Config.py @@ -12,12 +12,17 @@ class Config(object): ''' use ctypes to call C functions from python and set essential parameters. ''' - #EDIT + def __init__(self, cpp_lib_path=None, init_new_entities=False): - self.init_new_entities = init_new_entities + ''' + Init Config Class + :param cpp_lib_path: absolute path to .so file + :param init_new_entities: if true training and test variables are not initialized + ''' + self.init_new_entities = init_new_entities if init_new_entities == False: - #set c++ lib + #C library if cpp_lib_path == None: cpp_lib_path = '/home/luigi/IdeaProjects/OpenKEonSpark/release/Base.so' base_file = os.path.abspath(cpp_lib_path) @@ -30,7 +35,7 @@ def __init__(self, cpp_lib_path=None, init_new_entities=False): self.lib.testTail.restype = ctypes.POINTER(ctypes.c_int64 * 8) self.lib.getHeadBatch.argtypes = [ctypes.c_int64, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] self.lib.testHead.argtypes = [ctypes.c_int64, ctypes.c_void_p] - self.lib.testHead.restype = ctypes.POINTER(ctypes.c_int64 * 4) + self.lib.testHead.restype = ctypes.POINTER(ctypes.c_int64 * 8) self.test_head = 0 #triple classification @@ -54,13 +59,9 @@ def __init__(self, cpp_lib_path=None, init_new_entities=False): self.negative_rel = 0 self.workThreads = 8 self.alpha = 0.001 - self.lmbda = 0.000 - self.log_on = 1 self.exportName = None self.importName = None - self.export_steps = 0 self.opt_method = "SGD" - self.optimizer = None self.test_link_prediction = False self.test_triple_classification = False self.valid_triple_classification = False @@ -84,9 +85,6 @@ def init_triple_classification(self): self.lib.importTestFiles() self.lib.importTypeFiles() - self.acc = np.zeros(1, dtype = np.float32) - self.acc_addr = self.acc.__array_interface__['data'][0] - self.test_pos_h = np.zeros(self.lib.getTestTotal(), dtype = np.int64) self.test_pos_t = np.zeros(self.lib.getTestTotal(), dtype = np.int64) self.test_pos_r = np.zeros(self.lib.getTestTotal(), dtype = np.int64) @@ -116,14 +114,18 @@ def init_triple_classification(self): self.relThresh = np.zeros(self.lib.getRelationTotal(), dtype = np.float32) self.relThresh_addr = self.relThresh.__array_interface__['data'][0] + self.acc = np.zeros(1, dtype = np.float32) + self.acc_addr = self.acc.__array_interface__['data'][0] + def init_valid_triple_classification(self): + r''' + import essential files and set essential interfaces for triple classification + (on validation set, used during training) + ''' self.lib.importTestFiles() self.lib.importTypeFiles() - self.acc = np.zeros(1, dtype = np.float32) - self.acc_addr = self.acc.__array_interface__['data'][0] - self.valid_pos_h = np.zeros(self.lib.getValidTotal(), dtype = np.int64) self.valid_pos_t = np.zeros(self.lib.getValidTotal(), dtype = np.int64) self.valid_pos_r = np.zeros(self.lib.getValidTotal(), dtype = np.int64) @@ -141,9 +143,15 @@ def init_valid_triple_classification(self): self.relThresh = np.zeros(self.lib.getRelationTotal(), dtype = np.float32) self.relThresh_addr = self.relThresh.__array_interface__['data'][0] + self.acc = np.zeros(1, dtype = np.float32) + self.acc_addr = self.acc.__array_interface__['data'][0] + + - # prepare for train and test def init(self): + ''' + prepare for train and test + ''' if self.init_new_entities == False: self.trainModel = None if self.in_path != None: @@ -176,13 +184,21 @@ def init(self): self.init_valid_triple_classification() def set_n_threads_LP(self, n): + ''' + Set the number of threads used during Link prediction evaluation + :param n: the number of threads + ''' self.N_THREADS_LP = n self.lp_res = [] for _ in range(self.N_THREADS_LP): self.lp_res.append({}) def set_mini_batch(self): + ''' + Set mini batch used during training + This function checks for specified mini batch parameter + If it has not been specified the mini batch is automatically set + ''' tot = None - if self.bt > 0: tot = self.bt else: @@ -201,129 +217,197 @@ def set_mini_batch(self): def set_test_log_path(self, p): + ''' + Set test log path used from link prediction to store checkpoint file + If the link prediction evaluation task is interrupted, the evaluation will restart from last checkpoint + :param p: absolute path + ''' self.test_log_path = p def get_ent_total(self): + ''' + :return: the number of entites + ''' return self.entTotal def get_rel_total(self): + ''' + :return: the number of relations + ''' return self.relTotal - def set_lmbda(self, lmbda): - self.lmbda = lmbda - - def set_optimizer(self, optimizer): - self.optimizer = optimizer - def set_opt_method(self, method): + ''' + Set the optimization method + :param method: a string representing the optimization method + the current opt method supported are SGD and Adam + ''' self.opt_method = method def set_test_link_prediction(self, flag): + ''' + If True link prediction evaluation will be performed when test method is called + ''' self.test_link_prediction = flag def set_test_triple_classification(self, flag): + ''' + If true triple classification evaluation will be performed when test method is called + ''' self.test_triple_classification = flag def set_valid_triple_classification(self, flag): + ''' + If true, triple classification evaluation will be performed on validation set during training + (Early stop) + ''' self.valid_triple_classification = flag - def set_log_on(self, flag): - self.log_on = flag - def set_alpha(self, alpha): + ''' + Set learning rate + ''' self.alpha = alpha def set_in_path(self, path): + ''' + Set path where training files are located + ''' self.in_path = path def set_out_files(self, path): + ''' + Set path where the model will be saved + ''' self.out_path = path def set_bern(self, bern): + ''' + Set whether to use bern method during sampling + :param bern: 1 for True, 0 for False + ''' self.bern = bern def set_test_head(self, test_head): + ''' + Set whether to test link prediction on triple head, too + By default the link prediction evaluation will be performed only on tail + :param test_head: 1 for True, 0 for False + ''' self.test_head = test_head def set_dimension(self, dim): + ''' + Set embedding dimension for both the entities and relations + ''' self.hidden_size = dim self.ent_size = dim self.rel_size = dim def set_ent_dimension(self, dim): + ''' + Set embedding dimension for entities + ''' self.ent_size = dim def set_rel_dimension(self, dim): + ''' + Set embedding dimension for relations + ''' self.rel_size = dim def set_train_times(self, times): + ''' + Set number of epochs + ''' self.train_times = times def set_nbatches(self, nbatches): + ''' + Set number of batches + ''' self.nbatches = nbatches def set_margin(self, margin): + ''' + Set margin hyperparameter + ''' self.margin = margin def set_ent_neg_rate(self, rate): + ''' + Set number of corrupted triples generated during training for each triple + (Corrupt head/tail) + ''' self.negative_ent = rate def set_rel_neg_rate(self, rate): + ''' + Set number of corrupted triples generated during training for each triple + (Corrupt rel) + ''' self.negative_rel = rate def set_import_files(self, path): + ''' + Set path where is located the model to import + ''' self.importName = path - def set_export_files(self, path, steps = 0): + def set_export_files(self, path): + ''' + Set path where output model will be located + ''' self.exportName = path - self.export_steps = steps - def set_export_steps(self, steps): - self.export_steps = steps - # call C function for sampling def sampling(self): + ''' + Call C function for batch sampling during training + ''' self.lib.sampling(self.batch_h_addr, self.batch_t_addr, self.batch_r_addr, self.batch_y_addr, self.batch_size, self.negative_ent, self.negative_rel) - # save model + def save_tensorflow(self): + ''' + Save tensorflow model + ''' with self.graph.as_default(): with self.sess.as_default(): self.saver.save(self.sess, self.exportName) + def save_tensorflow_weights(self, export_name=None, write_meta_graph=False): + ''' + Save only tensorflow model weights + :return: + ''' if export_name == None: export_name = self.exportName with self.graph.as_default(): with self.sess.as_default(): self.saver.save(self.sess, export_name, write_meta_graph=write_meta_graph, write_state=False) + def restore_tensorflow(self): + ''' + Restore tensorflow model defined in importName var + ''' with self.graph.as_default(): with self.sess.as_default(): self.saver.restore(self.sess, self.importName) - - def export_variables(self, path = None): - with self.graph.as_default(): - with self.sess.as_default(): - if path == None: - self.saver.save(self.sess, self.exportName) - else: - self.saver.save(self.sess, path) - - def import_variables(self, path = None): - with self.graph.as_default(): - with self.sess.as_default(): - if path == None: - self.saver.restore(self.sess, self.importName) - else: - self.saver.restore(self.sess, path) - def get_parameter_lists(self): + ''' + :return: trainModel variables + ''' return self.trainModel.parameter_lists def get_parameters_by_name(self, var_name): + ''' + :param var_name: + :return: trainModel variable + ''' with self.graph.as_default(): with self.sess.as_default(): if var_name in self.trainModel.parameter_lists: @@ -358,11 +442,19 @@ def set_parameters(self, lists): for i in lists: self.set_parameters_by_name(i, lists[i]) + def set_model(self, model): + ''' + Set training model + ''' self.model = model def import_model(self, ckpt): + ''' + Import variables from a specific trained model version + :param ckpt: path/to/checkpoint/ + ''' self.graph = tf.Graph() with self.graph.as_default(): self.sess = tf.Session() @@ -374,6 +466,10 @@ def import_model(self, ckpt): def set_model_and_session(self, model): + ''' + Init the training algorithm variables and the tensorflow session + :parm model: (TransE / TransH / TransR / TransD) + ''' self.model = model self.graph = tf.Graph() with self.graph.as_default(): @@ -387,6 +483,9 @@ def set_model_and_session(self, model): def train_step(self, batch_h, batch_t, batch_r, batch_y): + ''' + Perform a single training step + ''' feed_dict = { self.trainModel.batch_h: batch_h, self.trainModel.batch_t: batch_t, @@ -398,6 +497,9 @@ def train_step(self, batch_h, batch_t, batch_r, batch_y): def test_step(self, test_h, test_t, test_r): + ''' + Perform a single test step + ''' feed_dict = { self.trainModel.predict_h: test_h, self.trainModel.predict_t: test_t, @@ -408,13 +510,21 @@ def test_step(self, test_h, test_t, test_r): def test_lp_range(self, index, lef, rig): + ''' + This method is used to parallelize link prediction evaluation task among different threads + Each thread will perform link prediction evaluation from the lef-th test triple and the rig-th test triple + Each thread will save checkpoints from which to restore the task in case of interruption + :param index: thread index + :param lef: test triple id left limit + :param rig: test triple id right limit + ''' + #init tail variables current_lp_res = { 'r_tot' : 0.0, 'r_filter_tot' : 0.0, 'r_tot_constrain' : 0.0, 'r_filter_tot_constrain' : 0.0, 'r1_tot' : 0.0, 'r1_filter_tot' : 0.0, 'r1_tot_constrain' : 0.0, 'r1_filter_tot_constrain' : 0.0, 'r3_tot' : 0.0, 'r3_filter_tot' : 0.0, 'r3_tot_constrain' : 0.0, 'r3_filter_tot_constrain' : 0.0, 'r_rank' : 0.0, 'r_filter_rank' : 0.0, 'r_rank_constrain' : 0.0, 'r_filter_rank_constrain' : 0.0, 'r_reci_rank' : 0.0,'r_filter_reci_rank' : 0.0, 'r_reci_rank_constrain' : 0.0, 'r_filter_reci_rank_constrain' : 0.0, - 'r_mis_err' : 0.0, 'r_spec_err' : 0.0, 'r_gen_err' : 0.0, 'r_filter_mis_err' : 0.0, 'r_filter_spec_err' : 0.0, 'r_filter_gen_err' : 0.0, 'r_mis_err_constrain' : 0.0, 'r_spec_err_constrain' : 0.0, 'r_gen_err_constrain' : 0.0, @@ -422,6 +532,7 @@ def test_lp_range(self, index, lef, rig): } if self.test_head != 0: + #init head variable current_lp_res['l_tot'] = 0.0 current_lp_res['l_filter_tot'] = 0.0 current_lp_res['l_tot_constrain'] = 0.0 @@ -442,7 +553,20 @@ def test_lp_range(self, index, lef, rig): current_lp_res['l_filter_reci_rank'] = 0.0 current_lp_res['l_reci_rank_constrain'] = 0.0 current_lp_res['l_filter_reci_rank_constrain'] = 0.0 - + current_lp_res['l_mis_err'] = 0.0 + current_lp_res['l_spec_err'] = 0.0 + current_lp_res['l_gen_err'] = 0.0 + current_lp_res['l_filter_mis_err'] = 0.0 + current_lp_res['l_filter_spec_err'] = 0.0 + current_lp_res['l_filter_gen_err'] = 0.0 + current_lp_res['l_mis_err_constrain'] = 0.0 + current_lp_res['l_spec_err_constrain'] = 0.0 + current_lp_res['l_gen_err_constrain'] = 0.0 + current_lp_res['l_filter_mis_err_constrain'] = 0.0 + current_lp_res['l_filter_spec_err_constrain'] = 0.0 + current_lp_res['l_filter_gen_err_constrain'] = 0.0 + + #init test arrays test_h = np.zeros(self.lib.getEntityTotal(), dtype = np.int64) test_t = np.zeros(self.lib.getEntityTotal(), dtype = np.int64) test_r = np.zeros(self.lib.getEntityTotal(), dtype = np.int64) @@ -452,6 +576,7 @@ def test_lp_range(self, index, lef, rig): print("Test link prediction range from {} to {}".format(lef, rig-1)) + #restore from last checkpoint (if founded) if os.path.exists(self.test_log_path+"thread"+str(index)): with open(self.test_log_path+"thread"+str(index), 'r') as f: last_i = int(f.readline()) @@ -462,7 +587,7 @@ def test_lp_range(self, index, lef, rig): test_triples_done = 0 for i in range(lef, rig): - #tail + #tail link prediction on i-th test triple self.lib.getTailBatch(i, test_h_addr, test_t_addr, test_r_addr) res = self.test_step(test_h, test_t, test_r) test_tail_res = [j for j in self.lib.testTail(i, res.__array_interface__['data'][0]).contents] @@ -471,121 +596,131 @@ def test_lp_range(self, index, lef, rig): r_filter_s = test_tail_res[1] r_s_constrain = test_tail_res[2] r_filter_s_constrain = test_tail_res[3] - r_min = test_tail_res[4] r_filter_min = test_tail_res[5] r_constrain_min = test_tail_res[6] r_filter_constrain_min = test_tail_res[7] + #hits if (r_filter_s < 10): current_lp_res['r_filter_tot'] += 1 if (r_s < 10): current_lp_res['r_tot'] += 1 if (r_filter_s < 3): current_lp_res['r3_filter_tot'] += 1 if (r_s < 3): current_lp_res['r3_tot'] += 1 - if (r_filter_s < 1): - current_lp_res['r1_filter_tot'] += 1 - elif (r_filter_min == 1): - current_lp_res['r_filter_gen_err'] += 1 - elif (r_filter_min == 2): - current_lp_res['r_filter_spec_err'] += 1 - else: - current_lp_res['r_filter_mis_err'] += 1 - - if (r_s < 1): - current_lp_res['r1_tot'] += 1 - elif (r_min == 1): - current_lp_res['r_gen_err'] += 1 - elif (r_min == 2): - current_lp_res['r_spec_err'] += 1 - else: - current_lp_res['r_mis_err'] += 1 + if (r_filter_s_constrain < 10): current_lp_res['r_filter_tot_constrain'] += 1 + if (r_s_constrain < 10): current_lp_res['r_tot_constrain'] += 1 + if (r_filter_s_constrain < 3): current_lp_res['r3_filter_tot_constrain'] += 1 + if (r_s_constrain < 3): current_lp_res['r3_tot_constrain'] += 1 + + #ontology + if (r_filter_s < 1): current_lp_res['r1_filter_tot'] += 1 + elif (r_filter_min == 1): current_lp_res['r_filter_gen_err'] += 1 + elif (r_filter_min == 2): current_lp_res['r_filter_spec_err'] += 1 + else: current_lp_res['r_filter_mis_err'] += 1 + if (r_s < 1): current_lp_res['r1_tot'] += 1 + elif (r_min == 1): current_lp_res['r_gen_err'] += 1 + elif (r_min == 2): current_lp_res['r_spec_err'] += 1 + else: current_lp_res['r_mis_err'] += 1 + if (r_filter_s_constrain < 1): current_lp_res['r1_filter_tot_constrain'] += 1 + elif (r_filter_constrain_min == 1): current_lp_res['r_filter_gen_err_constrain'] += 1 + elif (r_filter_constrain_min == 2): current_lp_res['r_filter_spec_err_constrain'] += 1 + else: current_lp_res['r_filter_mis_err_constrain'] += 1 + if (r_s_constrain < 1): current_lp_res['r1_tot_constrain'] += 1 + elif (r_constrain_min == 1): current_lp_res['r_gen_err_constrain'] += 1 + elif (r_constrain_min == 2): current_lp_res['r_spec_err_constrain'] += 1 + else: current_lp_res['r_mis_err_constrain'] += 1 + + #MR current_lp_res['r_filter_rank'] += (1+r_filter_s) current_lp_res['r_rank'] += (1+r_s) current_lp_res['r_filter_reci_rank'] += np.divide(1.0, (1+r_filter_s)) current_lp_res['r_reci_rank'] += np.divide(1.0, (1+r_s)) - #TYPE_C - if (r_filter_s_constrain < 10): current_lp_res['r_filter_tot_constrain'] += 1 - if (r_s_constrain < 10): current_lp_res['r_tot_constrain'] += 1 - if (r_filter_s_constrain < 3): current_lp_res['r3_filter_tot_constrain'] += 1 - if (r_s_constrain < 3): current_lp_res['r3_tot_constrain'] += 1 - - if (r_filter_s_constrain < 1): - current_lp_res['r1_filter_tot_constrain'] += 1 - elif (r_filter_constrain_min == 1): - current_lp_res['r_filter_gen_err_constrain'] += 1 - elif (r_filter_constrain_min == 2): - current_lp_res['r_filter_spec_err_constrain'] += 1 - else: - current_lp_res['r_filter_mis_err_constrain'] += 1 - - if (r_s_constrain < 1): - current_lp_res['r1_tot_constrain'] += 1 - elif (r_constrain_min == 1): - current_lp_res['r_gen_err_constrain'] += 1 - elif (r_constrain_min == 2): - current_lp_res['r_spec_err_constrain'] += 1 - else: - current_lp_res['r_mis_err_constrain'] += 1 - current_lp_res['r_filter_rank_constrain'] += (1+r_filter_s_constrain) current_lp_res['r_rank_constrain'] += (1+r_s_constrain) current_lp_res['r_filter_reci_rank_constrain'] += np.divide(1.0, (1+r_filter_s_constrain)) current_lp_res['r_reci_rank_constrain'] += np.divide(1.0, (1+r_s_constrain)) - #head + if self.test_head != 0: - #head + #head link prediction on i-th test triple self.lib.getHeadBatch(i, test_h_addr, test_t_addr, test_r_addr) res = self.test_step(test_h, test_t, test_r) test_head_res = [j for j in self.lib.testHead(i, res.__array_interface__['data'][0]).contents] + l_s = test_head_res[0] l_filter_s = test_head_res[1] l_s_constrain = test_head_res[2] l_filter_s_constrain = test_head_res[3] + l_min = test_head_res[4] + l_filter_min = test_head_res[5] + l_constrain_min = test_head_res[6] + l_filter_constrain_min = test_head_res[7] + #hits if (l_filter_s < 10): current_lp_res['l_filter_tot'] += 1 if (l_s < 10): current_lp_res['l_tot'] += 1 if (l_filter_s < 3): current_lp_res['l3_filter_tot'] += 1 if (l_s < 3): current_lp_res['l3_tot'] += 1 - if (l_filter_s < 1): current_lp_res['l1_filter_tot'] += 1 - if (l_s < 1): current_lp_res['l1_tot'] += 1 - current_lp_res['l_filter_rank'] += (l_filter_s+1) - current_lp_res['l_rank'] += (1+l_s) - current_lp_res['l_filter_reci_rank'] += np.divide(1.0, (l_filter_s+1)) - current_lp_res['l_reci_rank'] += np.divide(1.0, (l_s+1)) - - #TYPE_C if (l_filter_s_constrain < 10): current_lp_res['l_filter_tot_constrain'] += 1 if (l_s_constrain < 10): current_lp_res['l_tot_constrain'] += 1 if (l_filter_s_constrain < 3): current_lp_res['l3_filter_tot_constrain'] += 1 if (l_s_constrain < 3): current_lp_res['l3_tot_constrain'] += 1 + + #ontology + if (l_filter_s < 1): current_lp_res['l1_filter_tot'] += 1 + elif (l_filter_min == 1): current_lp_res['l_filter_gen_err'] += 1 + elif (l_filter_min == 2): current_lp_res['l_filter_spec_err'] += 1 + else: current_lp_res['l_filter_mis_err'] += 1 + + if (l_s < 1): current_lp_res['l1_tot'] += 1 + elif (l_min == 1): current_lp_res['l_gen_err'] += 1 + elif (l_min == 2): current_lp_res['l_spec_err'] += 1 + else: current_lp_res['l_mis_err'] += 1 + if (l_filter_s_constrain < 1): current_lp_res['l1_filter_tot_constrain'] += 1 + elif (l_filter_constrain_min == 1): current_lp_res['l_filter_gen_err_constrain'] += 1 + elif (l_filter_constrain_min == 2): current_lp_res['l_filter_spec_err_constrain'] += 1 + else: current_lp_res['l_filter_mis_err_constrain'] += 1 + if (l_s_constrain < 1): current_lp_res['l1_tot_constrain'] += 1 + elif (l_constrain_min == 1): current_lp_res['l_gen_err_constrain'] += 1 + elif (l_constrain_min == 2): current_lp_res['l_spec_err_constrain'] += 1 + else: current_lp_res['l_mis_err_constrain'] += 1 + + #MR + current_lp_res['l_filter_rank'] += (l_filter_s+1) + current_lp_res['l_rank'] += (1+l_s) + current_lp_res['l_filter_reci_rank'] += np.divide(1.0, (l_filter_s+1)) + current_lp_res['l_reci_rank'] += np.divide(1.0, (l_s+1)) current_lp_res['l_filter_rank_constrain'] += (l_filter_s_constrain+1) current_lp_res['l_rank_constrain'] += (1+l_s_constrain) current_lp_res['l_filter_reci_rank_constrain'] += np.divide(1.0, (l_filter_s_constrain+1)) current_lp_res['l_reci_rank_constrain'] += np.divide(1.0, (l_s_constrain+1)) - if index == 0: sys.stdout.write("\r# of test triples processed: {}".format(i * self.N_THREADS_LP)) + if index == 0: sys.stdout.write("\r# of test triples processed: {}".format(i * self.N_THREADS_LP)) test_triples_done += 1 - + #save checkpoint if test_triples_done % 100 == 0: with open(self.test_log_path+"thread"+str(index), "w") as f: f.write(str(i)+'\n') for key in current_lp_res.keys(): f.write(str(current_lp_res[key])+'\n') + #share results self.lp_res[index] = current_lp_res def test(self): + ''' + Perform triple classifcation and link prediction evaluation + ''' with self.graph.as_default(): with self.sess.as_default(): if self.importName != None: @@ -594,13 +729,13 @@ def test(self): test_time_start = time.time() if self.test_link_prediction: + #set link prediction on tail variables d = { 'r_tot' : 0.0, 'r_filter_tot' : 0.0, 'r_tot_constrain' : 0.0, 'r_filter_tot_constrain' : 0.0, 'r1_tot' : 0.0, 'r1_filter_tot' : 0.0, 'r1_tot_constrain' : 0.0, 'r1_filter_tot_constrain' : 0.0, 'r3_tot' : 0.0, 'r3_filter_tot' : 0.0, 'r3_tot_constrain' : 0.0, 'r3_filter_tot_constrain' : 0.0, 'r_rank' : 0.0, 'r_filter_rank' : 0.0, 'r_rank_constrain' : 0.0, 'r_filter_rank_constrain' : 0.0, 'r_reci_rank' : 0.0,'r_filter_reci_rank' : 0.0, 'r_reci_rank_constrain' : 0.0, 'r_filter_reci_rank_constrain' : 0.0, - 'r_mis_err' : 0.0, 'r_spec_err' : 0.0, 'r_gen_err' : 0.0, 'r_filter_mis_err' : 0.0, 'r_filter_spec_err' : 0.0, 'r_filter_gen_err' : 0.0, 'r_mis_err_constrain' : 0.0, 'r_spec_err_constrain' : 0.0, 'r_gen_err_constrain' : 0.0, @@ -608,6 +743,7 @@ def test(self): } if self.test_head != 0: + #set link prediction on head variables d['l_tot'] = 0.0 d['l_filter_tot'] = 0.0 d['l_tot_constrain'] = 0.0 @@ -628,13 +764,25 @@ def test(self): d['l_filter_reci_rank'] = 0.0 d['l_reci_rank_constrain'] = 0.0 d['l_filter_reci_rank_constrain'] = 0.0 - + d['l_mis_err'] = 0.0 + d['l_spec_err'] = 0.0 + d['l_gen_err'] = 0.0 + d['l_filter_mis_err'] = 0.0 + d['l_filter_spec_err'] = 0.0 + d['l_filter_gen_err'] = 0.0 + d['l_mis_err_constrain'] = 0.0 + d['l_spec_err_constrain'] = 0.0 + d['l_gen_err_constrain'] = 0.0 + d['l_filter_mis_err_constrain'] = 0.0 + d['l_filter_spec_err_constrain'] = 0.0 + d['l_filter_gen_err_constrain'] = 0.0 testTotal = self.lib.getTestTotal() triples_per_thread = int(testTotal / self.N_THREADS_LP) print("Number of test triples: {} Number of triples per test thread: {}".format(testTotal, triples_per_thread)) threads_array = [] + #parallelize link prediction evaluation to speedup the work lef = 0 rig = 0 for j in range(self.N_THREADS_LP): @@ -644,51 +792,47 @@ def test(self): rig += triples_per_thread threads_array.append(threading.Thread(target=self.test_lp_range, args=(j, lef, rig, ))) lef = rig - - for t in threads_array: t.start() - for t in threads_array: t.join() + #get results from threds for res in self.lp_res: for key in res.keys(): d[key] += res[key] - for key in d.keys(): d[key] = np.divide(d[key], testTotal) - + #print link prediction evaluation results print("\n ========== LINK PREDICTION RESULTS ==========\nNo type constraint results:") print("{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}".format("metric", "MRR", "MR", "hit@10", "hit@3", "hit@1", "hit@1GenError", "hit@1SpecError", "hit@1MisError")) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format("l(raw):",d['l_reci_rank'], d['l_rank'], d['l_tot'], d['l3_tot'], d['l1_tot'])) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format("l(raw):",d['l_reci_rank'], d['l_rank'], d['l_tot'], d['l3_tot'], d['l1_tot'], d['l_gen_err'], d['l_spec_err'], d['l_mis_err'])) print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format("r(raw):", d['r_reci_rank'], d['r_rank'], d['r_tot'], d['r3_tot'], d['r1_tot'], d['r_gen_err'], d['r_spec_err'], d['r_mis_err'])) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format("mean(raw):", np.divide((d['l_reci_rank']+d['r_reci_rank']),2), np.divide((d['l_rank']+d['r_rank']),2), np.divide((d['l_tot']+d['r_tot']),2), np.divide((d['l3_tot']+d['r3_tot']),2), np.divide((d['l1_tot']+d['r1_tot']),2))) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format("mean(raw):", np.divide((d['l_reci_rank']+d['r_reci_rank']),2), np.divide((d['l_rank']+d['r_rank']),2), np.divide((d['l_tot']+d['r_tot']),2), np.divide((d['l3_tot']+d['r3_tot']),2), np.divide((d['l1_tot']+d['r1_tot']),2), np.divide((d['l_gen_err']+d['r_gen_err']),2), np.divide((d['l_spec_err']+d['r_spec_err']),2), np.divide((d['l_mis_err']+d['r_mis_err']),2))) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format("l(filter):", d['l_filter_reci_rank'], d['l_filter_rank'], d['l_filter_tot'], d['l3_filter_tot'], d['l1_filter_tot'])) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format("l(filter):", d['l_filter_reci_rank'], d['l_filter_rank'], d['l_filter_tot'], d['l3_filter_tot'], d['l1_filter_tot'], d['l_filter_gen_err'], d['l_filter_spec_err'], d['l_filter_mis_err'])) print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('r(filter):', d['r_filter_reci_rank'], d['r_filter_rank'], d['r_filter_tot'], d['r3_filter_tot'], d['r1_filter_tot'], d['r_filter_gen_err'], d['r_filter_spec_err'], d['r_filter_mis_err'])) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(filter):', np.divide((d['l_filter_reci_rank']+d['r_filter_reci_rank']),2), np.divide((d['l_filter_rank']+d['r_filter_rank']),2), np.divide((d['l_filter_tot']+d['r_filter_tot']),2), np.divide((d['l3_filter_tot']+d['r3_filter_tot']),2), np.divide((d['l1_filter_tot']+d['r1_filter_tot']),2))) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(filter):', np.divide((d['l_filter_reci_rank']+d['r_filter_reci_rank']),2), np.divide((d['l_filter_rank']+d['r_filter_rank']),2), np.divide((d['l_filter_tot']+d['r_filter_tot']),2), np.divide((d['l3_filter_tot']+d['r3_filter_tot']),2), np.divide((d['l1_filter_tot']+d['r1_filter_tot']),2), np.divide((d['l_filter_gen_err']+d['r_filter_gen_err']),2), np.divide((d['l_filter_spec_err']+d['r_filter_spec_err']),2), np.divide((d['l_filter_mis_err']+d['r_filter_mis_err']),2))) print("Type constraint results:") print("{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}{:<20}".format("metric", "MRR", "MR", "hit@10", "hit@3", "hit@1", "hit@1GenError", "hit@1SpecError", "hit@1MisError")) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('l(raw):', d['l_reci_rank_constrain'], d['l_rank_constrain'], d['l_tot_constrain'], d['l3_tot_constrain'], d['l1_tot_constrain'])) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('l(raw):', d['l_reci_rank_constrain'], d['l_rank_constrain'], d['l_tot_constrain'], d['l3_tot_constrain'], d['l1_tot_constrain'], d['l_gen_err_constrain'], d['l_spec_err_constrain'], d['l_mis_err_constrain'])) print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('r(raw):', d['r_reci_rank_constrain'], d['r_rank_constrain'], d['r_tot_constrain'], d['r3_tot_constrain'], d['r1_tot_constrain'], d['r_gen_err_constrain'], d['r_spec_err_constrain'], d['r_mis_err_constrain'])) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(raw):', np.divide((d['l_reci_rank_constrain']+d['r_reci_rank_constrain']),2), np.divide((d['l_rank_constrain']+d['r_rank_constrain']),2), np.divide((d['l_tot_constrain']+d['r_tot_constrain']),2), np.divide((d['l3_tot_constrain']+d['r3_tot_constrain']),2), np.divide((d['l1_tot_constrain']+d['r1_tot_constrain']),2))) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(raw):', np.divide((d['l_reci_rank_constrain']+d['r_reci_rank_constrain']),2), np.divide((d['l_rank_constrain']+d['r_rank_constrain']),2), np.divide((d['l_tot_constrain']+d['r_tot_constrain']),2), np.divide((d['l3_tot_constrain']+d['r3_tot_constrain']),2), np.divide((d['l1_tot_constrain']+d['r1_tot_constrain']),2), np.divide((d['l_gen_err_constrain']+d['r_gen_err_constrain']),2), np.divide((d['l_spec_err_constrain']+d['r_spec_err_constrain']),2), np.divide((d['l_mis_err_constrain']+d['r_mis_err_constrain']),2))) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('l(filter):', d['l_filter_reci_rank_constrain'], d['l_filter_rank_constrain'], d['l_filter_tot_constrain'], d['l3_filter_tot_constrain'], d['l1_filter_tot_constrain'])) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('l(filter):', d['l_filter_reci_rank_constrain'], d['l_filter_rank_constrain'], d['l_filter_tot_constrain'], d['l3_filter_tot_constrain'], d['l1_filter_tot_constrain'], d['l_filter_gen_err_constrain'], d['l_filter_spec_err_constrain'], d['l_filter_mis_err_constrain'])) print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}".format('r(filter):', d['r_filter_reci_rank_constrain'], d['r_filter_rank_constrain'], d['r_filter_tot_constrain'], d['r3_filter_tot_constrain'], d['r1_filter_tot_constrain'], d['r_filter_gen_err_constrain'], d['r_filter_spec_err_constrain'], d['r_filter_mis_err_constrain'])) - if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(filter):', np.divide((d['l_filter_reci_rank_constrain']+d['r_filter_reci_rank_constrain']),2), np.divide((d['l_filter_rank_constrain']+d['r_filter_rank_constrain']),2), np.divide((d['l_filter_tot_constrain']+d['r_filter_tot_constrain']),2), np.divide((d['l3_filter_tot_constrain']+d['r3_filter_tot_constrain']),2), np.divide((d['l1_filter_tot_constrain']+d['r1_filter_tot_constrain']),2))) + if self.test_head != 0: print("{:<20}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}{:<20.5f}\n".format('mean(filter):', np.divide((d['l_filter_reci_rank_constrain']+d['r_filter_reci_rank_constrain']),2), np.divide((d['l_filter_rank_constrain']+d['r_filter_rank_constrain']),2), np.divide((d['l_filter_tot_constrain']+d['r_filter_tot_constrain']),2), np.divide((d['l3_filter_tot_constrain']+d['r3_filter_tot_constrain']),2), np.divide((d['l1_filter_tot_constrain']+d['r1_filter_tot_constrain']),2), np.divide((d['l_filter_gen_err_constrain']+d['r_filter_gen_err_constrain']),2), np.divide((d['l_filter_spec_err_constrain']+d['r_filter_spec_err_constrain']),2), np.divide((d['l_filter_mis_err_constrain']+d['r_filter_mis_err_constrain']),2))) - #remove test checkpoint + #remove checkpoint generated from threads print() for index in range(0, self.N_THREADS_LP): - try: - os.remove(self.test_log_path+"thread"+str(index)) - except: - print(" LOG:\tFile " + self.test_log_path+"thread"+str(index) + " not founded") + try: os.remove(self.test_log_path+"thread"+str(index)) + except: print(" LOG:\tFile " + self.test_log_path+"thread"+str(index) + " not founded") print() + #perform triple classification evaluation if self.test_triple_classification: self.lib.getValidBatch(self.valid_pos_h_addr, self.valid_pos_t_addr, self.valid_pos_r_addr, self.valid_neg_h_addr, self.valid_neg_t_addr, self.valid_neg_r_addr) res_pos = self.test_step(self.valid_pos_h, self.valid_pos_t, self.valid_pos_r) @@ -708,18 +852,6 @@ def test(self): print("\nElapsed test time (seconds): {}".format(test_time_elapsed)) - def valid(self): - with self.graph.as_default(): - with self.sess.as_default(): - if self.importName != None: - self.restore_tensorflow() - self.lib.getValidBatch(self.valid_pos_h_addr, self.valid_pos_t_addr, self.valid_pos_r_addr, self.valid_neg_h_addr, self.valid_neg_t_addr, self.valid_neg_r_addr) - res_pos = self.test_step(self.valid_pos_h, self.valid_pos_t, self.valid_pos_r) - res_neg = self.test_step(self.valid_neg_h, self.valid_neg_t, self.valid_neg_r) - self.lib.getBestThreshold(self.relThresh_addr, res_pos.__array_interface__['data'][0], res_neg.__array_interface__['data'][0]) - self.lib.test_triple_classification(self.relThresh_addr, res_pos.__array_interface__['data'][0], res_neg.__array_interface__['data'][0], self.acc_addr) - - def predict_head_entity(self, t, r, k): r'''This mothod predicts the top k head entities given tail entity and relation. diff --git a/TransE.py b/TransE.py index 91fef0b..8810365 100644 --- a/TransE.py +++ b/TransE.py @@ -55,5 +55,4 @@ def predict_def(self): predict_h_e = tf.nn.embedding_lookup(self.ent_embeddings, predict_h) predict_t_e = tf.nn.embedding_lookup(self.ent_embeddings, predict_t) predict_r_e = tf.nn.embedding_lookup(self.rel_embeddings, predict_r) - ##--## self.predict = tf.reduce_mean(self._calc(predict_h_e, predict_t_e, predict_r_e), 1, keep_dims = False) \ No newline at end of file diff --git a/__pycache__/Config.cpython-36.pyc b/__pycache__/Config.cpython-36.pyc index 916bfcc..c38d694 100644 Binary files a/__pycache__/Config.cpython-36.pyc and b/__pycache__/Config.cpython-36.pyc differ diff --git a/__pycache__/TransE.cpython-36.pyc b/__pycache__/TransE.cpython-36.pyc index a2bd80d..46528fe 100644 Binary files a/__pycache__/TransE.cpython-36.pyc and b/__pycache__/TransE.cpython-36.pyc differ diff --git a/__pycache__/distribute_training.cpython-36.pyc b/__pycache__/distribute_training.cpython-36.pyc index c6ac2ab..59a2ad0 100644 Binary files a/__pycache__/distribute_training.cpython-36.pyc and b/__pycache__/distribute_training.cpython-36.pyc differ diff --git a/base/Base.cpp b/base/Base.cpp index 2444e11..e0ddb16 100644 --- a/base/Base.cpp +++ b/base/Base.cpp @@ -40,11 +40,9 @@ INT getTestTotal(); extern "C" INT getValidTotal(); -//EDIT extern "C" INT getTrainTotal_(); -//EDIT extern "C" INT getBatchTotal(); @@ -54,7 +52,6 @@ void randReset(); extern "C" void importTrainFiles(); -//EDIT extern "C" void importOntologyFiles(); @@ -95,7 +92,6 @@ void* getBatch(void* con) { INT i; /** - * EDIT * select batch triple / train triple **/ if (newBatchTotal > 0){ @@ -105,7 +101,7 @@ void* getBatch(void* con) { i = rand_max(id, trainTotal_); } - //EDIT changed trainList to trainList_no + batch_h[batch] = trainList_no[i].h; batch_t[batch] = trainList_no[i].t; batch_r[batch] = trainList_no[i].r; diff --git a/base/Reader.h b/base/Reader.h index 7644e17..25966d6 100644 --- a/base/Reader.h +++ b/base/Reader.h @@ -100,16 +100,18 @@ void importTrainFiles() { trainList_no[i].t = trainList[i].t; trainList_no[i].r = trainList[i].r; } + fclose(fin); std::sort(trainList, trainList + trainTotal, Triple::cmp_head); + tmp = trainTotal; trainTotal = 1; trainHead[0] = trainTail[0] = trainRel[0] = trainList[0]; freqEnt[trainList[0].t] += 1; freqEnt[trainList[0].h] += 1; freqRel[trainList[0].r] += 1; - for (INT i = 1; i < tmp; i++) + for (INT i = 1; i < tmp; i++){ if (trainList[i].h != trainList[i - 1].h || trainList[i].r != trainList[i - 1].r || trainList[i].t != trainList[i - 1].t) { @@ -120,6 +122,7 @@ void importTrainFiles() { freqEnt[trainList[i].h]++; freqRel[trainList[i].r]++; } + } std::sort(trainHead, trainHead + trainTotal, Triple::cmp_head); std::sort(trainTail, trainTail + trainTotal, Triple::cmp_tail); @@ -174,6 +177,7 @@ void importTrainFiles() { left_mean[i] = freqRel[i] / left_mean[i]; right_mean[i] = freqRel[i] / right_mean[i]; } + } Triple *testList; diff --git a/base/Setting.h b/base/Setting.h index 53207d7..5c8504f 100644 --- a/base/Setting.h +++ b/base/Setting.h @@ -53,13 +53,11 @@ INT tripleTotal = 0; INT testTotal = 0; INT trainTotal = 0; -//EDIT //training triples with duplicates INT trainTotal_ = 0; INT validTotal = 0; -//EDIT INT newBatchTotal = 0; extern "C" @@ -82,13 +80,13 @@ INT getTrainTotal() { return trainTotal; } -//EDIT + extern "C" INT getTrainTotal_() { return trainTotal_; } -//EDIT + extern "C" INT getBatchTotal() { return newBatchTotal; diff --git a/base/Test.h b/base/Test.h index c5c7023..501715f 100644 --- a/base/Test.h +++ b/base/Test.h @@ -34,12 +34,22 @@ INT* testHead(INT index, REAL *con) { INT r = testList[index].r; REAL minimal = con[h]; - INT* l_arr = new INT[4]; + INT* l_arr = new INT[8]; INT l_s = 0; INT l_filter_s = 0; INT l_s_constrain = 0; INT l_filter_s_constrain = 0; + INT l_min = h; + INT l_filter_min = h; + INT l_constrain_min = h; + INT l_filter_constrain_min = h; + + REAL l_min_s = minimal; + REAL l_filter_min_s = minimal; + REAL l_constrain_min_s = minimal; + REAL l_filter_constrain_min_s = minimal; + INT lef = 0; INT rig = 0; lef = head_lef[r]; @@ -51,8 +61,17 @@ INT* testHead(INT index, REAL *con) { REAL value = con[j]; if (value < minimal) { l_s += 1; + if (value < l_min_s){ + l_min_s = value; + l_min = j; + } + if (not _find(j, t, r)) l_filter_s += 1; + if (value < l_filter_min_s){ + l_filter_min_s = value; + l_filter_min = j; + } } //TYPE_C @@ -60,8 +79,17 @@ INT* testHead(INT index, REAL *con) { if (lef < rig && j == head_type[lef]) { if (value < minimal) { l_s_constrain += 1; + if (value < l_constrain_min_s){ + l_constrain_min_s = value; + l_constrain_min = j; + } + if (not _find(j, t, r)) { l_filter_s_constrain += 1; + if (value < l_filter_constrain_min_s){ + l_filter_constrain_min_s = value; + l_filter_constrain_min = j; + } } } } @@ -73,6 +101,36 @@ INT* testHead(INT index, REAL *con) { l_arr[2] = l_s_constrain; l_arr[3] = l_filter_s_constrain; + l_arr[4] = l_min; + l_arr[5] = l_filter_min; + l_arr[6] = l_constrain_min; + l_arr[7] = l_filter_constrain_min; + + INT lef_sup = sup_lef[h]; + INT rig_sup = sup_rig[h]; + INT lef_sub = sub_lef[h]; + INT rig_sub = sub_rig[h]; + for(INT i = 4; i < 8; i++){ + if (l_arr[i] == h){ //it's ok + l_arr[i] = 0; + continue; + } + + while (lef_sup < rig_sup && sup_type[lef_sup] < l_arr[i]) lef_sup ++; //generalization error + if (lef_sup < rig_sup && l_arr[i] == sup_type[lef_sup]){ + l_arr[i] = 1; + continue; + } + + while (lef_sub < rig_sub && sub_type[lef_sub] < l_arr[i]) lef_sub ++; //specialization error + if (lef_sub < rig_sub && l_arr[i] == sub_type[lef_sub]){ + l_arr[i] = 2; + continue; + } + + l_arr[i] = 3; //misclassification error + } + return l_arr; } @@ -112,7 +170,6 @@ INT* testTail(INT index, REAL *con) { REAL value = con[j]; if (value < minimal) { r_s += 1; - if (value < r_min_s){ r_min_s = value; r_min = j; @@ -120,7 +177,6 @@ INT* testTail(INT index, REAL *con) { if (not _find(h, j, r)){ r_filter_s += 1; - if (value < r_filter_min_s){ r_filter_min_s = value; r_filter_min = j; @@ -134,7 +190,6 @@ INT* testTail(INT index, REAL *con) { if (lef < rig && j == tail_type[lef]) { if (value < minimal) { r_s_constrain += 1; - if (value < r_constrain_min_s){ r_constrain_min_s = value; r_constrain_min = j; @@ -143,7 +198,6 @@ INT* testTail(INT index, REAL *con) { if (not _find(h, j ,r)) { r_filter_s_constrain += 1; - if (value < r_filter_constrain_min_s){ r_filter_constrain_min_s = value; r_filter_constrain_min = j; diff --git a/commands.txt b/commands.txt index f5839ca..8c23688 100644 --- a/commands.txt +++ b/commands.txt @@ -1,8 +1,6 @@ #commands to run OpenKEonSpark on Google Colaboratory platform #install -!sudo apt-get update -!sudo apt-get upgrade !apt-get install openjdk-8-jdk-headless -qq > /dev/null import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" @@ -15,8 +13,8 @@ os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" from google.colab import drive drive.mount('/content/drive') -#unzip project -!unzip /content/drive/My\ Drive/DBpedia/OpenKEonSpark.zip +#clone project +!git clone https://github.com/luigiba/OpenKEonSpark.git !mv OpenKEonSpark/gpu_info.py /usr/local/lib/python3.6/dist-packages/tensorflowonspark/gpu_info.py #unzip dataset diff --git a/distribute_training.py b/distribute_training.py index a93baad..0ff3d7d 100644 --- a/distribute_training.py +++ b/distribute_training.py @@ -12,18 +12,17 @@ import time -#if set to Ture prints additional debug information -DEBUG = True - - def get_conf_to_update_model(output_path): + ''' + Set the Config class variables necessary to update model tensors + ''' con = Config(init_new_entities=True) ckpt = None with open(output_path + "/checkpoint", 'r') as f: first_line = f.readline() ckpt = first_line.split(':')[1].strip().replace('"', '') - if DEBUG: print("Checkpoint file is: " + ckpt) + print("Checkpoint file is: " + ckpt) con.import_model(ckpt) @@ -31,6 +30,9 @@ def get_conf_to_update_model(output_path): def get_conf(argv=None): + ''' + Set the Config class using the program args + ''' if argv == None: argv = sys.argv con = Config(cpp_lib_path=argv.cpp_lib_path) @@ -56,19 +58,22 @@ def get_conf(argv=None): con.set_opt_method(argv.optimizer) con.init() - if argv.model.lower() == "transe": - con.set_model(TransE) - elif argv.model.lower() == "transh": + if argv.model.lower() == "transh": con.set_model(TransH) elif argv.model.lower() == "transr": con.set_model(TransR) - else: + elif argv.model.lower() == "transd": con.set_model(TransD) + else: + con.set_model(TransE) return con def create_model(con): + ''' + create the model using the Config parameters + ''' with tf.variable_scope("", reuse=None, initializer = tf.contrib.layers.xavier_initializer(uniform = True)): trainModel = con.model(config = con) @@ -85,6 +90,7 @@ def create_model(con): with tf.name_scope("predict"): trainModel.predict_def() + #allowed Optimization algorithms are Adam and SGD if con.opt_method == "Adam" or con.opt_method == "adam": optimizer = tf.train.AdamOptimizer(con.alpha) else: @@ -99,10 +105,13 @@ def create_model(con): def get_last_step(): + ''' + :return: last global step; 0 if is the first batch + ''' last_global_step = 0 try: if os.path.isfile(sys.argv.output_path+"/checkpoint"): - if DEBUG: + if sys.argv.debug: print("Checkpoint file founded") print("Reading last global step...") @@ -111,9 +120,9 @@ def get_last_step(): last = int(line[len(line)-1].split("-")[1].strip()) last_global_step = last - if DEBUG: print("Last global step: " + str(last_global_step)) + if sys.argv.debug: print("Last global step: " + str(last_global_step)) else: - if DEBUG: print("Checkpoint file not founded") + if sys.argv.debug: print("Checkpoint file not founded") except Exception as e: print("Error occured during last global step reading:") print(e) @@ -124,59 +133,62 @@ def get_last_step(): def main_fun(argv, ctx): + ''' + Continue training on already seen training set / Start training on new batch + If the new batch contains new entities, model tensors which depends from entity size are updated accordingly + :param argv: + :param ctx: + ''' job_name = ctx.job_name task_index = ctx.task_index sys.argv = argv - if DEBUG: print("Starting cluster and server...") + if sys.argv.debug: print("Starting cluster and server...") cluster, server = TFNode.start_cluster_server(ctx, num_gpus=argv.num_gpus, rdma=False) - if DEBUG: print("Cluster and server started") + if sys.argv.debug: print("Cluster and server started") if job_name == "ps": + #parameter server print("PS: joining...") server.join() - if DEBUG: print("PS: join finished") + if sys.argv.debug: print("PS: join finished") + elif job_name == "worker": + #worker print("WORKER: training...") - - #Online learning last_global_step = get_last_step() - - - #set config - if DEBUG: print("Creating conf...") con = get_conf() - + if sys.argv.debug: print("Creating model...") with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % task_index, cluster=cluster, ps_strategy=GreedyLoadBalancingStrategy(num_tasks=argv.num_ps, load_fn=byte_size_load_fn))): - - if DEBUG: print("Creating model...") trainModel, global_step, train_op, init_op, saver, summary_op = create_model(con) - - - if DEBUG: print("Creating Hooks, Scaffold, FileWriter...") - + if sys.argv.debug: print("Creating Hooks, Scaffold, FileWriter, ConfigProto...") iterations = con.train_times * con.nbatches + last_global_step - hooks=[tf.train.StopAtStepHook(last_step=iterations)] scaffold = tf.train.Scaffold(init_op=init_op, saver=saver, summary_op=summary_op) tf.summary.FileWriter("tensorboard_%d" % ctx.worker_num, graph=tf.get_default_graph()) + config_monitored = tf.ConfigProto() - - print("Num gpus: " + str(argv.num_gpus)) - gpu_options = tf.GPUOptions(allow_growth = True, visible_device_list = "0") - config_monitored = tf.ConfigProto(gpu_options=gpu_options) + if argv.num_gpus > 0: + if sys.argv.debug: print("Setting GPU options...") + visible_device_list = '' + try: + visible_device_list = os.environ["CUDA_VISIBLE_DEVICES"] + except KeyError: + visible_device_list = '0' + gpu_options = tf.GPUOptions(allow_growth = True, visible_device_list = visible_device_list) + config_monitored = tf.ConfigProto(gpu_options=gpu_options) - if DEBUG: print("Starting MonitoredTrainingSession...") + if sys.argv.debug: print("Starting MonitoredTrainingSession...") sess = tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0), scaffold=scaffold, @@ -187,7 +199,7 @@ def main_fun(argv, ctx): hooks=hooks, summary_dir="tensorboard_%d" % ctx.worker_num ) - if DEBUG: + if sys.argv.debug: print("Monitoring training sessions started") print("Task index is: {}".format(task_index)) @@ -250,7 +262,7 @@ def main_fun(argv, ctx): con.lib.test_triple_classification(con.relThresh_addr, res_pos.__array_interface__['data'][0], res_neg.__array_interface__['data'][0], con.acc_addr) acc = con.acc[0] - if DEBUG: + if sys.argv.debug: print("\n[ Early Stop Check (Accuracy) ]") print("Best Accuracy = %.10f" %(best_acc)) print("Accuracy after run = %.10f" %(acc)) @@ -258,12 +270,12 @@ def main_fun(argv, ctx): if acc > best_acc: best_acc = acc wait_steps_acc = 0 - if DEBUG: print("New best Accuracy founded. Wait steps reset.") + if sys.argv.debug: print("New best Accuracy founded. Wait steps reset.") best_model_global_step_acc = g elif wait_steps_acc < patience: wait_steps_acc += 1 - if DEBUG: print("Wait steps Accuracy incremented: {}\n".format(wait_steps_acc)) + if sys.argv.debug: print("Wait steps Accuracy incremented: {}\n".format(wait_steps_acc)) if wait_steps_acc >= patience: @@ -273,7 +285,7 @@ def main_fun(argv, ctx): break ################## LOSS ################## - if DEBUG: + if sys.argv.debug: print("\n[ Early Stop Checking (Loss) ]") print("Best loss = %.10f" %(best_loss)) print("Loss after run = %.10f" %(loss)) @@ -281,12 +293,12 @@ def main_fun(argv, ctx): if loss < best_loss: best_loss = loss wait_steps_loss = 0 - if DEBUG: print("New best loss founded. Wait steps reset.") + if sys.argv.debug: print("New best loss founded. Wait steps reset.") best_model_global_step_loss = g elif wait_steps_loss < patience: wait_steps_loss += 1 - if DEBUG: print("Wait steps loss incremented: {}\n".format(wait_steps_loss)) + if sys.argv.debug: print("Wait steps loss incremented: {}\n".format(wait_steps_loss)) if wait_steps_loss >= patience: diff --git a/main_spark.py b/main_spark.py index 85b61fd..611e6fc 100644 --- a/main_spark.py +++ b/main_spark.py @@ -10,9 +10,6 @@ import time from os import path - -DEBUG = distribute_training.DEBUG - NEW_BATCH_TRIPLES_FILE_NAME = 'batch2id.txt' NEW_BATCH_ENTITIES_FILE_NAME = 'batchEntity2id.txt' NEW_BATCH_TEST_FILE_NAME = 'batchTest2id.txt' @@ -23,10 +20,17 @@ TEST_FILE_NAME = 'test2id.txt' VALID_FILE_NAME = 'valid2id.txt' -ENTITY_EMBEDDING_TENSOR_NAME = 'ent_embeddings' +#TODO: retrieve these from models methods +#tensors which depends from entities dimension +ENTITY_EMBEDDING_TENSOR_NAME = 'ent_embeddings' #from TransE, TransH, TransR, TransD +ENTITY_TRANSFER_TENSOR_NAME = 'ent_transfer' #from TransD def update_entities_and_model(): + ''' + Update the tensor variables if new entites are introduced in the new batch + ''' + n_entities = 0 n_new_entities = 0 final_entity_size = 0 @@ -46,7 +50,7 @@ def update_entities_and_model(): entity_lines = f.readlines() n_entities = int(entity_lines[0]) final_entity_size = n_entities + n_new_entities - if DEBUG: print("Number of new entities in batch: " + str(n_new_entities)) + if sys.argv.debug: print("Number of new entities in batch: " + str(n_new_entities)) ######### UPDATE THE MODEL ######### @@ -54,16 +58,16 @@ def update_entities_and_model(): con, ckpt = distribute_training.get_conf_to_update_model(sys.argv.output_path) vars = [] - if DEBUG: print("\nGLOBAL VARS FOUNDED IN CHECKPOINT:\n") + if sys.argv.debug: print("\nGLOBAL VARS FOUNDED IN CHECKPOINT:\n") with con.graph.as_default(): with con.sess.as_default(): for v in tf.global_variables(): - if DEBUG: print(str(v.name) + " " + str(v.shape)) + if sys.argv.debug: print(str(v.name) + " " + str(v.shape)) vars.append(v) - if DEBUG: print('\n') + if sys.argv.debug: print('\n') - if DEBUG: print("NEW GLOBAL VARIABLES") + if sys.argv.debug: print("NEW GLOBAL VARIABLES") graph = tf.Graph() with graph.as_default(): sess = tf.Session() @@ -71,13 +75,13 @@ def update_entities_and_model(): for v in vars: current_name = v.name.split(':')[0] - if current_name == ENTITY_EMBEDDING_TENSOR_NAME: + if current_name == ENTITY_EMBEDDING_TENSOR_NAME or current_name == ENTITY_TRANSFER_TENSOR_NAME: tmp = tf.get_variable(name=current_name, shape=[final_entity_size, v.shape[1]], initializer=tf.contrib.layers.xavier_initializer(uniform = False), dtype=v.dtype) sess.run(tf.initialize_variables([tmp])) tmp_value = con.sess.run(v) sess.run(tf.scatter_update(tmp, [i for i in range(0, n_entities)], tmp_value)) - elif current_name in [ENTITY_EMBEDDING_TENSOR_NAME+'/Adam', ENTITY_EMBEDDING_TENSOR_NAME+'/Adam_1']: + elif current_name in [ENTITY_EMBEDDING_TENSOR_NAME+'/Adam', ENTITY_EMBEDDING_TENSOR_NAME+'/Adam_1', ENTITY_TRANSFER_TENSOR_NAME+'/Adam', ENTITY_TRANSFER_TENSOR_NAME+'/Adam_1']: tmp = tf.get_variable(name=current_name, shape=[final_entity_size, v.shape[1]], initializer=tf.zeros_initializer(), dtype=v.dtype) sess.run(tf.initialize_variables([tmp])) tmp_value = con.sess.run(v) @@ -103,15 +107,21 @@ def update_entities_and_model(): #update entities: append the new entities at the end of the file entity_lines = entity_lines + batch_entities - #update entity2id + #update entity2id.txt with open(sys.argv.input_path+ENTITIES_FILE_NAME, "w") as f: f.writelines(entity_lines) - if DEBUG: print("Entity file updated") + if sys.argv.debug: print("Entity file updated") return n_new_entities, final_entity_size def update_triples(file_name_to_update, file_name_batch): + ''' + Update file_name_to_update by appending the triples contained in file_name_batch + The first line of file_name_to_update (i.e. the number of triples) is updated accordingly + :param file_name_to_update: + :param file_name_batch: + ''' batch_triples_size = 0 batch_triples = [] @@ -137,23 +147,26 @@ def update_triples(file_name_to_update, file_name_batch): #update file with open(sys.argv.input_path + file_name_to_update, "w") as f: f.writelines(lines) - if DEBUG: print("File updated") + if sys.argv.debug: print("File updated") def feed_batch(): + ''' + Update files containing entities / relations / triples with data contained in new batch + ''' current_time = time.time() - if DEBUG: print("New batch file founded") + if sys.argv.debug: print("New batch file founded") try: - if DEBUG: print("Updating "+ENTITIES_FILE_NAME+" and model tensors...") + if sys.argv.debug: print("Updating "+ENTITIES_FILE_NAME+" and model tensors...") update_entities_and_model() - if DEBUG: print("Updating "+TRIPLES_FILE_NAME+"...") + if sys.argv.debug: print("Updating "+TRIPLES_FILE_NAME+"...") update_triples(TRIPLES_FILE_NAME, NEW_BATCH_TRIPLES_FILE_NAME) - if DEBUG: print("Updating "+TEST_FILE_NAME+"...") + if sys.argv.debug: print("Updating "+TEST_FILE_NAME+"...") update_triples(TEST_FILE_NAME, NEW_BATCH_TEST_FILE_NAME) - if DEBUG: print("Updating "+VALID_FILE_NAME+"...") + if sys.argv.debug: print("Updating "+VALID_FILE_NAME+"...") update_triples(VALID_FILE_NAME, NEW_BATCH_VALID_FILE_NAME) elapsed_time = time.time() - current_time @@ -168,6 +181,9 @@ def feed_batch(): def is_new_batch(): + ''' + Return True if there is a new batch to train + ''' return os.path.isfile(sys.argv.input_path+NEW_BATCH_TRIPLES_FILE_NAME) and \ os.path.isfile(sys.argv.input_path+NEW_BATCH_ENTITIES_FILE_NAME) and \ os.path.isfile(sys.argv.input_path+NEW_BATCH_TEST_FILE_NAME) and \ @@ -175,6 +191,9 @@ def is_new_batch(): def remove_batch_files(): + ''' + Remove 4 batch files + ''' os.remove(sys.argv.input_path+NEW_BATCH_TRIPLES_FILE_NAME) os.remove(sys.argv.input_path+NEW_BATCH_ENTITIES_FILE_NAME) os.remove(sys.argv.input_path+NEW_BATCH_TEST_FILE_NAME) @@ -182,6 +201,9 @@ def remove_batch_files(): def n_n(): + ''' + Generate type constrain file + ''' lef = {} rig = {} rellef = {} @@ -262,11 +284,10 @@ def n_n(): if __name__ == '__main__': - if DEBUG: print("Creating Spark Context...") + print("Creating Spark Context...") sc = SparkContext(conf=SparkConf().setAppName('OpenKEonSpark')) - - if DEBUG: print("Parsing arguments...") + print("Parsing arguments...") parser = argparse.ArgumentParser() parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=int(sc._conf.get("spark.executor.instances"))) parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1) @@ -289,6 +310,7 @@ def n_n(): parser.add_argument("--early_stop_stopping_step", help="perfrom early stop each stopping step", type=int, default=1) parser.add_argument("--early_stop_start_step", help="perfrom early stop from start step", type=int, default=1) parser.add_argument("--model", help="model to be used", type=str, default="TransE") + parser.add_argument("--debug", help="if Ture prints additional debug information", type=bool, default=True) (args, remainder) = parser.parse_known_args() @@ -296,46 +318,47 @@ def n_n(): print("===== num_executors={}, num_workers={}, num_ps={}".format(args.cluster_size, num_workers, args.num_ps)) - if DEBUG: print("Setting batch files if present...") + if args.debug: print("Setting batch files if present...") sys.argv = args if is_new_batch(): feed_batch() - if DEBUG: print("Generating type files...") + if args.debug: print("Generating type files...") n_n() - if DEBUG: print("Removing stop file...") + if args.debug: print("Removing stop file...") try: os.remove(args.output_path+"/stop.txt") except: pass - if DEBUG: print("Creating cluster...") + if args.debug: print("Creating cluster...") training_time = time.time() + #perform training on new batch / continute training on old training set cluster = TFCluster.run(sc, distribute_training.main_fun, args, args.cluster_size, args.num_ps, True, TFCluster.InputMode.TENSORFLOW) - if DEBUG: print("Shutdowning cluster...") + if args.debug: print("Shutdowning cluster...") cluster.shutdown() training_time = time.time() - training_time - if DEBUG: print("Removing batch files if present...") + if args.debug: print("Removing batch files if present...") if is_new_batch(): remove_batch_files() - if DEBUG: print("Printing time information on files...") + if args.debug: print("Printing time information on files...") with open(args.output_path+'/time.txt', 'w') as f: f.write("Training time: " + str(training_time) + "\n") - if DEBUG: print("Restoring the best model founded during training...") + if args.debug: print("Restoring the best model founded during training...") if path.exists(args.output_path+"/stop.txt"): step = None with open(args.output_path+"/stop.txt", "r") as f: @@ -365,5 +388,5 @@ def n_n(): os.remove(args.output_path+"/"+f) - if DEBUG: print("Training finished") + if args.debug: print("Training finished") diff --git a/release/Base.so b/release/Base.so index 5bd94ee..2b8e904 100755 Binary files a/release/Base.so and b/release/Base.so differ diff --git a/res_spark/dummy.txt b/res_spark/dummy.txt new file mode 100644 index 0000000..e69de29 diff --git a/run_dbpedia.sh b/run_dbpedia.sh index f93637e..ca7faec 100644 --- a/run_dbpedia.sh +++ b/run_dbpedia.sh @@ -5,7 +5,7 @@ echo "$3" echo "$4" echo "====================================== Clearning res_spark directory ======================================" -rm /home/luigi/IdeaProjects/OpenKE_new_Spark/res_spark/* +rm $WORK_DIR_PREFIX/res_spark/* echo "====================================== Stopping Spark Master & slaves ======================================" $SPARK_HOME/sbin/stop-slave.sh @@ -30,6 +30,15 @@ do continue fi + if [ -f /content/drive/My\ Drive/DBpedia/$n/$i/model/checkpoint ]; then + echo "====================================== Test for batch $i ======================================" + if [ $i -eq $m ]; then + python3 $WORK_DIR_PREFIX/test.py $i $n $2 $3 1 | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt + else + python3 $WORK_DIR_PREFIX/test.py $i $n $2 $3 0 | tee /content/drive/My\ Drive/DBpedia/$n/$i/res.txt + fi + fi + if [ $i != 0 ]; then k=$((i-1)) diff --git a/test.py b/test.py index 46c02b2..a16d051 100644 --- a/test.py +++ b/test.py @@ -31,19 +31,35 @@ def get_ckpt(p): con = Config(cpp_lib_path='/content/OpenKEonSpark/release/Base.so') con.set_in_path(dataset_path) -con.set_test_link_prediction(bool(lp)) +con.set_test_link_prediction(bool(int(lp))) con.set_test_triple_classification(True) con.set_dimension(int(dim)) con.init() if model.lower() == "transe": con.set_model_and_session(TransE) + if (int(dim) <= 32): con.set_n_threads_LP(10) + if (int(dim) > 32 and int(dim) <= 64): con.set_n_threads_LP(10) + if (int(dim) > 64 and int(dim) <= 128): con.set_n_threads_LP(5) + if (int(dim) > 128 and int(dim) <= 256): con.set_n_threads_LP(5) elif model.lower() == "transh": con.set_model_and_session(TransH) + if (int(dim) <= 32): con.set_n_threads_LP(10) + if (int(dim) > 32 and int(dim) <= 64): con.set_n_threads_LP(7) + if (int(dim) > 64 and int(dim) <= 128): con.set_n_threads_LP(5) + if (int(dim) > 128 and int(dim) <= 256): con.set_n_threads_LP(5) elif model.lower() == "transr": con.set_model_and_session(TransR) + if (int(dim) <= 32): con.set_n_threads_LP(7) + if (int(dim) > 32 and int(dim) <= 64): con.set_n_threads_LP(5) + if (int(dim) > 64 and int(dim) <= 128): con.set_n_threads_LP(4) + if (int(dim) > 128 and int(dim) <= 256): con.set_n_threads_LP(3) else: con.set_model_and_session(TransD) + if (int(dim) <= 32): con.set_n_threads_LP(7) + if (int(dim) > 32 and int(dim) <= 64): con.set_n_threads_LP(5) + if (int(dim) > 64 and int(dim) <= 128): con.set_n_threads_LP(4) + if (int(dim) > 128 and int(dim) <= 256): con.set_n_threads_LP(3) con.set_import_files(path+ckpt) con.set_test_log_path(path) diff --git a/test_1.py b/test_1.py index ee865e6..0abee2d 100644 --- a/test_1.py +++ b/test_1.py @@ -1,9 +1,11 @@ from Config import Config from TransE import TransE from TransH import TransH +from TransD import TransD +from TransR import TransR import sys # import os - +# os.environ["CUDA_VISIBLE_DEVICES"]="-1" def get_ckpt(p): ckpt = None @@ -15,7 +17,7 @@ def get_ckpt(p): #/home/luigi/IdeaProjects/OpenKE_new_Spark/benchmarks/DBpedia -dataset_path = '/home/luigi/files/stuff/Done/DBpedia/5/0/' +dataset_path = '/home/luigi/files/stuff/DBpedia/5/0/' # dataset_path = '/home/luigi/files/stuff/superuser/9/1/' path = dataset_path + 'model/' # path = '/home/luigi/IdeaProjects/OpenKEonSpark/res_spark/' @@ -29,10 +31,10 @@ def get_ckpt(p): con.set_test_triple_classification(True) con.set_dimension(int(64)) con.init() -con.set_model_and_session(TransE) +con.set_model_and_session(TransD) con.set_import_files(path+ckpt) con.set_test_log_path(path) -con.set_n_threads_LP(1) +con.set_n_threads_LP(5) con.test() con.predict_tail_entity(349585, 5, 10)