# r0701014.py
import csv
import ctypes
from functools import partial
from multiprocessing import Pool, RawArray
from typing import Tuple
import numpy as np
import Reporter
# Dictionary to store the location of the arrays used by the parallel processes
var_dict: dict = {}
def distance_function_parallel(perm2: np.array, perm1: np.array) -> float:
"""
Computes the distance between two permutations by counting the edges of the
first permutation that are not in the second permutation.
:param perm1: The first permutation
:param perm2: The second permutation
:return: The distance between the two permutations
"""
tour_size = var_dict["tour_size"]
distance = 0
for i in range(tour_size - 1):
element = perm1[i]
x = np.where(perm2 == element)[0][0]
y = 0 if x == tour_size - 1 else x + 1
if perm1[i + 1] != perm2[y]:
distance += 1
element = perm1[-1]
x = np.where(perm2 == element)[0][0]
if perm1[0] != perm2[(x + 1) % tour_size]:
distance += 1
return distance
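# Illustrative example of the edge-based distance above (directed edges are
# compared): for perm1 = [0, 1, 2, 3] and perm2 = [0, 2, 1, 3] only the edge
# 3 -> 0 is shared, so the distance is 3.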
def parallel_3_opt(individual: np.array) -> np.array:
"""
    This function performs local search on the individual. It can run in parallel because the shared data is stored
    in RawArrays from the multiprocessing library, which are not protected by locks. The locations of these arrays
    are looked up in the global dictionary var_dict.
:param individual: The individual to perform local search on.
:return: The (improved) individual
"""
tour_size = var_dict["tour_size"]
nearest_neighbors = np.frombuffer(
var_dict["nearest_neighbors"], dtype=np.int
).reshape(tour_size, 15)
individual = np.roll(individual, -np.random.randint(tour_size))
for point1 in range(tour_size):
v1 = individual[0]
v2 = individual[point1 - 1]
v3 = individual[point1]
v6 = individual[-1]
for i, neighbor in enumerate(nearest_neighbors[v2]):
point2 = np.where(individual == neighbor)[0][0]
if point2 > point1:
v4 = individual[point2 - 1]
v5 = individual[point2]
individual = check_for_3_opt_move(
individual, point1, point2, v1, v2, v3, v4, v5, v6
)
return individual
def check_for_3_opt_move(
individual: np.array,
point1: int,
point2: int,
v1: int,
v2: int,
v3: int,
v4: int,
v5: int,
v6: int,
):
"""
This function checks for a possible 3 opt move for the arcs between endpoints v1-v2, v3-v4 and v5-v6.
:param individual: The individual to try the 3 opt move on
:param point1: The first crossing point, the point between v2 and v3
:param point2: The second crossing point, the point between v4 and v5
:param v1: The beginning of the first arc
:param v2: The ending of the first arc
:param v3: The beginning of the second arc
:param v4: The ending of the second arc
:param v5: The beginning of the third arc
:param v6: The ending of the third arc
    :return: The individual with the 3-opt move applied (if it improves the tour), otherwise the unchanged individual
"""
tour_size = var_dict["tour_size"]
distance_matrix = np.frombuffer(
var_dict["distance_matrix"], dtype=np.float64
).reshape(tour_size, tour_size)
old_distance = (
distance_matrix[v2][v3] + distance_matrix[v4][v5] + distance_matrix[v6][v1]
)
new_distance = (
distance_matrix[v2][v5] + distance_matrix[v6][v3] + distance_matrix[v4][v1]
)
if new_distance < old_distance:
a = individual[:point1]
b = individual[point1:point2]
c = individual[point2:]
individual = np.concatenate([a, c, b])
return individual
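# Illustrative example of the reordering above: for individual [0, 1, 2, 3, 4, 5]
# with point1 = 2 and point2 = 4 (so v1..v6 = 0, 1, 2, 3, 4, 5) the segments are
# a = [0, 1], b = [2, 3], c = [4, 5]; if the move is accepted the new tour is
# [0, 1, 4, 5, 2, 3], replacing edges 1->2, 3->4 and 5->0 by 1->4, 5->2 and 3->0.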
def alias_setup(probabilities):
"""
Sets up the tables used in the alias method
:param probabilities: The probabilities of all the discrete variables to sample from
:return: Two tables to be used in the alias_draw method
"""
k = len(probabilities)
q = np.zeros(k)
j = np.zeros(k, dtype=np.int)
    # Split the outcomes into those whose scaled probability is below 1 and the rest
smaller = []
larger = []
for kk, prob in enumerate(probabilities):
q[kk] = k * prob
if q[kk] < 1:
smaller.append(kk)
else:
larger.append(kk)
# Loop through and create little binary mixtures that
# appropriately allocate the larger outcomes over the
# overall uniform mixture.
while len(smaller) > 0 and len(larger) > 0:
small = smaller.pop()
large = larger.pop()
j[small] = large
q[large] = q[large] - (1.0 - q[small])
if q[large] < 1.0:
smaller.append(large)
else:
larger.append(large)
return j, q
def alias_draw(j, q):
"""
Draw a random item from the discrete probabilities in constant time.
:param j: The j matrix from the alias_setup method.
:param q: The q matrix from the alias_setup method.
    :return: An index drawn from the discrete distribution according to the given probabilities.
"""
k = len(j)
# Draw from the overall uniform mixture
kk = int(np.floor(np.random.rand() * k))
if np.random.rand() < q[kk]:
return kk
else:
return j[kk]
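# Example usage of the alias method above (illustrative probabilities):
#   j, q = alias_setup([0.1, 0.2, 0.7])
#   samples = [alias_draw(j, q) for _ in range(10_000)]
#   # np.bincount(samples) / len(samples) is approximately [0.1, 0.2, 0.7]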
class r0701014:
def __init__(self) -> None:
self.reporter = Reporter.Reporter(
self.__class__.__name__
) # The reporter for the results
self.distance_matrix: np.array = None # Distance matrix
self.tour_size: int = 0 # Number of cities in a tour
self.generation: int = 0 # Current generation
self.nearest_neighbors = None # List with the nearest neighbors for each node, used in the 3-opt local search
# EA parameters
self.population_size: int = 16 # Number of individuals in population
self.offspring_size: int = 50 # Number of children created per generation
self.k: int = 3 # The k used in k-tournament selection
self.selection_pressure: float = 0.0 # The selection pressure used
self.selection_pressure_decay = (
0.0 # The factor for the decay of the selection pressure in geometric decay
)
self.alpha: float = 0.15 # The mutation rate
self.rcl: float = (
0.1 # Fraction that a solution can be longer than the greedy solution
)
self.number_of_nearest_neighbors: int = (
15 # Number of NN used in the 3 opt local search
)
self.sigma: int = (
0 # Sigma used in the fitness sharing, will be set after tour_size is known
)
self.percentage_local_search: float = (
0.5 # Percentage of individuals to perform local search on
)
# Arrays for parallel execution
self.raw_distance_matrix = (
None # RawArray to be shared between parallel processes
)
self.raw_nearest_neighbors = (
None # RawArray to be shared between parallel processes
)
# EA options
self.use_random_initialization: bool = (
False # Use a random initialization instead of heuristic methods
)
self.local_search_on_all: bool = (
False # Perform local search on the entire population
)
self.use_multiple_mutation_operators: bool = (
False # Combine different mutation operators
)
# EA functions
self.selection_function = (
self.selection_roulette_wheel
) # Selection function to use
self.recombination_function = (
self.order_crossover
) # Recombination function to use
self.mutation_function = (
self.reverse_sequence_mutation
) # Mutation function to use
self.elimination_function = (
self.lambda_and_mu_elimination
) # Elimination function to use
self.local_search_operator = (
self.local_search_optimized_3_opt
) # Local search operator to use
# EA scores
self.mean_objective: float = (
np.inf
) # Mean objective value of the current generation
self.best_objective: float = (
np.inf
) # Best objective value of the current generation
self.best_solution: np.array = None # Best solution of the current generation
self.last_mean_objective: float = (
0 # Mean objective value of the previous generation
)
self.last_best_objective: float = (
0 # Best objective value of the previous generation
)
self.same_best_objective: int = (
            0  # Number of consecutive generations in which the best objective value has not changed
)
self.time_left = 300
self.set_selection_pressure() # Depending on the selection function, the selection pressure will be different
def optimize(self, filename: str):
"""
The main loop of the genetic algorithm.
:param filename: The filename of the tour for which a candidate solution needs to be found.
"""
# Read the distance matrix from file
data = np.loadtxt(filename, delimiter=",")
self.tour_size = data.shape[0]
self.raw_distance_matrix = RawArray(ctypes.c_double, self.tour_size * self.tour_size)
self.distance_matrix = np.frombuffer(self.raw_distance_matrix, dtype=np.float64) \
.reshape(self.tour_size, self.tour_size)
np.copyto(self.distance_matrix, data)
self.build_nearest_neighbor_list()
self.init_dictionary()
population = self.initialize_population()
while True:
offspring = self.recombination(population)
mutated_population = self.mutation(population, offspring)
optimized_population = self.local_search_parallel(mutated_population)
population, scores = self.elimination(optimized_population)
population, scores = self.eliminate_duplicate_individuals(population, scores)
population, scores = self.elitism(population, scores)
if self.same_best_objective % 15 == 0 and self.same_best_objective != 0:
self.selection_pressure = 0.8
population = self.random_nearest_insertion()
population[0] = self.best_solution
scores = self.length(population)
if self.mean_objective != np.inf:
self.alpha = (0.2 * self.mean_objective / (self.best_objective + self.mean_objective))
self.update_scores(population[0], scores)
time_left = self.reporter.report(
self.mean_objective, self.best_objective, self.best_solution
)
self.time_left = time_left
if time_left < 0:
break
return 0
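    # Example usage of this class (hypothetical file name; the file is read with
    # np.loadtxt as a comma-separated distance matrix):
    #   solver = r0701014()
    #   solver.optimize("tour50.csv")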
####################
# INITIALIZATION #
####################
def initialize_population(self) -> np.array:
"""
:return: Returns an initial population for the evolutionary algorithm to use. The population is a numpy array
of dimensions population_size by tour_size. Each row in the array represents an individual in the population.
        Each individual is a numpy array as well and contains a permutation of the numbers 0 to tour_size - 1.
This permutation represents the order in which the individual visits the cities.
"""
if not self.use_random_initialization:
if self.tour_size > self.population_size:
                # Create greedy solutions starting from random cities
population = self.all_nearest_neighbors()
else:
# Create all heuristic solutions and randomly generate the rest
heuristic_population = self.all_nearest_neighbors()
random_population = self.random_population(
self.population_size - self.tour_size
)
population = np.concatenate([heuristic_population, random_population])
else:
# Completely random population
population = self.random_population(self.population_size)
return population
def all_nearest_neighbors(self) -> np.array:
"""
Creates an initial population using a greedy algorithm starting from every possible starting point.
        :return: A population of self.tour_size individuals when the tour size is smaller than the population size,
        otherwise a population of self.population_size individuals.
"""
if self.population_size < self.tour_size:
            # Fewer individuals than the length of the tour
nn = np.zeros([self.population_size, self.tour_size], dtype=np.int)
random_tours = np.random.choice(
np.arange(self.tour_size), self.population_size, replace=False
)
minimal_value_index = np.argwhere(
self.distance_matrix
== np.min(self.distance_matrix[self.distance_matrix != 0])
)[0][0]
if minimal_value_index not in random_tours:
random_tours[0] = minimal_value_index
for i, x in enumerate(random_tours):
nn[i] = self.make_greedy_tour(x)
return nn
else:
# Create all heuristic individuals and fill the rest with random individuals
nn = np.zeros([self.tour_size, self.tour_size], dtype=np.int)
for i in range(self.tour_size):
new_tour = self.make_greedy_tour(i)
k = 1
while self.length_individual(new_tour) == np.inf:
new_tour = self.make_greedy_tour(i + k)
k += 1
nn[i] = new_tour
return nn
def make_greedy_tour(self, i: int) -> np.array:
"""
The greedy algorithm to create a tour given a starting point i. The algorithm visits the closest city that has
not yet been visited.
:param i: The starting point.
:return: The tour created by the greedy algorithm.
"""
individual = np.zeros(self.tour_size, dtype=np.int)
individual[0] = i
not_used = set(range(self.tour_size))
not_used.remove(i)
for j in range(1, self.tour_size, 1):
minimum_value = np.min(self.distance_matrix[i][list(not_used)])
nearest_city = np.where(self.distance_matrix[i] == minimum_value)
for city in nearest_city[0]:
try:
not_used.remove(city)
individual[j] = city
i = city
break
except KeyError:
# The city is already used, because there are multiple minimal values
pass
return individual
def random_population(self, n: int) -> np.array:
"""
Creates a random population of size n
:param n: The size of the population to generate.
:return: The generated population.
"""
population = np.empty([n, self.tour_size], dtype=np.int32)
for i in range(n):
individual = np.arange(self.tour_size)
np.random.shuffle(individual)
population[i] = individual
return population
def random_nearest_insertion(self) -> np.array:
random_population = self.random_population(self.population_size)
new_population = np.empty([self.population_size, self.tour_size], dtype=np.int32)
for i in range(self.population_size):
new_individual = np.array([], dtype=np.int32)
for j in random_population[i]:
new_individual = self.nearest_insertion(new_individual, j)
new_population[i] = new_individual
return new_population
def nearest_insertion(self, individual, n):
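        # Insert city n into the partial tour at the interior position that minimizes
        # d(prev, n) + d(n, next); the length of the replaced edge prev -> next is not
        # subtracted. Tours with fewer than two cities simply get n appended.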
if individual.size < 2:
return np.append(individual, n)
else:
shortest_length = self.distance_matrix[individual[0]][n] + self.distance_matrix[n][individual[1]]
shortest_index = 1
for i in range(2, individual.size):
new_length = self.distance_matrix[individual[i - 1]][n] + self.distance_matrix[n][
individual[i % self.tour_size]]
if new_length < shortest_length:
shortest_length = new_length
shortest_index = i
return np.insert(individual, shortest_index, n)
###############
# SELECTION #
###############
def selection_k_tournament(self, population: np.array, n: int = 1, replace: bool = True) -> np.array:
"""
Performs a k-tournament selection on the population supplied. The k value used is the one provided in self.k.
:param replace: True if the sampling should happen with replacement, this is the standard value.
:param population: The population to select individuals from.
:param n: The number of individuals to select
:return: Returns n individuals in a numpy array
"""
selection = np.empty((n, self.tour_size), dtype=np.int)
for i in range(n):
individuals = population[
np.random.choice(self.population_size, self.k, replace=replace), :
]
objective_values = self.length(individuals)
perm = np.argsort(objective_values)
selection[i] = individuals[perm[0]]
return selection
def selection_roulette_wheel(self, population: np.array, n: int = 1) -> np.array:
"""
        Roulette-wheel style selection over exponentially decaying, rank-based probabilities, using the alias method
        to draw an index in constant time.
https://lips.cs.princeton.edu/the-alias-method-efficient-sampling-with-many-discrete-outcomes/
https://www.keithschwarz.com/darts-dice-coins/
:param population: The population ordered by the fitness of the individuals
:param n: The number of individuals to select
:return: The selected individuals in an array
"""
self.selection_pressure *= self.selection_pressure_decay
a = np.log(self.selection_pressure) / (self.population_size - 1)
probabilities = np.exp(a * (np.arange(1, self.population_size + 1) - 1))
probabilities /= np.sum(probabilities)
j, q = alias_setup(probabilities)
selection = np.zeros([n, self.tour_size], dtype=np.int)
for i in range(n):
selection[i] = population[int(alias_draw(j, q))]
return selection
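    # Note on the weights above: with selection pressure s (0 < s < 1) and population
    # size N, the individual at rank i (1 = best) gets weight exp(ln(s) * (i - 1) / (N - 1)),
    # so the best individual has weight 1 and the worst has weight s before normalization.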
def selection_rank_linear(self, population: np.array, n: int = 1) -> np.array:
"""
Selection operator used to select n individuals from the population. This operator uses rank based selection
with linear decay and constant selection pressure.
:param population: The population to select the individuals from.
:param n: The number of individuals to select.
:return: The selected individuals.
"""
total_rank = (
self.population_size * (self.population_size + 1) / 2
        )  # Sum 1..N = N * (N + 1) / 2
probabilities = np.arange(1, self.population_size + 1)[::-1] / total_rank
return population[
np.random.choice(
self.population_size, size=n, replace=True, p=probabilities
),
:,
]
###################
# RECOMBINATION #
###################
def recombination(self, population: np.array) -> np.array:
"""
The method selects two parents for the recombination operator using the selection method specified in
self.selection_function and supplies the selected parents to the recombination operator specified in
self.recombination_function.
:param population: The population to select the parents from.
:return: Returns the offspring created by the recombination operator. The number of offspring created is
specified by self.offspring_size.
"""
offspring = np.empty([self.offspring_size, self.tour_size], dtype=np.int)
if self.recombination_function != self.sequential_constructive_crossover:
# Produce two children per two parents
selection = self.selection_function(population, self.offspring_size)
for i in range(0, self.offspring_size, 2):
offspring[i], offspring[i + 1] = self.recombination_function(
[selection[i], selection[i + 1]]
)
else:
# Produce one child per two parents
selection = self.selection_function(population, self.offspring_size * 2)
for i in range(self.offspring_size):
offspring[i] = self.recombination_function(
                    [selection[2 * i], selection[2 * i + 1]]  # use disjoint parent pairs
)
return offspring
def pmx(self, individuals):
"""
Perform partially mapped crossover (PMX) on the parents. This chooses two crossover points at random and copies
the partial tour between these points to each of the children. The rest of the nodes are then mapped according
to the structure of both parents to guarantee no duplicate nodes. This algorithm is described in the book of
Eiben and Smith.
:param individuals: The two parents to apply the crossover on
:return: The two children created from the parents.
"""
parent1 = individuals[0]
parent2 = individuals[1]
child1 = parent1.copy()
child2 = parent2.copy()
p1 = np.zeros(self.tour_size, dtype=np.int)
p2 = np.zeros(self.tour_size, dtype=np.int)
        # Store the position of each city in the two individuals
for i in range(self.tour_size):
p1[child1[i]] = i
p2[child2[i]] = i
# Choose crossover points
point1 = np.random.randint(0, self.tour_size)
point2 = np.random.randint(0, self.tour_size - 1)
if point2 >= point1:
point2 += 1
else: # Swap the two cx points
point1, point2 = point2, point1
# Apply crossover between cx points
for i in range(point1, point2):
# Keep track of the selected values
temp1 = child1[i]
temp2 = child2[i]
# Swap the matched value
child1[i], child1[p1[temp2]] = temp2, temp1
child2[i], child2[p2[temp1]] = temp1, temp2
# Position bookkeeping
p1[temp1], p1[temp2] = p1[temp2], p1[temp1]
p2[temp1], p2[temp2] = p2[temp2], p2[temp1]
return child1, child2
def order_crossover(self, individuals: np.array) -> np.array:
"""
Order crossover (OX) proposed by Davis.
https://www.hindawi.com/journals/cin/2017/7430125/
:param individuals: The two parents to recombine.
:return: The children created from the two parents.
"""
parent1 = individuals[0]
parent2 = individuals[1]
child1 = np.zeros(self.tour_size)
child2 = np.zeros(self.tour_size)
cx1, cx2 = np.sort(np.random.randint(self.tour_size, size=2))
child1[cx1:cx2] = parent1[cx1:cx2]
child2[cx1:cx2] = parent2[cx1:cx2]
used_in_c1 = set(parent1[cx1:cx2])
used_in_c2 = set(parent2[cx1:cx2])
sequence_parent1 = []
sequence_parent2 = []
i = cx2
start = True
while i != cx2 or start:
start = False
if parent1[i] not in used_in_c2:
sequence_parent1.append(parent1[i])
if parent2[i] not in used_in_c1:
sequence_parent2.append(parent2[i])
i = (i + 1) % self.tour_size
while len(sequence_parent1) != 0:
child1[i] = sequence_parent2.pop(0)
child2[i] = sequence_parent1.pop(0)
i = (i + 1) % self.tour_size
return child1, child2
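    # Worked example (illustrative): for parents [0, 1, 2, 3, 4, 5] and
    # [3, 5, 1, 0, 2, 4] with crossover points cx1 = 2 and cx2 = 4, the children keep
    # the slices [2, 3] and [1, 0] and are filled, starting at position cx2, with the
    # other parent's remaining cities in their order of appearance from cx2 onwards:
    #   child1 = [1, 0, 2, 3, 4, 5]
    #   child2 = [2, 3, 1, 0, 4, 5]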
def sequential_constructive_crossover(self, individuals: np.array) -> np.array:
"""
Sequential constructive crossover (SCX) by Ahmed
https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.371.7771&rep=rep1&type=pdf
:param individuals: The parents to use
:return: The child created
"""
parent1, parent2 = individuals
child = np.zeros(self.tour_size)
child[0] = 0
nodes_used = {0}
for i in range(1, self.tour_size, 1):
previous_node = int(child[i - 1])
node_parent1 = self.find_first_node(previous_node, parent1, nodes_used)
node_parent2 = self.find_first_node(previous_node, parent2, nodes_used)
cost_parent1 = self.distance_matrix[previous_node][node_parent1]
cost_parent2 = self.distance_matrix[previous_node][node_parent2]
if cost_parent1 < cost_parent2:
nodes_used.add(node_parent1)
child[i] = node_parent1
else:
nodes_used.add(node_parent2)
child[i] = node_parent2
return child
def find_first_node(
self, previous_node: int, parent: np.array, nodes_used: set
) -> int:
"""
Finds the node after a given node that is not already used.
:param previous_node: The node to start from.
:param parent: The parent sequence in which to search the next possible node.
:param nodes_used: The nodes that are already used in the child.
        :return: The first possible node found in the parent that matches the two criteria: it comes after previous_node
and it is not in the nodes_used set.
"""
index_previous_node = np.where(parent == previous_node)[0][0]
for i in range(index_previous_node + 1, self.tour_size, 1):
if parent[i] not in nodes_used:
return parent[i]
for i in range(1, self.tour_size):
if i not in nodes_used:
return i
return -1
##############
# MUTATION #
##############
def mutation(self, population: np.array, offspring: np.array) -> np.array:
"""
        This method selects individuals to be mutated. Each individual is mutated with probability self.alpha.
        The selected individuals are mutated using the function in self.mutation_function. After
        mutation the new population is returned.
:param population: The population from the previous generation.
:param offspring: The offspring created in the current generation.
:return: Returns the combined population from the original population and the offspring in which some
individuals are mutated.
"""
joined_population = np.vstack((population, offspring))
mask = np.random.random(joined_population.shape[0]) < self.alpha
if not self.use_multiple_mutation_operators:
mutated_population = self.mutation_function(joined_population[mask])
return np.vstack((joined_population[~mask], mutated_population))
else:
            probabilities = [0.50, 0.50]  # RSM, hybrid PSM/RSM
number_of_individuals = joined_population[mask].shape[0]
first = round(number_of_individuals * probabilities[0])
p1 = self.reverse_sequence_mutation(joined_population[mask][:first])
p2 = self.hybridizing_psm_rsm_mutation(joined_population[mask][first:])
            return np.vstack((joined_population[~mask], p1, p2))  # keep the non-mutated individuals
def swap(self, population: np.array) -> np.array:
"""
The swap mutation swaps two random cities in the permutation with each other.
:param population: The population to be mutated.
:return: Returns the mutated population.
"""
random_indexes = np.floor(
np.random.random([population.shape[0], 2]) * self.tour_size
).astype("int")
for i, individual in enumerate(population):
individual[random_indexes[i, 0]], individual[random_indexes[i, 1]] = (
individual[random_indexes[i, 1]],
individual[random_indexes[i, 0]],
)
return population
def reverse_sequence_mutation(self, population: np.array) -> np.array:
"""
This mutation operator chooses two random points in the sequence and flips the subsequence between these
two crossover points. This operator is described in the research paper linked in the HRPM method.
:param population: The population to mutate.
:return: The mutated population.
"""
points = np.floor(
np.random.random([population.shape[0], 2]) * self.tour_size
).astype("int")
for i, individual in enumerate(population):
if points[i, 0] < points[i, 1]:
a, b = points[i]
else:
b, a = points[i]
individual = np.hstack(
[individual[:a], np.flip(individual[a:b]), individual[b:]]
)
population[i] = individual
return population
def partial_shuffle_mutation(self, population: np.array) -> np.array:
"""
This mutation operator chooses two random points in the sequence and randomly shuffles the subsequence between
these two crossover points. This operator is described in the research paper linked in the HRPM method.
:param population: The population to mutate.
:return: The mutated population.
"""
points = np.floor(
np.random.random([population.shape[0], 2]) * self.tour_size
).astype("int")
for i, individual in enumerate(population):
if points[i, 0] < points[i, 1]:
a, b = points[i]
else:
b, a = points[i]
population[i] = np.hstack(
[individual[:a], np.random.permutation(individual[a:b]), individual[b:]]
)
return population
def hybridizing_psm_rsm_mutation(self, population: np.array) -> np.array:
"""
A combination of the PSM and RSM mutation operator described in the following research paper:
https://www.researchgate.net/publication/282732991_A_New_Mutation_Operator_for_Solving_an_NP-Complete_Problem_Travelling_Salesman_Problem
:param population: The population to mutate
:return: The mutated population
"""
points = np.floor(
np.random.random([population.shape[0], 2]) * self.tour_size
).astype("int")
for i, individual in enumerate(population):
if points[i, 0] < points[i, 1]:
a, b = points[i]
else:
b, a = points[i]
while a < b:
individual[a], individual[b] = individual[b], individual[a]
if np.random.random() < self.alpha:
c = np.random.randint(0, self.tour_size)
individual[a], individual[c] = individual[c], individual[a]
a += 1
b -= 1
population[i] = individual
return population
#################
# ELIMINATION #
#################
def elimination(self, joined_population: np.array) -> Tuple[np.array, np.array]:
"""
Calls the elimination scheme specified in self.elimination_function
:param joined_population: The population to perform the elimination on
:return: The population after elimination sorted by fitness value and a list of the corresponding fitness values
"""
return self.elimination_function(joined_population)
def lambda_and_mu_elimination(
self, joined_population: np.array
) -> Tuple[np.array, np.array]:
"""
Performs (lambda + mu)-elimination on the population and offspring (joined population). The number of
individuals returned is equal to self.population_size.
:param joined_population: The joined population (population of the previous generation and offspring of the
current generation).
:return: A new population ordered by their fitness in ascending order and the scores of the individuals in this
new population.
"""
objective_values = self.length(joined_population)
perm = np.argsort(objective_values)
return (
joined_population[perm[: self.population_size]],
objective_values[perm[: self.population_size]],
)
def elitism(self, population: np.array, scores: np.array) -> np.array:
"""
This scheme is applied to the population after elimination to make sure that the fittest member of the previous
generation will also be part of the new generation (unless a better individual is already present in the current
        generation).
        :param scores: The objective values of the population, in the same order as the population.
:param population: The population to potentially add the fittest member of the previous generation to.
:return: The (altered) population.
"""
        if self.best_solution is None or (population[0] == self.best_solution).all():
return population, scores
else:
if self.length_individual(population[0]) < self.best_objective:
return population, scores
else:
population[-1] = self.best_solution
scores[-1] = self.best_objective
population[0], population[-1] = population[-1], population[0]
scores[0], scores[-1] = scores[-1], scores[0]
return population, scores
###############
# DIVERSITY #
###############
def fitness_sharing_elimination(self, population: np.array) -> Tuple[np.array, np.array]:
"""
        This function tries to promote diversity in the population by penalizing individuals that too closely
        resemble an individual already selected for the next generation.
:param population: The population to perform this scheme on
:return: The new population after the elimination scheme and their respective scores
"""
scores = self.length(population)
perm = np.argsort(scores)
population = population[perm]
scores = scores[perm]
new_population = np.zeros([self.population_size, self.tour_size], dtype=np.int)
new_population[0] = population[0]
penalties = np.ones([population.shape[0]])
penalties = self.compute_penalties(population[0], population, penalties)
for i in range(1, self.population_size):
corrected_scores = scores * penalties
new_population[i] = population[np.argsort(corrected_scores)[0]]
penalties = self.compute_penalties(new_population[i], population, penalties)
return new_population, self.length(new_population)
def compute_penalties(
self, individual_to_compute_distance_from, population, penalties
) -> np.array:
"""
This function computes the penalties introduced by an individual already in the next generation on the
remaining individuals in parallel.
:param individual_to_compute_distance_from: The individual that introduces the penalty on the rest of the
population
:param population: The population to add the penalties to
:param penalties: The penalties that already exist due to other individuals.
:return: The list with the penalties
"""
distance_function_parallel_partial = partial(
distance_function_parallel, perm1=individual_to_compute_distance_from
)
with Pool(processes=2) as pool:
result = pool.map(distance_function_parallel_partial, population)
result = np.array(result)
result = result / -self.sigma
result += 1
result[result < 0] = 0
return penalties + result
def distance_function(self, perm1: np.array, perm2: np.array) -> float:
"""
Computes the distance between two permutations by counting the edges of the first permutation that are not in
the second permutation.
:param perm1: The first permutation
:param perm2: The second permutation
:return: The distance between the two permutations
"""
distance = 0
for i in range(self.tour_size - 1):
element = perm1[i]
x = np.where(perm2 == element)[0][0]
y = 0 if x == self.tour_size - 1 else x + 1
if perm1[i + 1] != perm2[y]:
distance += 1
element = perm1[-1]
x = np.where(perm2 == element)[0][0]
if perm1[0] != perm2[(x + 1) % self.tour_size]:
distance += 1
return distance
def eliminate_duplicate_individuals(
self, joined_population: np.array, objective_values: np.array
) -> Tuple[np.array, np.array]:
"""
This function is an elimination scheme that tries to introduce diversity into the population. If there are two
identical individuals in the population then one of them will be replaced by a greedy randomized individual.
https://arxiv.org/pdf/1702.03594.pdf
:param joined_population: The population to perform the elimination scheme on.
:param objective_values: The objective values of the population in the same order
:return: Returns the new population and the respective scores.
"""
new_population = np.zeros([self.population_size, self.tour_size], dtype=np.int)
new_population[0] = joined_population[0]
for i in range(self.population_size - 1, 0, -1):
if objective_values[i] == objective_values[i - 1]:
new_population[i] = self.greedy_randomized_algorithm()
else:
new_population[i] = joined_population[i]
new_scores = self.length(new_population)
return new_population, new_scores
############################
# LOCAL SEARCH OPERATORS #
############################
def local_search_parallel(self, population: np.array) -> np.array:
"""
This method performs local search on the population in a parallel way by using the multiprocessing library. The
        distance matrix is stored in a RawArray so that no locks are applied and all child processes can use the
        matrix without needing to pickle it (as Queues and Pipes would). self.local_search_on_all specifies whether
        the operator is applied to all individuals or only to roughly 50% of them.
:param population: The population to perform local search on.
:return: The (improved) population
"""
if self.local_search_on_all:
with Pool(processes=2) as pool:
result = pool.map(parallel_3_opt, population)
return np.vstack(result)
else:
random_numbers = np.random.random(population.shape[0])
random_numbers[0] = 1 # Always perform local search on the fittest individual
population_to_search = population[
random_numbers > self.percentage_local_search
]
rest = population[random_numbers <= self.percentage_local_search]
with Pool(processes=2) as pool:
result = pool.map(parallel_3_opt, population_to_search)
return np.vstack([result, rest])
def local_search(self, population: np.array) -> np.array:
"""
Performs local search on the population. The local search operator to use is specified in
        self.local_search_operator. self.local_search_on_all specifies whether the operator is applied to
        all individuals or to roughly 50% of them.
:param population: The population to perform local search on
:return: The (improved) population
"""
if self.local_search_on_all:
for i, individual in enumerate(population):
population[i] = self.local_search_operator(individual)
else:
random_numbers = np.random.random(population.shape[0])
random_numbers[
0
] = 1 # Always perform local search on the fittest individual
for i, individual in enumerate(population):
if random_numbers[i] > self.percentage_local_search:
population[i] = self.local_search_operator(individual)
return population
def local_search_swap(self, individual: np.array) -> np.array:
"""
This local search operator loops through the individual and if a combination ABCD is found for which the
        combination ACBD is shorter, then it swaps nodes B and C. This operator is very fast because it only loops
        through the individual once, but the improvements are limited because it only considers four consecutive
        nodes at a time.
:param individual: The individual to perform the local search on.
:return: The (improved) individual
"""
for i in range(self.tour_size):
a = individual[i - 1]
b = individual[i]
c = individual[(i + 1) % self.tour_size]
d = individual[(i + 2) % self.tour_size]
normal_distance = (
self.distance_matrix[a][b]
+ self.distance_matrix[b][c]
+ self.distance_matrix[c][d]
)
new_distance = (
self.distance_matrix[a][c]
+ self.distance_matrix[c][b]
+ self.distance_matrix[b][d]
)
if new_distance < normal_distance:
individual[i], individual[(i + 1) % self.tour_size] = (
individual[(i + 1) % self.tour_size],
individual[i],
)
return individual
def local_search_naive_3_opt(self, individual: np.array) -> np.array:
"""
This local search operator searches for a local minimum by swapping two arcs in the individual. For an
individual with arcs ABC it creates individual ACB if the solution is shorter. To find possible shorter
solutions it loops through the list and creates two splitting points (the third is between the start and end
of the representation) to create the arcs. This is a very expensive approach, but the results are better than
the local_search_swap operator because of the bigger neighborhood structure.