diff --git a/src/nl/surf/eduhub_rio_mapper/endpoints/api.clj b/src/nl/surf/eduhub_rio_mapper/endpoints/api.clj index e81e6518..1ee4ec5f 100644 --- a/src/nl/surf/eduhub_rio_mapper/endpoints/api.clj +++ b/src/nl/surf/eduhub_rio_mapper/endpoints/api.clj @@ -39,6 +39,7 @@ [ring.middleware.json :refer [wrap-json-response]] [ring.util.response :as response]) (:import [java.net MalformedURLException URL] + [java.time Instant] java.util.UUID)) (def server-stopping (atom false)) @@ -62,7 +63,8 @@ (if job (let [token (UUID/randomUUID)] (with-mdc {:token token} - (enqueue-fn (assoc job :token token))) + ; store created-at in job itself as soon as it is created + (enqueue-fn (assoc job :token token :created-at (str (Instant/now))))) (assoc res :body {:token token})) res)))) diff --git a/src/nl/surf/eduhub_rio_mapper/endpoints/status.clj b/src/nl/surf/eduhub_rio_mapper/endpoints/status.clj index 6d05c987..f864f1c8 100644 --- a/src/nl/surf/eduhub_rio_mapper/endpoints/status.clj +++ b/src/nl/surf/eduhub_rio_mapper/endpoints/status.clj @@ -27,7 +27,8 @@ [nl.surf.eduhub-rio-mapper.specs.rio :as rio] [nl.surf.eduhub-rio-mapper.utils.http-utils :as http-utils] [nl.surf.eduhub-rio-mapper.utils.logging :as logging] - [nl.surf.eduhub-rio-mapper.utils.redis :as redis])) + [nl.surf.eduhub-rio-mapper.utils.redis :as redis]) + (:import [java.time Instant])) (defn- status-key [{:keys [redis-key-prefix] @@ -99,14 +100,22 @@ (some-> x :errors :retryable? boolean))) (defn make-set-status-fn [config] - (fn [{::job/keys [callback-url] :keys [token action] ::ooapi/keys [id type] :as job} + (fn [{::job/keys [callback-url] :keys [token action created-at started-at] ::ooapi/keys [id type] :as job} status & [data]] (let [opleidingseenheidcode (-> data :aanleveren_opleidingseenheid_response :opleidingseenheidcode) aangeb-opleidingcode (-> data ::rio/aangeboden-opleiding-code) - value (cond-> {:status status - :token token - :action action - :resource (str type "/" id)} + value (cond-> {:status status + :token token + :action action + ; copy created-at and started-at from job to status. + ; We'll need it later whenever the status changes + :created-at created-at + :started-at started-at + :resource (str type "/" id)} + + ; set finished-at in status, not in job. the job is no longer needed. + (#{:done :error :time-out} status) + (assoc :finished-at (str (Instant/now))) (and (= :done status) opleidingseenheidcode) diff --git a/src/nl/surf/eduhub_rio_mapper/worker.clj b/src/nl/surf/eduhub_rio_mapper/worker.clj index 86811d7c..d5b0cd0a 100644 --- a/src/nl/surf/eduhub_rio_mapper/worker.clj +++ b/src/nl/surf/eduhub_rio_mapper/worker.clj @@ -26,7 +26,8 @@ [taoensso.carmine :as car]) (:import java.io.EOFException java.util.UUID - [java.net UnknownHostException])) + [java.net UnknownHostException] + [java.time Instant])) (defn- prefix-key [{:keys [redis-key-prefix] @@ -225,51 +226,54 @@ (recover-aborted-job! config queue) (when-let [job (pop-job! config queue)] - ;; Don't count job as started while retrying it - (when (nil? (::retries job)) - (metrics/increment-count config job :started)) - ;; run job asynchronous - (let [set-status-fn (metrics/wrap-increment-count config set-status-fn) - c (async/thread - (.setName (Thread/currentThread) (str "runner-" queue)) - (run-job-fn job))] - (set-status-fn job :in-progress) - - ;; wait for job to finish and refresh lock regularly while waiting - (loop [] - (let [result (async/alt!! c ([v] v) - (async/timeout timeout-ms) ::ping)] - (extend-lock! config queue @token lock-ttl-ms) - - (cond - (= ::ping result) - (recur) - - ;; ack, success - (not (or (error-fn result) - (retryable-fn result))) - (set-status-fn job :done result) - - ;; ack, not retryable, log and set error status - (not (retryable-fn result)) - (do - (logging/with-mdc - (:trace-context job) - (log/debugf "Job %s returns error %s" (pr-str job) (pr-str result))) - (set-status-fn job :error result)) - - ;; nack, retryable error, too many retries - (>= (or (::retries job) 0) max-retries) - (set-status-fn job :time-out result) - - ;; nack, retryable error, schedule retry - :else - (do - (enqueue-first! config (update job ::retries (fnil inc 0))) - ;; extend lock lease to retry-wait - (extend-lock! config queue @token retry-wait-ms) - ;; prevent release - (reset! token nil)))))) + ; Set started-at when job is popped from queue. Only set it the first time, not during retries + (let [job (assoc job :started-at (or (:started-at job) + (str (Instant/now))))] + ;; Don't count job as started while retrying it + (when (nil? (::retries job)) + (metrics/increment-count config job :started)) + ;; run job asynchronous + (let [set-status-fn (metrics/wrap-increment-count config set-status-fn) + c (async/thread + (.setName (Thread/currentThread) (str "runner-" queue)) + (run-job-fn job))] + (set-status-fn job :in-progress) + + ;; wait for job to finish and refresh lock regularly while waiting + (loop [] + (let [result (async/alt!! c ([v] v) + (async/timeout timeout-ms) ::ping)] + (extend-lock! config queue @token lock-ttl-ms) + + (cond + (= ::ping result) + (recur) + + ;; ack, success + (not (or (error-fn result) + (retryable-fn result))) + (set-status-fn job :done result) + + ;; ack, not retryable, log and set error status + (not (retryable-fn result)) + (do + (logging/with-mdc + (:trace-context job) + (log/debugf "Job %s returns error %s" (pr-str job) (pr-str result))) + (set-status-fn job :error result)) + + ;; nack, retryable error, too many retries + (>= (or (::retries job) 0) max-retries) + (set-status-fn job :time-out result) + + ;; nack, retryable error, schedule retry + :else + (do + (enqueue-first! config (update job ::retries (fnil inc 0))) + ;; extend lock lease to retry-wait + (extend-lock! config queue @token retry-wait-ms) + ;; prevent release + (reset! token nil))))))) ;; done, remove from busy (job-done! config queue)) @@ -319,7 +323,7 @@ (if (ex-util/backtrace-matches-regex? ex #"carmine") (RuntimeException. "Redis is not available") ex)) - (catch Exception ex + (catch Throwable ex ex) (finally (reset! worker-busy false))))])) diff --git a/test/nl/surf/eduhub_rio_mapper/endpoints/api_test.clj b/test/nl/surf/eduhub_rio_mapper/endpoints/api_test.clj index 5b221cef..7adbe96f 100644 --- a/test/nl/surf/eduhub_rio_mapper/endpoints/api_test.clj +++ b/test/nl/surf/eduhub_rio_mapper/endpoints/api_test.clj @@ -230,9 +230,11 @@ (is (-> res :body :token) "token returned") - (is (= {:test "dummy"} - (-> @queue-atom first (dissoc :token))) - "job queued") + (let [job-result (-> @queue-atom first (dissoc :token))] + (is (= [:test :created-at] (keys job-result))) + (is (= {:test "dummy"} + (dissoc job-result :created-at)) + "job queued")) (is (-> @queue-atom first :token) "job has token") @@ -265,12 +267,16 @@ (status-set! {:token "test-pending" :action "upsert" + :created-at "2024-08-30T08:41:34.929378Z" + :started-at nil ::ooapi/type "test" ::ooapi/id "314"} :pending) (status-set! {:token "test-error" :action "link" + :created-at "2024-08-30T08:41:34.929378Z" + :started-at "2024-08-30T08:41:34.929378Z" ::ooapi/type "test" ::ooapi/id "3141"} :error @@ -281,6 +287,8 @@ (status-set! {:token "test-done" :action "delete" + :created-at "2024-08-30T08:41:34.929378Z" + :started-at "2024-08-30T08:41:34.929378Z" ::ooapi/type "test" ::ooapi/id "31415"} :done @@ -310,8 +318,11 @@ :body {:status :pending :action "upsert" :token "test-pending" + :created-at true + :started-at nil :resource "test/314"}} - (app {:token "test-pending"}))) + (cond-> (app {:token "test-pending"}) + :created-at (assoc-in [:body :created-at] true)))) ;; test done status (is (= {:token "test-done" @@ -320,8 +331,13 @@ :action "delete" :token "test-done" :resource "test/31415" + :created-at true + :started-at "2024-08-30T08:41:34.929378Z" + :finished-at true :attributes {:opleidingseenheidcode "code"}}} - (app {:token "test-done"}))) + (cond-> (app {:token "test-done"}) + :created-at (assoc-in [:body :created-at] true) + :finished-at (assoc-in [:body :finished-at] true)))) ;; test error status (is (= {:token "test-error" @@ -330,9 +346,14 @@ :token "test-error" :action "link" :resource "test/3141" + :created-at true + :started-at "2024-08-30T08:41:34.929378Z" + :finished-at true :phase "middle" :message "error"}} - (app {:token "test-error"}))) + (cond-> (app {:token "test-error"}) + :created-at (assoc-in [:body :created-at] true) + :finished-at (assoc-in [:body :finished-at] true)))) (status/purge! config))) diff --git a/test/nl/surf/eduhub_rio_mapper/job_test.clj b/test/nl/surf/eduhub_rio_mapper/job_test.clj index 5ac50d45..865cd008 100644 --- a/test/nl/surf/eduhub_rio_mapper/job_test.clj +++ b/test/nl/surf/eduhub_rio_mapper/job_test.clj @@ -61,6 +61,8 @@ :action "delete" ::ooapi/type "course" ::ooapi/id "123123" + :created-at "2024-08-30T08:41:34.929378Z" + :started-at "2024-08-30T08:41:34.929378Z" :token "12345"} mock-webhook (fn mock-webhook [req] (reset! last-seen-request-atom req) @@ -75,6 +77,8 @@ :action "delete" :resource "course/123123" :attributes {:opleidingseenheidcode "123"} + :created-at "2024-08-30T08:41:34.929378Z" + :started-at "2024-08-30T08:41:34.929378Z" :token "12345"} - (json/read-str (:body req) {:key-fn keyword}))) + (dissoc (json/read-str (:body req) {:key-fn keyword}) :finished-at))) (is (= (:url req) "https://github.com/"))))))) diff --git a/test/nl/surf/eduhub_rio_mapper/worker_test.clj b/test/nl/surf/eduhub_rio_mapper/worker_test.clj index 111d77aa..fbe65440 100644 --- a/test/nl/surf/eduhub_rio_mapper/worker_test.clj +++ b/test/nl/surf/eduhub_rio_mapper/worker_test.clj @@ -81,7 +81,7 @@ (assoc-in [:worker :max-retries] max-retries) (assoc-in [:worker :run-job-fn] (fn [job] - (reset! last-seen-job job) + (reset! last-seen-job (dissoc job :started-at)) :from-job)) (assoc-in [:worker :retryable-fn] (fn [result] @@ -109,7 +109,7 @@ (assoc-in [:worker :max-retries] max-retries) (assoc-in [:worker :run-job-fn] (fn [job] - (reset! last-seen-job job) + (reset! last-seen-job (dissoc job :started-at)) (::worker/retries job))) (assoc-in [:worker :retryable-fn] (fn [result] @@ -137,7 +137,7 @@ (assoc-in [:worker :retry-wait-ms] retry-wait-ms) (assoc-in [:worker :run-job-fn] (fn [job] - (reset! last-seen-job job) + (reset! last-seen-job (dissoc job :started-at)) (::worker/retries job))) (assoc-in [:worker :retryable-fn] (fn [result] @@ -170,9 +170,9 @@ (assoc-in [:worker :run-job-fn] identity) (assoc-in [:worker :set-status-fn] (fn [job status & [data]] - (reset! last-seen-status {:job job + (reset! last-seen-status {:job (dissoc job :started-at) :status status - :data data}))))] + :data (dissoc data :started-at)}))))] (worker/purge! config) ;; queue a successful job