-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPGUtils.py
executable file
·191 lines (169 loc) · 6.51 KB
/
PGUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import psycopg2
import json
from math import log
from QPE.ImportantConfig import Config
class PGConfig:
    """Static execution limits for running plans against PostgreSQL."""

    def __init__(self):
        # Re-use plans that have already been executed.
        self.keepExecutedPlan = True
        # Maximum number of repeated executions per query.
        self.maxTimes = 5
        # Upper time bound, presumably milliseconds (300 s) — confirm against callers.
        self.maxTime = 300000
# Module-level cache mapping SQL text (or a WHERE condition, see
# PGGRunner.getSelectivity) to its recorded plan/latency value; shared by
# every PGGRunner method below.
latency_record_dict = {}
# selectivity_dict = {}
# Open handle to the latency record file; assigned by
# PGGRunner.generateLatencyPool when need_latency_record is True,
# otherwise it stays None.
latency_record_file = None
# Project-wide configuration object (DB credentials, timeouts, debug flags).
config = Config()
class PGGRunner:
    """Wrapper around a psycopg2 connection that runs EXPLAIN /
    EXPLAIN ANALYSE for SQL queries and caches plans and latencies,
    optionally persisting them to a JSON-lines record file.
    """

    def __init__(self, dbname='', user='', password='', host='', port='',
                 need_latency_record=True, latency_file="RecordFile.json"):
        """
        :param dbname: database name to connect to
        :param user: database user
        :param password: database password
        :param host: database host
        :param port: database port
        :param need_latency_record: when True, load previously recorded
            latencies from latency_file and append new ones to it
        :param latency_file: JSON-lines file of [sql, plan] records
        """
        self.con = psycopg2.connect(database=dbname, user=user,
                                    password=password, host=host, port=port)
        self.cur = self.con.cursor()
        self.config = PGConfig()
        self.need_latency_record = need_latency_record
        # self.cur.execute("load 'pg_hint_plan';")
        global latency_record_file
        # Per-instance cache of cost-only (non-ANALYSE) plan JSON, keyed by SQL.
        self.cost_plan_json = {}
        if need_latency_record:
            latency_record_file = self.generateLatencyPool(latency_file)

    def generateLatencyPool(self, fileName):
        """Load previously recorded latencies from fileName into the
        module-level latency_record_dict.

        :param fileName: JSON-lines file; each line is a [sql, value] pair
        :return: a file object opened for appending (or writing, when the
            file does not exist yet) that subsequent addLatency calls use
        """
        import os
        global latency_record_dict
        if os.path.exists(fileName):
            # Fix: the original opened the file for reading and never closed
            # that handle before reopening for append; use a context manager.
            with open(fileName, "r") as src:
                for line in src:
                    line = line.strip()
                    if not line:
                        # Tolerate blank/trailing lines in the record file.
                        continue
                    data = json.loads(line)
                    # Skip hinted queries; keep only the first record per SQL.
                    if data[0].find('/*+Leading') == -1 and data[0] not in latency_record_dict:
                        latency_record_dict[data[0]] = data[1]
            return open(fileName, "a")
        return open(fileName, "w")

    def addLatency(self, k, v):
        """Record value v for key k in the in-memory cache and, when a
        record file is open, persist the pair as one JSON line."""
        latency_record_dict[k] = v
        # Fix: latency_record_file stays None when the runner was built with
        # need_latency_record=False; the original crashed with AttributeError.
        if latency_record_file is not None:
            latency_record_file.write(json.dumps([k, v]) + "\n")
            latency_record_file.flush()

    def getAnalysePlanJson(self, sql, timeout=300*1000):
        """Run EXPLAIN (ANALYSE, FORMAT JSON) on sql and return the plan
        JSON, using and updating the module-level latency cache.

        On any execution error (including statement timeout) a synthetic
        plan with 'timeout': True and the configured worst-case time is
        returned instead of raising.
        """
        if config.cost_test_for_debug:
            # The original used a bare `raise` with no active exception,
            # which itself produces a RuntimeError — make the intent explicit
            # while keeping the exception type callers would observe.
            raise RuntimeError('getAnalysePlanJson is unavailable in cost-test debug mode')
        if sql in latency_record_dict:
            return latency_record_dict[sql]
        # Small grace period on top of the server-side statement timeout.
        timeout += 300
        try:
            self.cur.execute("SET geqo_threshold = 12;")
            self.cur.execute("SET statement_timeout = " + str(timeout) + ";")
            self.cur.execute("explain (COSTS, FORMAT JSON, ANALYSE) " + sql)
            rows = self.cur.fetchall()
            plan_json = rows[0][0][0]
            plan_json['timeout'] = False
        except KeyboardInterrupt:
            raise
        except Exception:
            # Narrowed from a bare `except:` so SystemExit still propagates.
            # Deliberate best-effort: synthesize a worst-case plan on failure.
            plan_json = {}
            plan_json['Planning Time'] = 20
            plan_json['Plan'] = {'Actual Total Time': config.max_time_out}
            plan_json['timeout'] = True
        # commit() also clears an aborted transaction after a failed statement.
        self.con.commit()
        if not plan_json['timeout']:
            self.addLatency(sql, plan_json)
        return plan_json

    def getLatency(self, sql, timeout=300*1000):
        """
        :param sql: a SQL query string
        :return: (actual total time of sql, whether it timed out)
        """
        if config.cost_test_for_debug:
            raise RuntimeError('getLatency is unavailable in cost-test debug mode')
        plan_json = self.getAnalysePlanJson(sql, timeout)
        return plan_json['Plan']['Actual Total Time'], plan_json['timeout']

    def getAnalysePlanJsonNoCache(self, sql, timeout=300*1000):
        """Like getAnalysePlanJson, but never reads or writes the latency
        cache — every call executes the query."""
        if config.cost_test_for_debug:
            raise RuntimeError('getAnalysePlanJsonNoCache is unavailable in cost-test debug mode')
        timeout += 300
        try:
            self.cur.execute("SET geqo_threshold = 12;")
            self.cur.execute("SET statement_timeout = " + str(timeout) + ";")
            self.cur.execute("explain (COSTS, FORMAT JSON, ANALYSE) " + sql)
            rows = self.cur.fetchall()
            plan_json = rows[0][0][0]
            plan_json['timeout'] = False
        except KeyboardInterrupt:
            raise
        except Exception:
            # Narrowed from a bare `except:`; same worst-case fallback as above
            # (note: this path intentionally sets no 'Planning Time').
            plan_json = {}
            plan_json['Plan'] = {'Actual Total Time': config.max_time_out}
            plan_json['timeout'] = True
        self.con.commit()
        return plan_json

    def getLatencyNoCache(self, sql, timeout=300*1000):
        """
        :param sql: a SQL query string
        :return: (actual total time of sql, whether it timed out)
        """
        if config.cost_test_for_debug:
            raise RuntimeError('getLatencyNoCache is unavailable in cost-test debug mode')
        plan_json = self.getAnalysePlanJsonNoCache(sql, timeout)
        return plan_json['Plan']['Actual Total Time'], plan_json['timeout']

    def getResult(self, sql):
        """
        :param sql: a SQL query string
        :return: all result rows of sql
        """
        self.cur.execute("SET statement_timeout = 300000;")
        self.cur.execute(sql)
        rows = self.cur.fetchall()
        return rows

    def getCostPlanJson(self, sql, timeout=300*1000):
        """Run EXPLAIN (COSTS, FORMAT JSON) — planner estimate only, the
        query is not executed — and cache the plan per instance."""
        if sql in self.cost_plan_json:
            return self.cost_plan_json[sql]
        self.cur.execute("SET statement_timeout = " + str(timeout) + ";")
        self.cur.execute("SET geqo_threshold = 12;")
        self.cur.execute("explain (COSTS, FORMAT JSON) " + sql)
        rows = self.cur.fetchall()
        plan_json = rows[0][0][0]
        self.cost_plan_json[sql] = plan_json
        return plan_json

    def getCost(self, sql):
        """
        :param sql: a SQL query string
        :return: (planner's estimated total cost of sql, 0) — the trailing 0
            mirrors getLatency's timeout flag so callers can unpack uniformly
        """
        plan_json = self.getCostPlanJson(sql)
        return plan_json['Plan']['Total Cost'], 0

    def getSelectivity(self, table, whereCondition):
        """Estimate -log(selectivity) of whereCondition on table from the
        planner's estimated row counts, caching by the condition string.

        NOTE(review): table and whereCondition are concatenated into SQL
        unescaped — acceptable only for trusted, internally generated input.
        """
        global latency_record_dict
        if whereCondition in latency_record_dict:
            return latency_record_dict[whereCondition]
        self.cur.execute("SET statement_timeout = " + str(int(100000)) + ";")
        totalQuery = "select * from " + table + ";"
        self.cur.execute("EXPLAIN " + totalQuery)
        rows = self.cur.fetchall()[0][0]
        # Parse "rows=N" out of the planner's one-line text output.
        total_rows = int(rows.split("rows=")[-1].split(" ")[0])
        resQuery = "select * from " + table + " Where " + whereCondition + ";"
        self.cur.execute("EXPLAIN " + resQuery)
        rows = self.cur.fetchall()[0][0]
        select_rows = int(rows.split("rows=")[-1].split(" ")[0])
        # Compute once; addLatency stores it in latency_record_dict, so the
        # original's duplicate dict write and second log() call are gone.
        selectivity = -log(select_rows / total_rows)
        self.addLatency(whereCondition, selectivity)
        return selectivity
# NOTE(review): `count` and `Path` appear unused in this file — confirm no
# downstream module relies on re-importing them from here before removing.
from itertools import count
from pathlib import Path
# Module-level singleton runner, connected using credentials from the
# project-wide Config; importing this module therefore opens a DB connection.
pgrunner = PGGRunner(config.database,config.user,config.password,config.ip,config.port,need_latency_record=True,latency_file=config.latency_file)