Skip to content

Commit

Permalink
Merge pull request dmwm#4775 from ticoann/couch_view_default_stale
Browse files Browse the repository at this point in the history
set default stale for wq calls
  • Loading branch information
ericvaandering committed Sep 10, 2013
2 parents 096c7ae + af7b368 commit 098fa5e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,35 @@ def __init__(self, couchURL, dbName = None):
self.hostWithAuth = couchURL
self.server = CouchServer(couchURL)
self.db = self.server.connectDatabase(dbName, create = False)
self.defaultOptions = {'stale': "update_after", 'reduce' : True, 'group' : True}

def getTopLevelJobsByRequest(self):
"""Get data items we have work in the queue for"""
data = self.db.loadView('WorkQueue', 'jobsByRequest',
{'reduce' : True, 'group' : True})

data = self.db.loadView('WorkQueue', 'jobsByRequest', self.defaultOptions)
return [{'request_name' : x['key'],
'total_jobs' : x['value']} for x in data.get('rows', [])]

def getChildQueues(self):
"""Get data items we have work in the queue for"""
data = self.db.loadView('WorkQueue', 'childQueues',
{'reduce' : True, 'group' : True})
data = self.db.loadView('WorkQueue', 'childQueues', self.defaultOptions)
return [x['key'] for x in data.get('rows', [])]

def getChildQueuesByRequest(self):
"""Get data items we have work in the queue for"""
data = self.db.loadView('WorkQueue', 'childQueuesByRequest',
{'reduce' : True, 'group' : True})
self.defaultOptions)
return [{'request_name' : x['key'][0],
'local_queue' : x['key'][1]} for x in data.get('rows', [])]

def getWMBSUrl(self):
"""Get data items we have work in the queue for"""
data = self.db.loadView('WorkQueue', 'wmbsUrl',
{'reduce' : True, 'group' : True})
data = self.db.loadView('WorkQueue', 'wmbsUrl', self.defaultOptions)
return [x['key'] for x in data.get('rows', [])]

def getWMBSUrlByRequest(self):
"""Get data items we have work in the queue for"""
data = self.db.loadView('WorkQueue', 'wmbsUrlByRequest',
{'reduce' : True, 'group' : True})
data = self.db.loadView('WorkQueue', 'wmbsUrlByRequest', self.defaultOptions)
return [{'request_name' : x['key'][0],
'wmbs_url' : x['key'][1]} for x in data.get('rows', [])]

Expand All @@ -56,7 +54,7 @@ def getJobStatusByRequest(self):
This service only provided by global queue
"""
data = self.db.loadView('WorkQueue', 'jobStatusByRequest',
{'reduce' : True, 'group' : True})
self.defaultOptions)
return [{'request_name' : x['key'][0], 'status': x['key'][1],
'jobs' : x['value']} for x in data.get('rows', [])]

Expand All @@ -65,7 +63,7 @@ def getJobInjectStatusByRequest(self):
This service only provided by global queue
"""
data = self.db.loadView('WorkQueue', 'jobInjectStatusByRequest',
{'reduce' : True, 'group' : True})
self.defaultOptions)
return [{'request_name' : x['key'][0], x['key'][1]: x['value']}
for x in data.get('rows', [])]

Expand All @@ -74,7 +72,7 @@ def getAnalyticsData(self):
This getInject status and input dataset from workqueue
"""
results = self.db.loadView('WorkQueue', 'jobInjectStatusByRequest',
{'reduce' : True, 'group' : True})
self.defaultOptions)
statusByRequest = {}
for x in results.get('rows', []):
statusByRequest.setdefault(x['key'][0], {})
Expand All @@ -87,7 +85,7 @@ def getSiteWhitelistByRequest(self):
This service only provided by global queue
"""
data = self.db.loadView('WorkQueue', 'siteWhitelistByRequest',
{'reduce' : True, 'group' : True})
self.defaultOptions)
return [{'request_name' : x['key'][0], 'site_whitelist': x['key'][1]}
for x in data.get('rows', [])]

Expand All @@ -111,7 +109,7 @@ def getAvailableWorkflows(self):
"""Get the workflows that have all their elements
available in the workqueue"""
data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
{'reduce' : False})
{'reduce' : False, 'stale': 'update_after'})
availableSet = set((x['value']['RequestName'], x['value']['Priority']) for x in data.get('rows', []) if x['key'][1] == 'Available')
notAvailableSet = set((x['value']['RequestName'], x['value']['Priority']) for x in data.get('rows', []) if x['key'][1] != 'Available')
return availableSet - notAvailableSet
Expand Down
6 changes: 6 additions & 0 deletions test/python/WMCore_t/Services_t/WorkQueue_t/WorkQueue_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def testWorkQueueService(self):
self.assertTrue(globalQ.queueWork(specUrl, "RerecoSpec", "teamA") > 0)

wqApi = WorkQueueDS(self.testInit.couchUrl, 'workqueue_t')
#overwrite default - can't test with stale view
wqApi.defaultOptions = {'reduce' : True, 'group' : True}
#This only checks minimum client call not exactly correctness of return
# values.
self.assertEqual(wqApi.getTopLevelJobsByRequest(),
Expand Down Expand Up @@ -93,6 +95,8 @@ def testUpdatePriorityService(self):
# Try a full chain of priority update and propagation
self.assertTrue(globalQ.queueWork(specUrl, "RerecoSpec", "teamA") > 0)
globalApi = WorkQueueDS(self.testInit.couchUrl, 'workqueue_t')
#overwrite default - can't test with stale view
globalApi.defaultOptions = {'reduce' : True, 'group' : True}
globalApi.updatePriority(specName, 100)
self.assertEqual(globalQ.backend.getWMSpec(specName).priority(), 100)
storedElements = globalQ.backend.getElementsForWorkflow(specName)
Expand All @@ -104,6 +108,8 @@ def testUpdatePriorityService(self):
for element in storedElements:
self.assertEqual(element['Priority'], 100)
localApi = WorkQueueDS(self.testInit.couchUrl, 'local_workqueue_t')
#overwrite default - can't test with stale view
localApi.defaultOptions = {'reduce' : True, 'group' : True}
localApi.updatePriority(specName, 500)
self.assertEqual(localQ.backend.getWMSpec(specName).priority(), 500)
storedElements = localQ.backend.getElementsForWorkflow(specName)
Expand Down

0 comments on commit 098fa5e

Please sign in to comment.