From b82ddf9918ff33508644f58fe5276806fb06b812 Mon Sep 17 00:00:00 2001
From: Brandon Forehand
Date: Fri, 31 Jul 2015 10:04:18 -0700
Subject: [PATCH] Drop trailing whitespace.

---
 base.lua      | 22 +++++++++----------
 job.lua       | 46 +++++++++++++++++++--------------------
 queue.lua     | 60 +++++++++++++++++++++++++--------------------------
 recurring.lua | 10 ++++-----
 worker.lua    |  6 +++---
 5 files changed, 72 insertions(+), 72 deletions(-)

diff --git a/base.lua b/base.lua
index 2e8ac38..523e5aa 100644
--- a/base.lua
+++ b/base.lua
@@ -66,14 +66,14 @@ end
 -- If no group is provided, this returns a JSON blob of the counts of the
 -- various groups of failures known. If a group is provided, it will report up
 -- to `limit` from `start` of the jobs affected by that issue.
--- 
+--
 -- # If no group, then...
 -- {
 --     'group1': 1,
 --     'group2': 5,
 --     ...
 -- }
--- 
+--
 -- # If a group is provided, then...
 -- {
 --     'total': 20,
@@ -119,9 +119,9 @@ end
 -------------------------------------------------------------------------------
 -- Return all the job ids currently considered to be in the provided state
 -- in a particular queue. The response is a list of job ids:
--- 
+--
 -- [
---     jid1, 
+--     jid1,
 --     jid2,
 --     ...
 -- ]
@@ -167,7 +167,7 @@ end
 -- associated with that id, and 'untrack' stops tracking it. In this context,
 -- tracking is nothing more than saving the job to a list of jobs that are
 -- considered special.
--- 
+--
 -- {
 --     'jobs': [
 --         {
@@ -252,7 +252,7 @@ function Qless.tag(now, command, ...)
       tags = cjson.decode(tags)
      local _tags = {}
      for i,v in ipairs(tags) do _tags[v] = true end
-      
+
      -- Otherwise, add the job to the sorted set with those tags
      for i=2,#arg do
        local tag = arg[i]
@@ -263,7 +263,7 @@ function Qless.tag(now, command, ...)
        redis.call('zadd', 'ql:t:' .. tag, now, jid)
        redis.call('zincrby', 'ql:tags', 1, tag)
      end
-      
+
      redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
      return tags
    else
@@ -278,7 +278,7 @@ function Qless.tag(now, command, ...)
      tags = cjson.decode(tags)
      local _tags = {}
      for i,v in ipairs(tags) do _tags[v] = true end
-      
+
      -- Otherwise, remove the job from the sorted sets for those tags
      for i=2,#arg do
        local tag = arg[i]
@@ -286,10 +286,10 @@ function Qless.tag(now, command, ...)
        redis.call('zrem', 'ql:t:' .. tag, jid)
        redis.call('zincrby', 'ql:tags', -1, tag)
      end
-      
+
      local results = {}
      for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end
-      
+
      redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
      return results
    else
@@ -416,7 +416,7 @@ function Qless.cancel(...)
      redis.call('del', QlessJob.ns .. jid .. '-history')
    end
  end
-  
+
  return arg
 end
 
diff --git a/job.lua b/job.lua
index a964870..4cadaab 100644
--- a/job.lua
+++ b/job.lua
@@ -56,11 +56,11 @@ end
 -- Complete a job and optionally put it in another queue, either scheduled or
 -- to be considered waiting immediately. It can also optionally accept other
--- jids on which this job will be considered dependent before it's considered 
+-- jids on which this job will be considered dependent before it's considered
 -- valid.
 --
 -- The variable-length arguments may be pairs of the form:
--- 
+--
 --     ('next'   , queue) : The queue to advance it to next
 --     ('delay'  , delay) : The delay for the next queue
 --     ('depends',        : Json of jobs it depends on in the new queue
 --       '["jid1", "jid2", ...]')
@@ -75,7 +75,7 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
   -- Read in all the optional parameters
   local options = {}
   for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end
-  
+
   -- Sanity check on optional args
   local nextq = options['next']
   local delay = assert(tonumber(options['delay'] or 0))
@@ -168,7 +168,7 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
     if redis.call('zscore', 'ql:queues', nextq) == false then
       redis.call('zadd', 'ql:queues', now, nextq)
     end
-    
+
     redis.call('hmset', QlessJob.ns .. self.jid,
       'state', 'waiting',
       'worker', '',
@@ -176,7 +176,7 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
       'queue', nextq,
       'expires', 0,
       'remaining', tonumber(retries))
-    
+
     if (delay > 0) and (#depends == 0) then
       queue_obj.scheduled.add(now + delay, self.jid)
       return 'scheduled'
@@ -224,18 +224,18 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
       'queue', '',
       'expires', 0,
       'remaining', tonumber(retries))
-    
+
    -- Do the completion dance
    local count = Qless.config.get('jobs-history-count')
    local time = Qless.config.get('jobs-history')
-    
+
    -- These are the default values
    count = tonumber(count or 50000)
    time = tonumber(time or 7 * 24 * 60 * 60)
-    
+
    -- Schedule this job for destruction eventually
    redis.call('zadd', 'ql:completed', now, self.jid)
-    
+
    -- Now look at the expired job data. First, based on the current time
    local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
    -- Any jobs that need to be expired... delete
@@ -251,7 +251,7 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
    end
    -- And now remove those from the queued-for-cleanup queue
    redis.call('zremrangebyscore', 'ql:completed', 0, now - time)
-    
+
    -- Now take all but the most recent 'count' ids
    jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
    for index, jid in ipairs(jids) do
@@ -265,7 +265,7 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
      redis.call('del', QlessJob.ns .. jid .. '-history')
    end
    redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))
-    
+
    -- Alright, if this has any dependents, then we should go ahead
    -- and unstick those guys.
    for i, j in ipairs(redis.call(
@@ -289,10 +289,10 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
        end
      end
    end
-    
+
    -- Delete our dependents key
    redis.call('del', QlessJob.ns .. self.jid .. '-dependents')
-    
+
    return 'complete'
  end
 end
@@ -303,14 +303,14 @@ end
 -- specific message. By `group`, we mean some phrase that might be one of
 -- several categorical modes of failure. The `message` is something more
 -- job-specific, like perhaps a traceback.
--- 
+--
 -- This method should __not__ be used to note that a job has been dropped or
 -- has failed in a transient way. This method __should__ be used to note that
 -- a job has something really wrong with it that must be remedied.
--- 
+--
 -- The motivation behind the `group` is so that similar errors can be grouped
 -- together. Optionally, updated data can be provided for the job. A job in
--- any state can be marked as failed. If it has been given to a worker as a 
+-- any state can be marked as failed. If it has been given to a worker as a
 -- job, then its subsequent requests to heartbeat or complete that job will
 -- fail. Failed jobs are kept until they are canceled or completed.
 --
@@ -381,7 +381,7 @@ function QlessJob:fail(now, worker, group, message, data)
   queue_obj.locks.remove(self.jid)
   queue_obj.scheduled.remove(self.jid)
 
-  -- The reason that this appears here is that the above will fail if the 
+  -- The reason that this appears here is that the above will fail if the
   -- job doesn't exist
   if data then
     redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
@@ -418,7 +418,7 @@ end
 -- Throws an exception if:
 --     - the worker is not the worker with a lock on the job
 --     - the job is not actually running
--- 
+--
 -- Otherwise, it returns the number of retries remaining. If the allowed
 -- retries have been exhausted, then it is automatically failed, and a negative
 -- number is returned.
@@ -431,7 +431,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message)
   assert(worker, 'Retry(): Arg "worker" missing')
   delay = assert(tonumber(delay or 0),
     'Retry(): Arg "delay" not a number: ' .. tostring(delay))
-  
+
   -- Let's see what the old priority and tags were
   local oldqueue, state, retries, oldworker, priority, failure = unpack(
     redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state',
@@ -464,7 +464,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message)
       -- queue it's in
       local group = group or 'failed-retries-' .. queue
       self:history(now, 'failed', {['group'] = group})
-      
+
       redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed',
         'worker', '',
         'expires', '')
@@ -488,7 +488,7 @@ function QlessJob:retry(now, queue, worker, delay, group, message)
          ['worker'] = unpack(self:data('worker'))
        }))
      end
-      
+
      -- Add this type of failure to the list of failures
      redis.call('sadd', 'ql:failures', group)
      -- And add this particular instance to the failed types
@@ -640,11 +640,11 @@ function QlessJob:heartbeat(now, worker, data)
    redis.call('hmset', QlessJob.ns .. self.jid,
      'expires', expires, 'worker', worker)
  end
-  
+
  -- Update when this job was last updated on that worker
  -- Add this job to the list of jobs handled by this worker
  redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid)
-  
+
  -- And now we should just update the locks
  local queue = Qless.queue(
    redis.call('hget', QlessJob.ns .. self.jid, 'queue'))
diff --git a/queue.lua b/queue.lua
index a1f7a3d..753bbd5 100644
--- a/queue.lua
+++ b/queue.lua
@@ -182,11 +182,11 @@ function QlessQueue:stats(now, date)
     local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue
     local count, mean, vk = unpack(redis.call('hmget', key,
       'total', 'mean', 'vk'))
-    
+
     count = tonumber(count) or 0
     mean = tonumber(mean) or 0
     vk = tonumber(vk)
-    
+
     results.count = count or 0
     results.mean = mean or 0
     results.histogram = {}
@@ -236,8 +236,8 @@ function QlessQueue:peek(now, count)
   -- Now we've checked __all__ the locks for this queue that could
   -- have expired, and are no more than the number requested. If
-  -- we still need values in order to meet the demand, then we 
-  -- should check for any scheduled items, and if so, we should 
+  -- we still need values in order to meet the demand, then we
+  -- should check for any scheduled items, and if so, we should
   -- insert them to ensure correctness when pulling off the next
   -- unit of work.
   self:check_scheduled(now, count - #jids)
 
@@ -311,8 +311,8 @@ function QlessQueue:pop(now, worker, count)
   -- look for all the recurring jobs that need jobs run
   self:check_recurring(now, count - #jids)
 
-  -- If we still need values in order to meet the demand, then we 
-  -- should check for any scheduled items, and if so, we should 
+  -- If we still need values in order to meet the demand, then we
+  -- should check for any scheduled items, and if so, we should
   -- insert them to ensure correctness when pulling off the next
   -- unit of work.
   self:check_scheduled(now, count - #jids)
@@ -334,19 +334,19 @@ function QlessQueue:pop(now, worker, count)
     self:stat(now, 'wait', waiting)
     redis.call('hset', QlessJob.ns .. jid,
       'time', string.format("%.20f", now))
-    
+
     -- Add this job to the list of jobs handled by this worker
     redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)
-    
+
     -- Update the jobs data, and add its locks, and return the job
     job:update({
       worker  = worker,
       expires = expires,
       state   = 'running'
     })
-    
+
     self.locks.add(expires, jid)
-    
+
     local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
     if tracked then
       Qless.publish('popped', jid)
@@ -397,7 +397,7 @@ function QlessQueue:stat(now, stat, val)
     redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1)
   else -- days
     redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1)
-  end 
+  end
   redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk)
 end
 
@@ -457,7 +457,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
       -- Now find what's in the original, but not the new
       local original = redis.call(
         'smembers', QlessJob.ns .. jid .. '-dependencies')
-      for _, dep in pairs(original) do 
+      for _, dep in pairs(original) do
        if new[dep] == nil then
          -- Remove dep as a dependency
          redis.call('srem', QlessJob.ns .. dep .. '-dependents' , jid)
@@ -580,7 +580,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
  end
  -- Lastly, we're going to make sure that this item is in the
-  -- set of known queues. We should keep this sorted by the 
+  -- set of known queues. We should keep this sorted by the
  -- order in which we saw each of these queues
  if redis.call('zscore', 'ql:queues', self.name) == false then
    redis.call('zadd', 'ql:queues', now, self.name)
  end
@@ -650,7 +650,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
     if #arg % 2 == 1 then
       error('Odd number of additional args: ' .. tostring(arg))
     end
-    
+
     -- Read in all the optional parameters
     local options = {}
     for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end
@@ -670,12 +670,12 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
     local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid,
       'count', 'queue'))
     count = count or 0
     -- If it has previously been in another queue, then we should remove
     -- some information about it
     if old_queue then
       Qless.queue(old_queue).recurring.remove(jid)
     end
-    
+
     -- Do some insertions
     redis.call('hmset', 'ql:r:' .. jid,
       'jid'     , jid,
@@ -693,14 +693,14 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
       'backlog' , options.backlog)
     -- Now, we should schedule the next run of the job
     self.recurring.add(now + offset, jid)
-    
+
     -- Lastly, we're going to make sure that this item is in the
-    -- set of known queues. We should keep this sorted by the 
+    -- set of known queues. We should keep this sorted by the
     -- order in which we saw each of these queues
     if redis.call('zscore', 'ql:queues', self.name) == false then
       redis.call('zadd', 'ql:queues', now, self.name)
     end
-    
+
     return jid
   else
     error('Recur(): schedule type "' .. tostring(spec) .. '" unknown')
@@ -746,22 +746,22 @@ function QlessQueue:check_recurring(now, count)
         )
       end
     end
-    
-    -- We're saving this value so that in the history, we can accurately 
+
+    -- We're saving this value so that in the history, we can accurately
     -- reflect when the job would normally have been scheduled
     while (score <= now) and (moved < count) do
       local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
       moved = moved + 1
 
       local child_jid = jid .. '-' .. count
-      
+
       -- Add this job to the list of jobs tagged with whatever tags were
       -- supplied
       for i, tag in ipairs(_tags) do
         redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
         redis.call('zincrby', 'ql:tags', 1, tag)
       end
-      
+
       -- First, let's save its data
       redis.call('hmset', QlessJob.ns .. child_jid,
         'jid'             , child_jid,
@@ -778,12 +778,12 @@ function QlessQueue:check_recurring(now, count)
         'time'    , string.format("%.20f", score),
         'spawned_from_jid', jid)
       Qless.job(child_jid):history(score, 'put', {q = self.name})
-      
+
       -- Now, if a delay was provided, and if it's in the future,
       -- then we'll have to schedule it. Otherwise, we're just
       -- going to add it to the work queue.
       self.work.add(score, priority, child_jid)
-      
+
       score = score + interval
       self.recurring.add(score, jid)
     end
@@ -798,7 +798,7 @@ function QlessQueue:check_scheduled(now, count)
   -- insert into the work queue
   local scheduled = self.scheduled.ready(now, 0, count)
   for index, jid in ipairs(scheduled) do
-    -- With these in hand, we'll have to go out and find the 
+    -- With these in hand, we'll have to go out and find the
     -- priorities of these jobs, and then we'll insert them
     -- into the work queue and then when that's complete, we'll
     -- remove them from the scheduled queue
@@ -883,7 +883,7 @@ function QlessQueue:invalidate_locks(now, count)
     -- See how many remaining retries the job has
     local remaining = tonumber(redis.call(
       'hincrby', QlessJob.ns .. jid, 'remaining', -1))
-    
+
     -- This is where we actually have to time out the work
     if remaining < 0 then
       -- Now remove the instance from the schedule, and work queues
@@ -891,7 +891,7 @@ function QlessQueue:invalidate_locks(now, count)
       self.work.remove(jid)
       self.locks.remove(jid)
       self.scheduled.remove(jid)
-      
+
       local group = 'failed-retries-' .. Qless.job(jid):data()['queue']
       local job = Qless.job(jid)
       job:history(now, 'failed', {group = group})
@@ -907,12 +907,12 @@ function QlessQueue:invalidate_locks(now, count)
         ['when']    = now,
         ['worker']  = unpack(job:data('worker'))
       }))
-      
+
       -- Add this type of failure to the list of failures
       redis.call('sadd', 'ql:failures', group)
       -- And add this particular instance to the failed types
       redis.call('lpush', 'ql:f:' .. group, jid)
-      
+
       if redis.call('zscore', 'ql:tracked', jid) ~= false then
         Qless.publish('failed', jid)
       end
diff --git a/recurring.lua b/recurring.lua
index 4765d15..70b41f8 100644
--- a/recurring.lua
+++ b/recurring.lua
@@ -3,11 +3,11 @@ function QlessRecurringJob:data()
   local job = redis.call(
     'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue',
     'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog')
-  
+
   if not job[1] then
     return nil
   end
-  
+
   return {
     jid      = job[1],
     klass    = job[2],
@@ -30,7 +30,7 @@ end
 --     - data
 --     - klass
 --     - queue
---     - backlog 
+--     - backlog
 function QlessRecurringJob:update(now, ...)
   local options = {}
   -- Make sure that the job exists
@@ -88,10 +88,10 @@ function QlessRecurringJob:tag(...)
     tags = cjson.decode(tags)
     local _tags = {}
     for i,v in ipairs(tags) do _tags[v] = true end
-    
+
     -- Otherwise, add the job to the sorted set with those tags
     for i=1,#arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end
-    
+
     tags = cjson.encode(tags)
     redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
     return tags
diff --git a/worker.lua b/worker.lua
index b3ef835..7e1c0c3 100644
--- a/worker.lua
+++ b/worker.lua
@@ -6,7 +6,7 @@ end
 -- Provide data about all the workers, or if a specific worker is provided,
 -- then which jobs that worker is responsible for. If no worker is provided,
 -- expect a response of the form:
--- 
+--
 -- [
 --     # This is sorted by the recency of activity from that worker
 --     {
@@ -17,9 +17,9 @@ end
 --         ...
 --     }
 -- ]
--- 
+--
 -- If a worker id is provided, then expect a response of the form:
--- 
+--
 -- {
 --     'jobs': [
 --         jid1,
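
A cleanup like this one is mechanical, so it can be regenerated rather than maintained by hand. The sketch below is one way to do that in Lua, the language these scripts are written in; it is illustrative only (not part of the patch), and it assumes the five files from the diffstat above are readable and writable in the current directory:

-- Rewrite each file with trailing spaces and tabs removed.
local files = { 'base.lua', 'job.lua', 'queue.lua', 'recurring.lua', 'worker.lua' }
for _, path in ipairs(files) do
  local lines = {}
  for line in io.lines(path) do
    -- gsub returns the new string plus a match count; the extra
    -- parentheses keep only the string.
    table.insert(lines, (line:gsub('[ \t]+$', '')))
  end
  local f = assert(io.open(path, 'w'))
  f:write(table.concat(lines, '\n'), '\n')
  f:close()
end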