Skip to content

Commit

Permalink
Add timestamps to status (#328)
Browse files Browse the repository at this point in the history
* Added timestamps to jobs

* copy timestamp from job to status

* Fixed tests

* Review feedback

* Added started at
  • Loading branch information
mdemare authored Sep 9, 2024
1 parent 014765c commit ca32e2b
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 66 deletions.
4 changes: 3 additions & 1 deletion src/nl/surf/eduhub_rio_mapper/endpoints/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
[ring.middleware.json :refer [wrap-json-response]]
[ring.util.response :as response])
(:import [java.net MalformedURLException URL]
[java.time Instant]
java.util.UUID))

(def server-stopping (atom false))
Expand All @@ -62,7 +63,8 @@
(if job
(let [token (UUID/randomUUID)]
(with-mdc {:token token}
(enqueue-fn (assoc job :token token)))
; store created-at in job itself as soon as it is created
(enqueue-fn (assoc job :token token :created-at (str (Instant/now)))))
(assoc res :body {:token token}))
res))))

Expand Down
21 changes: 15 additions & 6 deletions src/nl/surf/eduhub_rio_mapper/endpoints/status.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
[nl.surf.eduhub-rio-mapper.specs.rio :as rio]
[nl.surf.eduhub-rio-mapper.utils.http-utils :as http-utils]
[nl.surf.eduhub-rio-mapper.utils.logging :as logging]
[nl.surf.eduhub-rio-mapper.utils.redis :as redis]))
[nl.surf.eduhub-rio-mapper.utils.redis :as redis])
(:import [java.time Instant]))

(defn- status-key
[{:keys [redis-key-prefix]
Expand Down Expand Up @@ -99,14 +100,22 @@
(some-> x :errors :retryable? boolean)))

(defn make-set-status-fn [config]
(fn [{::job/keys [callback-url] :keys [token action] ::ooapi/keys [id type] :as job}
(fn [{::job/keys [callback-url] :keys [token action created-at started-at] ::ooapi/keys [id type] :as job}
status & [data]]
(let [opleidingseenheidcode (-> data :aanleveren_opleidingseenheid_response :opleidingseenheidcode)
aangeb-opleidingcode (-> data ::rio/aangeboden-opleiding-code)
value (cond-> {:status status
:token token
:action action
:resource (str type "/" id)}
value (cond-> {:status status
:token token
:action action
; copy created-at and started-at from job to status.
; We'll need it later whenever the status changes
:created-at created-at
:started-at started-at
:resource (str type "/" id)}

; set finished-at in status, not in job. the job is no longer needed.
(#{:done :error :time-out} status)
(assoc :finished-at (str (Instant/now)))

(and (= :done status)
opleidingseenheidcode)
Expand Down
98 changes: 51 additions & 47 deletions src/nl/surf/eduhub_rio_mapper/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
[taoensso.carmine :as car])
(:import java.io.EOFException
java.util.UUID
[java.net UnknownHostException]))
[java.net UnknownHostException]
[java.time Instant]))

(defn- prefix-key
[{:keys [redis-key-prefix]
Expand Down Expand Up @@ -225,51 +226,54 @@
(recover-aborted-job! config queue)

(when-let [job (pop-job! config queue)]
;; Don't count job as started while retrying it
(when (nil? (::retries job))
(metrics/increment-count config job :started))
;; run job asynchronous
(let [set-status-fn (metrics/wrap-increment-count config set-status-fn)
c (async/thread
(.setName (Thread/currentThread) (str "runner-" queue))
(run-job-fn job))]
(set-status-fn job :in-progress)

;; wait for job to finish and refresh lock regularly while waiting
(loop []
(let [result (async/alt!! c ([v] v)
(async/timeout timeout-ms) ::ping)]
(extend-lock! config queue @token lock-ttl-ms)

(cond
(= ::ping result)
(recur)

;; ack, success
(not (or (error-fn result)
(retryable-fn result)))
(set-status-fn job :done result)

;; ack, not retryable, log and set error status
(not (retryable-fn result))
(do
(logging/with-mdc
(:trace-context job)
(log/debugf "Job %s returns error %s" (pr-str job) (pr-str result)))
(set-status-fn job :error result))

;; nack, retryable error, too many retries
(>= (or (::retries job) 0) max-retries)
(set-status-fn job :time-out result)

;; nack, retryable error, schedule retry
:else
(do
(enqueue-first! config (update job ::retries (fnil inc 0)))
;; extend lock lease to retry-wait
(extend-lock! config queue @token retry-wait-ms)
;; prevent release
(reset! token nil))))))
; Set started-at when job is popped from queue. Only set it the first time, not during retries
(let [job (assoc job :started-at (or (:started-at job)
(str (Instant/now))))]
;; Don't count job as started while retrying it
(when (nil? (::retries job))
(metrics/increment-count config job :started))
;; run job asynchronous
(let [set-status-fn (metrics/wrap-increment-count config set-status-fn)
c (async/thread
(.setName (Thread/currentThread) (str "runner-" queue))
(run-job-fn job))]
(set-status-fn job :in-progress)

;; wait for job to finish and refresh lock regularly while waiting
(loop []
(let [result (async/alt!! c ([v] v)
(async/timeout timeout-ms) ::ping)]
(extend-lock! config queue @token lock-ttl-ms)

(cond
(= ::ping result)
(recur)

;; ack, success
(not (or (error-fn result)
(retryable-fn result)))
(set-status-fn job :done result)

;; ack, not retryable, log and set error status
(not (retryable-fn result))
(do
(logging/with-mdc
(:trace-context job)
(log/debugf "Job %s returns error %s" (pr-str job) (pr-str result)))
(set-status-fn job :error result))

;; nack, retryable error, too many retries
(>= (or (::retries job) 0) max-retries)
(set-status-fn job :time-out result)

;; nack, retryable error, schedule retry
:else
(do
(enqueue-first! config (update job ::retries (fnil inc 0)))
;; extend lock lease to retry-wait
(extend-lock! config queue @token retry-wait-ms)
;; prevent release
(reset! token nil)))))))

;; done, remove from busy
(job-done! config queue))
Expand Down Expand Up @@ -319,7 +323,7 @@
(if (ex-util/backtrace-matches-regex? ex #"carmine")
(RuntimeException. "Redis is not available")
ex))
(catch Exception ex
(catch Throwable ex
ex)
(finally
(reset! worker-busy false))))]))
Expand Down
33 changes: 27 additions & 6 deletions test/nl/surf/eduhub_rio_mapper/endpoints/api_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@
(is (-> res :body :token)
"token returned")

(is (= {:test "dummy"}
(-> @queue-atom first (dissoc :token)))
"job queued")
(let [job-result (-> @queue-atom first (dissoc :token))]
(is (= [:test :created-at] (keys job-result)))
(is (= {:test "dummy"}
(dissoc job-result :created-at))
"job queued"))

(is (-> @queue-atom first :token)
"job has token")
Expand Down Expand Up @@ -265,12 +267,16 @@

(status-set! {:token "test-pending"
:action "upsert"
:created-at "2024-08-30T08:41:34.929378Z"
:started-at nil
::ooapi/type "test"
::ooapi/id "314"}
:pending)

(status-set! {:token "test-error"
:action "link"
:created-at "2024-08-30T08:41:34.929378Z"
:started-at "2024-08-30T08:41:34.929378Z"
::ooapi/type "test"
::ooapi/id "3141"}
:error
Expand All @@ -281,6 +287,8 @@

(status-set! {:token "test-done"
:action "delete"
:created-at "2024-08-30T08:41:34.929378Z"
:started-at "2024-08-30T08:41:34.929378Z"
::ooapi/type "test"
::ooapi/id "31415"}
:done
Expand Down Expand Up @@ -310,8 +318,11 @@
:body {:status :pending
:action "upsert"
:token "test-pending"
:created-at true
:started-at nil
:resource "test/314"}}
(app {:token "test-pending"})))
(cond-> (app {:token "test-pending"})
:created-at (assoc-in [:body :created-at] true))))

;; test done status
(is (= {:token "test-done"
Expand All @@ -320,8 +331,13 @@
:action "delete"
:token "test-done"
:resource "test/31415"
:created-at true
:started-at "2024-08-30T08:41:34.929378Z"
:finished-at true
:attributes {:opleidingseenheidcode "code"}}}
(app {:token "test-done"})))
(cond-> (app {:token "test-done"})
:created-at (assoc-in [:body :created-at] true)
:finished-at (assoc-in [:body :finished-at] true))))

;; test error status
(is (= {:token "test-error"
Expand All @@ -330,9 +346,14 @@
:token "test-error"
:action "link"
:resource "test/3141"
:created-at true
:started-at "2024-08-30T08:41:34.929378Z"
:finished-at true
:phase "middle"
:message "error"}}
(app {:token "test-error"})))
(cond-> (app {:token "test-error"})
:created-at (assoc-in [:body :created-at] true)
:finished-at (assoc-in [:body :finished-at] true))))

(status/purge! config)))

Expand Down
6 changes: 5 additions & 1 deletion test/nl/surf/eduhub_rio_mapper/job_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
:action "delete"
::ooapi/type "course"
::ooapi/id "123123"
:created-at "2024-08-30T08:41:34.929378Z"
:started-at "2024-08-30T08:41:34.929378Z"
:token "12345"}
mock-webhook (fn mock-webhook [req]
(reset! last-seen-request-atom req)
Expand All @@ -75,6 +77,8 @@
:action "delete"
:resource "course/123123"
:attributes {:opleidingseenheidcode "123"}
:created-at "2024-08-30T08:41:34.929378Z"
:started-at "2024-08-30T08:41:34.929378Z"
:token "12345"}
(json/read-str (:body req) {:key-fn keyword})))
(dissoc (json/read-str (:body req) {:key-fn keyword}) :finished-at)))
(is (= (:url req) "https://github.com/")))))))
10 changes: 5 additions & 5 deletions test/nl/surf/eduhub_rio_mapper/worker_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
(assoc-in [:worker :max-retries] max-retries)
(assoc-in [:worker :run-job-fn]
(fn [job]
(reset! last-seen-job job)
(reset! last-seen-job (dissoc job :started-at))
:from-job))
(assoc-in [:worker :retryable-fn]
(fn [result]
Expand Down Expand Up @@ -109,7 +109,7 @@
(assoc-in [:worker :max-retries] max-retries)
(assoc-in [:worker :run-job-fn]
(fn [job]
(reset! last-seen-job job)
(reset! last-seen-job (dissoc job :started-at))
(::worker/retries job)))
(assoc-in [:worker :retryable-fn]
(fn [result]
Expand Down Expand Up @@ -137,7 +137,7 @@
(assoc-in [:worker :retry-wait-ms] retry-wait-ms)
(assoc-in [:worker :run-job-fn]
(fn [job]
(reset! last-seen-job job)
(reset! last-seen-job (dissoc job :started-at))
(::worker/retries job)))
(assoc-in [:worker :retryable-fn]
(fn [result]
Expand Down Expand Up @@ -170,9 +170,9 @@
(assoc-in [:worker :run-job-fn] identity)
(assoc-in [:worker :set-status-fn]
(fn [job status & [data]]
(reset! last-seen-status {:job job
(reset! last-seen-status {:job (dissoc job :started-at)
:status status
:data data}))))]
:data (dissoc data :started-at)}))))]
(worker/purge! config)

;; queue a successful job
Expand Down

0 comments on commit ca32e2b

Please sign in to comment.